Re: KIP-28 does not allow Processor to specify partition of output message
On October 14, 2015 at 3:12:34 AM, Guozhang Wang (wangg...@gmail.com) wrote:
> Thanks!

Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a PR with the proposed change.

Thanks!
Re: KIP-28 does not allow Processor to specify partition of output message
On Wed, Oct 14, 2015 at 8:47 AM, Randall Hauch wrote:
> Created https://issues.apache.org/jira/browse/KAFKA-2649 and attached a PR with the proposed change.

A partitioning scheme should be a cluster-wide thing. Letting each sink have a different partitioning scheme does not make sense to me. A partitioning scheme is not specific to a stream job, a task, or a sink. I think specifying it at the sink level is more error-prone.

If a user wants to customize a partitioning scheme, he/she will also want to manage it in some central place, maybe a code repo or a jar file. All applications must use the same logic, otherwise the data will be messed up. Thus, a single class representing all partitioning logic is not a bad thing at all. (Code-organization-wise, the logic does not necessarily have to live in a single class, of course.)
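A minimal, self-contained Java sketch of the "otherwise data will be messed up" risk (illustrative only, not Kafka code): two hypothetical applications write the same key to a 4-partition topic using different rules. `hashPartition` is a stand-in for the producer's default key hashing (the real default uses murmur2, not `Arrays.hashCode`), and `lengthPartition` is an invented custom rule.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class PartitionConsistencyDemo {
    // Stand-in for a default key-hash partitioner (NOT Kafka's murmur2):
    // hash the serialized key and map it onto [0, numPartitions).
    static int hashPartition(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Clear the sign bit so the modulo result is never negative.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // A different, hypothetical rule that another team's application might use.
    static int lengthPartition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        String key = "customer-42";
        int fromAppA = hashPartition(key, numPartitions);
        int fromAppB = lengthPartition(key, numPartitions);
        System.out.println("app A writes " + key + " to partition " + fromAppA);
        System.out.println("app B writes " + key + " to partition " + fromAppB);
        if (fromAppA != fromAppB) {
            System.out.println("records for one key are now split across partitions");
        }
    }
}
```

With these stand-ins, app A sends `customer-42` to partition 0 and app B sends it to partition 3, so a consumer that owns only one of those partitions sees only part of that key's history.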
Re: KIP-28 does not allow Processor to specify partition of output message
On Tue, Oct 13, 2015 at 9:34 PM, Randall Hauch wrote:
> Ok, cool. I agree we want something simple. I'll create an issue and create a pull request with a proposal. Look for it tomorrow.

Thanks!

Guozhang
Re: KIP-28 does not allow Processor to specify partition of output message
On October 14, 2015 at 12:57:37 PM, Yasuhiro Matsuda (yasuhiro.mats...@gmail.com) wrote:
> A partitioning scheme should be a cluster wide thing.

It absolutely is important that the partitioning logic for a single topic be the same across an entire cluster. IOW, if a topology has a single sink, then no matter where that topology is run in the cluster, it had better use the same partitioning logic. I would argue that when the partitioning logic varies from the default logic, it’s far better to encapsulate it within the topology’s definition, and adding it to the sink is a very easy way to do this (and very natural for the developer using Kafka Streams).

However, centralizing the partitioning logic for all streams is certainly not ideal, primarily because different topics will likely need to be partitioned in different ways. This is especially true for stateful stream processing, which depends on messages with the same key going to the same processor instance that owns that keyed data. IOW, the partitioning logic used by a producer is strongly informed by how the *downstream stateful consumers* are organized/clustered. It gets far more complicated when considering built-in topics used by offset management, state storage, and metrics.

The bottom line is that *different* topics will likely need to be partitioned differently.
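To illustrate how partitioning can be "strongly informed by the downstream stateful consumers", here is a hedged sketch with hypothetical topics: `customers` is keyed by customer id, while `orders` is keyed by order id. For a downstream task to join each order with its customer from local state, the orders must be partitioned by the order's customer id rather than by their own key. The `Order` type, topic names, and stand-in hash (not Kafka's murmur2) are assumptions for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class CoPartitioningSketch {
    // Hypothetical record type: keyed by orderId, but carries a customerId.
    static final class Order {
        final String orderId;
        final String customerId;

        Order(String orderId, String customerId) {
            this.orderId = orderId;
            this.customerId = customerId;
        }
    }

    // Stand-in hash partitioner over a string (not Kafka's murmur2).
    static int hashPartition(String s, int numPartitions) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }

    // "customers" topic: partition by the record key, i.e. the customer id.
    static int customerPartition(String customerId, int numPartitions) {
        return hashPartition(customerId, numPartitions);
    }

    // "orders" topic: ignore the record key (orderId) and partition by the
    // value's customerId, so an order lands in the same partition number as its customer.
    static int orderPartition(String orderId, Order order, int numPartitions) {
        return hashPartition(order.customerId, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 8; // both topics must have the same partition count
        Order order = new Order("order-1001", "customer-42");
        System.out.println("customer partition: " + customerPartition("customer-42", numPartitions));
        System.out.println("order partition:    " + orderPartition(order.orderId, order, numPartitions));
    }
}
```

This only lines up if both topics have the same partition count and every producer writing to `orders` applies the same rule.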
Re: KIP-28 does not allow Processor to specify partition of output message
On Wed, Oct 14, 2015 at 1:03 PM, Randall Hauch wrote:
> It absolutely is important that the partitioning logic for a single topic be the same across an entire cluster.

I think part of Yasu's motivation for cluster-wide partitioning is that, for example, there could be multiple stream jobs reading from / writing to some shared topics but controlled by different teams or services inside an organization. If one team mistakenly specifies the partitioning in the wrong way, it will interfere with other teams; hence global management of the partitioning scheme may be required, just like a global schema registry service for Kafka.

With the producer's Partitioner interface, we can still implement different partitioning schemes for different topics in a single class: just switch-branch on the topic name and cast the key-value types, but that would be a bit awkward. So I prefer a customizable partitioner in the sink spec for better user programmability.

Guozhang
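A sketch of the single-class approach described above, under stated assumptions: to compile without kafka-clients it does not implement `org.apache.kafka.clients.producer.Partitioner`; its `partition(...)` method only mirrors the shape of that interface's method, with an `int numPartitions` argument standing in for the `Cluster` parameter. The topic names, value types, and custom rules are hypothetical.

```java
import java.util.Arrays;

class GlobalTopicSwitchPartitioner {
    // Stand-in for the producer's default key hashing (not Kafka's murmur2).
    static int defaultPartition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            return 0; // simplification: the real default spreads null-key records across partitions
        }
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    // Loosely mirrors the producer Partitioner's partition(...) shape;
    // numPartitions stands in for the real Cluster parameter.
    int partition(String topic, Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes, int numPartitions) {
        switch (topic) {
            case "page-views": {
                // Keys arrive as Object, so each branch must know and cast the type.
                String userId = (String) key;
                return userId.length() % numPartitions; // hypothetical custom rule
            }
            case "region-counts": {
                int regionId = (Integer) value; // partition by a field of the value
                return Math.floorMod(regionId, numPartitions);
            }
            default:
                // Every other topic must be routed back to the default logic explicitly.
                return defaultPartition(keyBytes, numPartitions);
        }
    }
}
```

The casts are where the awkwardness shows: a record of the wrong type for its topic fails only at runtime with a `ClassCastException`, and the `default` branch has to cover every topic the producer writes, including ones this logic does not otherwise care about.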
Re: KIP-28 does not allow Processor to specify partition of output message
On Wed, Oct 14, 2015 at 1:03 PM, Randall Hauch wrote:
> The bottom line is that *different* topics will likely need to be partitioned differently.

You can do it with the existing Partitioner interface. Centralizing the logic doesn't mean all topics must use the same partitioning scheme.
Re: KIP-28 does not allow Processor to specify partition of output message
On Wed, Oct 14, 2015 at 10:57 AM, Yasuhiro Matsuda <yasuhiro.mats...@gmail.com> wrote:
> A partitioning scheme should be a cluster wide thing.

I agree that cluster-wide partitioning would be preferable in cases where multiple producers from different services share the same topics, and this may well be resolved in the same manner as the schema registry service. On the other hand, I think this is not a problem that would be solved at the KStream level, which only considers per-job configs. I think a KStream-layer partitioner in addition to the Producer partitioner may not be a bad thing, because:

1) Like Randall mentioned, with the producer Partitioner we basically need to switch on ALL possible topics, cast the Object key / value into the proper types, and then apply the partitioning logic. This is awkward, especially when users only want customized partitioning for a small subset of topics and are OK with leaving the others to the default behavior (i.e. murmur hash on the key). Extending the DefaultPartitioner may partially solve this problem, but not completely.

2) A KStream-layer partitioner basically allows us to "overwrite" the partitioning for possibly a subset of partitions while leaving the others to the producer's default partitioner. This partitioner would be exposed to users working on the lower-level processor API layer, while on the KStream DSL layer we may well "infer" the partitioning scheme in most cases from the join / aggregation specs, so it would not impose much burden on users.

Guozhang
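One possible reading of point 2), as a hedged sketch: a typed, per-sink partitioner sees only the key, the value, and the partition count, and returns `null` for records it does not want to route, so the producer-level default decides. `SinkPartitioner`, `TwoLayerPartitioning`, and the `vip-` rule are hypothetical names for illustration, not the interface proposed in the KAFKA-2649 PR, and `producerDefault` is a stand-in rather than Kafka's murmur2 hashing.

```java
// Hypothetical typed partitioner attached to a single sink.
@FunctionalInterface
interface SinkPartitioner<K, V> {
    // Return a partition in [0, numPartitions), or null to defer to the producer default.
    Integer partition(K key, V value, int numPartitions);
}

final class TwoLayerPartitioning {
    // Stand-in for the producer-level default (not murmur2); assumes a non-null key.
    static int producerDefault(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Ask the sink-level partitioner first; fall back to the producer layer on null.
    static <K, V> int resolvePartition(SinkPartitioner<K, V> sinkPartitioner,
                                       K key, V value, int numPartitions) {
        Integer chosen = sinkPartitioner.partition(key, value, numPartitions);
        return chosen != null ? chosen : producerDefault(key, numPartitions);
    }

    public static void main(String[] args) {
        // Only keys with a "vip-" prefix get custom routing (partition 0);
        // everything else keeps the default behavior.
        SinkPartitioner<String, Long> vipToZero =
                (key, value, n) -> key.startsWith("vip-") ? Integer.valueOf(0) : null;
        System.out.println(resolvePartition(vipToZero, "vip-7", 10L, 4));
        System.out.println(resolvePartition(vipToZero, "user-7", 10L, 4));
    }
}
```

Returning a boxed `Integer` rather than an `int` is what lets the sink layer say "no opinion" without a separate flag.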
Re: KIP-28 does not allow Processor to specify partition of output message
On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch wrote:
> This overrides the partitioning logic for all topics, right?

I see your point. Yeah, I think adding a Partitioner to addSink(...) is a good way to go, but the Partitioner interface in the producer is a bit overkill:

"partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)"

whereas for us we only want to partition on (K key, V value).

Perhaps we should add a new Partitioner interface in Kafka Streams?

Guozhang
Re: KIP-28 does not allow Processor to specify partition of output message
Ok, cool. I agree we want something simple. I'll create an issue and a pull request with a proposal. Look for it tomorrow.

> On Oct 13, 2015, at 10:25 PM, Guozhang Wang wrote:
>
> I see your point. Yeah, I think adding a Partitioner to addSink(...) is a good approach, but the Partitioner interface in the producer is a bit overkill:
>
> "partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster)"
>
> whereas for us we only want to partition on (K key, V value).
>
> Perhaps we should add a new Partitioner interface in Kafka Streams?
>
> Guozhang
>
>> On Tue, Oct 13, 2015 at 6:38 PM, Randall Hauch wrote:
>>
>> This overrides the partitioning logic for all topics, right? That means I have to explicitly call the default partitioning logic for all topics except those my Producer forwards to. I guess the best way to do this is by extending org.apache.kafka.clients.producer.internals.DefaultPartitioner. Of course, with multiple sinks in my topology, I have to put all of the partitioning logic inside a single class.
>>
>> What would you think about adding an overloaded TopologyBuilder.addSink(…) method that takes a Partitioner (or, better yet, a smaller functional interface)? The resulting SinkProcessor could use that Partitioner instance to set the partition number. That’d be super convenient for users, would keep the logic where it belongs (where the topology defines the sinks), and, best of all, the implementations won't have to worry about any other topics, such as those used by stores, metrics, or other sinks.
>>
>> Best regards,
>>
>> Randall
>>
>>> On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com) wrote:
>>>
>>> Hi Randall,
>>>
>>> You can try setting the partitioner class via ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig; the interface it must implement is
>>>
>>> org.apache.kafka.clients.producer.Partitioner
>>>
>>> Let me know if it works for you.
>>>
>>> Guozhang
>>>
>>> On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch wrote:
>>>
>>> > The new streams API added with KIP-28 is great. I’ve been using it on a prototype for a few weeks, and I’m looking forward to it being included in 0.9.0. However, at the moment, a Processor implementation is not able to specify the partition number when it outputs messages.
>>> >
>>> > I’d be happy to log a JIRA and create a PR to add it to the API, but without knowing all of the history I’m wondering if leaving it out of the API was intentional.
>>> >
>>> > Thoughts?
>>> >
>>> > Best regards,
>>> >
>>> > Randall Hauch
>>>
>>> --
>>> -- Guozhang
>
> --
> -- Guozhang
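Guozhang's narrower (key, value) interface and Randall's per-sink idea can be sketched together in plain Java. This is a minimal sketch under stated assumptions: `KeyValuePartitioner` and `Sink` are hypothetical names, not Kafka Streams types, and the bounds check is an assumed safeguard, not something proposed in the thread. It shows that a rule attached to one sink only ever sees records written to that sink's topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class PerSinkPartitioningSketch {

    // Hypothetical (key, value) partitioner, as floated in the thread;
    // not an actual Kafka Streams API.
    @FunctionalInterface
    interface KeyValuePartitioner<K, V> {
        int partition(K key, V value, int numPartitions);
    }

    // Hypothetical sink that owns its partitioner, so the rule never has to
    // consider stores, metrics, or other sinks' topics.
    static final class Sink<K, V> {
        final String topic;
        final int numPartitions;
        final KeyValuePartitioner<K, V> partitioner;
        final List<String> sent = new ArrayList<>(); // records "sent", for demonstration only

        Sink(String topic, int numPartitions, KeyValuePartitioner<K, V> partitioner) {
            this.topic = topic;
            this.numPartitions = numPartitions;
            this.partitioner = partitioner;
        }

        // Computes the partition for one record and records where it went.
        int send(K key, V value) {
            int p = partitioner.partition(key, value, numPartitions);
            if (p < 0 || p >= numPartitions) {
                throw new IllegalStateException("partition " + p + " out of range for " + topic);
            }
            sent.add(topic + "-" + p + ": " + key + "=" + value);
            return p;
        }
    }

    public static void main(String[] args) {
        // Partition by key hash, kept non-negative with floorMod.
        Sink<String, String> orders = new Sink<>("orders", 6,
            (key, value, n) -> Math.floorMod(Objects.hashCode(key), n));
        System.out.println(orders.send("customer-42", "order-1"));
        System.out.println(orders.sent);
    }
}
```

Because the partitioner travels with the sink definition, adding another sink means adding another small lambda rather than editing a shared producer-wide class.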
Re: KIP-28 does not allow Processor to specify partition of output message
Hi Randall,

You can try setting the partitioner class via ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig; the interface it must implement is

org.apache.kafka.clients.producer.Partitioner

Let me know if it works for you.

Guozhang

On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch wrote:

> The new streams API added with KIP-28 is great. I’ve been using it on a prototype for a few weeks, and I’m looking forward to it being included in 0.9.0. However, at the moment, a Processor implementation is not able to specify the partition number when it outputs messages.
>
> I’d be happy to log a JIRA and create a PR to add it to the API, but without knowing all of the history I’m wondering if leaving it out of the API was intentional.
>
> Thoughts?
>
> Best regards,
>
> Randall Hauch

--
-- Guozhang
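The suggestion above amounts to one extra entry in the properties handed to StreamsConfig. A minimal sketch, assuming StreamsConfig forwards producer settings to the producer it creates (as the reply implies). The key is spelled out as "partitioner.class", which is the value of ProducerConfig.PARTITIONER_CLASS_CONFIG, so the sketch compiles without the Kafka jar; `com.example.MyPartitioner` and the broker address are hypothetical placeholders.

```java
import java.util.Properties;

public class PartitionerConfigSketch {

    // Same string as ProducerConfig.PARTITIONER_CLASS_CONFIG in kafka-clients.
    static final String PARTITIONER_CLASS_CONFIG = "partitioner.class";

    // Builds the properties that would be passed to StreamsConfig.
    // The bootstrap address and partitioner class name are placeholders.
    static Properties streamsProperties(String partitionerClass) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put(PARTITIONER_CLASS_CONFIG, partitionerClass);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical class implementing org.apache.kafka.clients.producer.Partitioner.
        Properties props = streamsProperties("com.example.MyPartitioner");
        System.out.println(props.getProperty(PARTITIONER_CLASS_CONFIG));
    }
}
```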
Re: KIP-28 does not allow Processor to specify partition of output message
This overrides the partitioning logic for all topics, right? That means I have to explicitly call the default partitioning logic for all topics except those my Producer forwards to. I guess the best way to do this is by extending org.apache.kafka.clients.producer.internals.DefaultPartitioner. Of course, with multiple sinks in my topology, I have to put all of the partitioning logic inside a single class.

What would you think about adding an overloaded TopologyBuilder.addSink(…) method that takes a Partitioner (or, better yet, a smaller functional interface)? The resulting SinkProcessor could use that Partitioner instance to set the partition number. That’d be super convenient for users, would keep the logic where it belongs (where the topology defines the sinks), and, best of all, the implementations won't have to worry about any other topics, such as those used by stores, metrics, or other sinks.

Best regards,

Randall

On October 13, 2015 at 8:09:41 PM, Guozhang Wang (wangg...@gmail.com) wrote:

> Hi Randall,
>
> You can try setting the partitioner class via ProducerConfig.PARTITIONER_CLASS_CONFIG in the StreamsConfig; the interface it must implement is
>
> org.apache.kafka.clients.producer.Partitioner
>
> Let me know if it works for you.
>
> Guozhang
>
> On Tue, Oct 13, 2015 at 10:59 AM, Randall Hauch wrote:
>
>> The new streams API added with KIP-28 is great. I’ve been using it on a prototype for a few weeks, and I’m looking forward to it being included in 0.9.0. However, at the moment, a Processor implementation is not able to specify the partition number when it outputs messages.
>>
>> I’d be happy to log a JIRA and create a PR to add it to the API, but without knowing all of the history I’m wondering if leaving it out of the API was intentional.
>>
>> Thoughts?
>>
>> Best regards,
>>
>> Randall Hauch
>
> --
> -- Guozhang
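The objection above is that a producer-wide partitioner sees every record the producer sends, so one class has to route by topic and supply default behavior for everything else. A rough sketch of that shape in plain Java, under loud assumptions: `TopicDispatchingPartitioner` is a hypothetical class that does not implement Kafka's Partitioner interface, the topic names are made up, and `fallback` is a simplified stand-in, not Kafka's default algorithm (which hashes the serialized key bytes with murmur2 and treats null keys differently).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.ToIntBiFunction;

// Hypothetical single producer-wide partitioner illustrating the coupling
// problem; not Kafka's Partitioner interface.
public class TopicDispatchingPartitioner {

    // Per-topic rules: topic -> (key, numPartitions) -> partition.
    private final Map<String, ToIntBiFunction<Object, Integer>> rules;

    public TopicDispatchingPartitioner(Map<String, ToIntBiFunction<Object, Integer>> rules) {
        this.rules = new HashMap<>(rules);
    }

    public int partition(String topic, Object key, int numPartitions) {
        ToIntBiFunction<Object, Integer> rule = rules.get(topic);
        if (rule != null) {
            return rule.applyAsInt(key, numPartitions);
        }
        // Every other topic (stores, metrics, other sinks) lands here and
        // must still be given some default behavior.
        return fallback(key, numPartitions);
    }

    // Simplified placeholder for the default logic, based on hashCode().
    static int fallback(Object key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key), numPartitions);
    }

    public static void main(String[] args) {
        Map<String, ToIntBiFunction<Object, Integer>> rules = new HashMap<>();
        rules.put("orders", (key, n) -> 0); // hypothetical: pin "orders" to partition 0
        TopicDispatchingPartitioner p = new TopicDispatchingPartitioner(rules);
        System.out.println(p.partition("orders", "k1", 4));
        System.out.println(p.partition("some-store-changelog", "k1", 4));
    }
}
```

Every new sink with custom partitioning means editing this one shared class, which is the coupling the per-sink addSink(…) proposal is meant to avoid.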