Re: Kafka stream error about message format
Hi Shengyi,

1) Unfortunately no, see the related docs: https://kafka.apache.org/0110/documentation.html#upgrade_11_exactly_once_semantics

2) You cannot set the internal topic message format on the client side (producer, consumer, streams, ...); it is decided on the broker side only.

3) You can read more about the upgrade path on https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-MessageFormat (section "Compatibility, Deprecation, and Migration Plan").

Guozhang

On Wed, Apr 18, 2018 at 8:05 PM, sy.pan wrote:

> Hi, All:
>
> I have upgraded my Kafka cluster from 0.10.2 to 1.1 recently. After rolling upgrade, the broker version related configuration is:
>
> inter.broker.protocol.version = 1.1
> log.message.format.version = 0.10.2
>
> I keep the log message format as low version because not all clients could upgrade in a short time.
> When I test Kafka stream EOS feature, get the ERROR log:
>
> [ERROR][StreamThread]: stream-thread [xxx-b0b080fb-StreamThread-2] Failed to close task manager due to the following error:
> org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
>     at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:229)
>     at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:660)
>     at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:486)
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:546)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:405)
>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1107)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:731)
> Caused by: org.apache.kafka.common.errors.UnsupportedForMessageFormatException: The message format version on the broker does not support the request.
>
> So my question is:
>
> 1) Can I use Kafka stream EOS when the log.message.format.version < 0.11
>
> 2) How to set internal topic message format (created by Kafka stream)? I couldn't find in Kafka Streams Configs
>
> 3) What the performance impact if set log.message.format.version = 1.1 when some clients' version is still 0.10.2 (trigger message format transforming on broker side?)
>
> Thank you!

-- 
-- Guozhang
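The answers above come down to one rule: exactly-once in Streams needs the broker's log.message.format.version to be 0.11 or newer, and the client cannot override that. A minimal plain-Java sketch of a pre-flight check follows. The class and method names (`EosPreflight`, `formatSupportsEos`, `eosStreamsConfig`) are hypothetical and not part of Kafka; only the `processing.guarantee` key and its `exactly_once` value are the Streams setting for the versions discussed in this thread. It assumes the version string is plain dotted numbers such as "0.10.2" or "1.1".

```java
import java.util.Properties;

/** Hypothetical helper for illustration; these names are not part of Kafka. */
final class EosPreflight {

    /** Parses "major.minor[.patch]" into {major, minor}, e.g. "0.10.2" becomes {0, 10}. */
    static int[] majorMinor(String version) {
        String[] parts = version.trim().split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
        return new int[] {major, minor};
    }

    /** True when the broker message format is 0.11 or newer, which transactions require. */
    static boolean formatSupportsEos(String logMessageFormatVersion) {
        int[] v = majorMinor(logMessageFormatVersion);
        return v[0] > 0 || v[1] >= 11;
    }

    /** Streams client settings that request exactly-once processing. */
    static Properties eosStreamsConfig(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        props.put("bootstrap.servers", bootstrapServers);
        props.put("processing.guarantee", "exactly_once");
        return props;
    }
}
```

With the configuration from the question, `formatSupportsEos("0.10.2")` is false, which matches the UnsupportedForMessageFormatException in the log: the client-side setting alone is not enough.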
Re: KIP-226 - Dynamic Broker Configuration
Hi Rajini,

1. Oh, so truststores can't be updated dynamically? Is it planned for any future release?

2. By dynamically updated, do you mean that if the broker was using keystore A, we can now point it to a different keystore B?

Thanks.

On Wed, Apr 18, 2018 at 10:51 PM, Darshan wrote:

> Hi
>
> KIP-226 is released in 1.1. I had a question about it.
>
> If we add a new certificate (programmatically) to the truststore that the Kafka broker is using, do we need to issue any CLI or other command for the Kafka broker to read the new certificate, or with KIP-226 does everything happen automatically?
>
> Thanks.
Re: Is KTable cleaned up automatically in a Kafka streams application?
> From your answer I understand that whenever a product is deleted, the Kafka Streams application needs to consume a message about the deletion, and the entry for that product needs to be removed from the aggregated KTable using a tombstone.

Exactly.

> If I don't do that, the entry will never be deleted and will stay in the KTable. Is this correct?

That's correct. Note that for windowed KTables this would not be required, because a retention time applies there. But it seems you are using non-windowed KTables, and thus you need to "clean up" manually.

-Matthias

On 4/19/18 3:37 PM, Mihaela Stoycheva wrote:
> I will try to clarify what I mean by "old state that is not longer needed". Let's say I consume messages about products that have been sold to customers, and I keep a KTable with aggregated data: for each product id, the number of times the product has been bought. At some point a product that has been bought many times is no longer available and never will be again; let's say it's deleted. Then the aggregated data about it is old and no longer needed.
>
> From your answer I understand that whenever a product is deleted, the Kafka Streams application needs to consume a message about the deletion, and the entry for that product needs to be removed from the aggregated KTable using a tombstone. If I don't do that, the entry will never be deleted and will stay in the KTable. Is this correct?
>
> Thanks,
> Mihaela Stoycheva
>
> On Thu, Apr 19, 2018 at 3:12 PM, Matthias J. Sax wrote:
>
>> Not sure what you mean by "old state that is not longer needed"?
>>
>> Key-value entries are kept forever, and there is no TTL. If you want to delete something from the store, you can return `null` as aggregation result though.
>>
>> -Matthias
>>
>> On 4/19/18 2:28 PM, adrien ruffie wrote:
>>> Hi Mihaela,
>>>
>>> by default a KTable already has a log compacted behavior.
>>>
>>> therefore you don't need to manually clean up.
>>>
>>> Best regards,
>>>
>>> Adrien
>>>
>>> From: Mihaela Stoycheva
>>> Sent: Thursday, April 19, 2018 13:41:22
>>> To: users@kafka.apache.org
>>> Subject: Is KTable cleaned up automatically in a Kafka streams application?
>>>
>>> Hello,
>>>
>>> I have a Kafka Streams application that is consuming from two topics and internally aggregating, transforming and joining data. I am using KTable as result of aggregation and my question is if KTables are cleaned using some mechanism of Kafka Streams or is this something that I have to do manually - clean up old state that is not longer needed?
>>>
>>> Regards,
>>> Mihaela Stoycheva
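The manual clean-up described in this reply can be modelled in a few lines. This is a plain-Java sketch of the semantics only, not the Kafka Streams API: `ProductCounts`, the `"sold"` event, and the `DELETED` marker are hypothetical names chosen to match the product example, and a `HashMap` stands in for the non-windowed KTable. The point it illustrates is the one from the thread: the aggregator returns `null` for a deletion event, and that `null` acts as the tombstone that removes the entry.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of a non-windowed aggregation table; not the Kafka Streams API. */
final class ProductCounts {

    /** Hypothetical marker value for a "product deleted" event. */
    static final String DELETED = "deleted";

    private final Map<String, Long> table = new HashMap<>();

    /** Aggregator: returns the new count, or null to signal that the entry should be removed. */
    static Long aggregate(String event, Long current) {
        if (DELETED.equals(event)) {
            return null; // acts as the tombstone
        }
        return (current == null ? 0L : current) + 1L;
    }

    /** Applies one event for a product id; a null aggregation result deletes the entry. */
    void onEvent(String productId, String event) {
        Long updated = aggregate(event, table.get(productId));
        if (updated == null) {
            table.remove(productId);
        } else {
            table.put(productId, updated);
        }
    }

    /** Current count for a product, or null when the table has no entry for it. */
    Long countFor(String productId) {
        return table.get(productId);
    }

    /** Number of entries currently held in the table. */
    int size() {
        return table.size();
    }
}
```

Without the deletion event, the entry for a discontinued product would simply stay in the map, which mirrors the "kept forever, no TTL" behaviour described earlier in the thread.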
Re: KIP-226 - Dynamic Broker Configuration
Hi Darshan,

We currently allow only keystores to be dynamically updated, and you need to use kafka-configs.sh to update the keystore config. See https://kafka.apache.org/documentation/#dynamicbrokerconfigs.

On Thu, Apr 19, 2018 at 6:51 AM, Darshan wrote:

> Hi
>
> KIP-226 is released in 1.1. I had a question about it.
>
> If we add a new certificate (programmatically) to the truststore that the Kafka broker is using, do we need to issue any CLI or other command for the Kafka broker to read the new certificate, or with KIP-226 does everything happen automatically?
>
> Thanks.
Re: Is KTable cleaned up automatically in a Kafka streams application?
I will try to clarify what I mean by "old state that is not longer needed". Let's say I consume messages about products that have been sold to customers, and I keep a KTable with aggregated data: for each product id, the number of times the product has been bought. At some point a product that has been bought many times is no longer available and never will be again; let's say it's deleted. Then the aggregated data about it is old and no longer needed.

From your answer I understand that whenever a product is deleted, the Kafka Streams application needs to consume a message about the deletion, and the entry for that product needs to be removed from the aggregated KTable using a tombstone. If I don't do that, the entry will never be deleted and will stay in the KTable. Is this correct?

Thanks,
Mihaela Stoycheva

On Thu, Apr 19, 2018 at 3:12 PM, Matthias J. Sax wrote:

> Not sure what you mean by "old state that is not longer needed"?
>
> Key-value entries are kept forever, and there is no TTL. If you want to delete something from the store, you can return `null` as aggregation result though.
>
> -Matthias
>
> On 4/19/18 2:28 PM, adrien ruffie wrote:
> > Hi Mihaela,
> >
> > by default a KTable already has a log compacted behavior.
> >
> > therefore you don't need to manually clean up.
> >
> > Best regards,
> >
> > Adrien
> >
> > From: Mihaela Stoycheva
> > Sent: Thursday, April 19, 2018 13:41:22
> > To: users@kafka.apache.org
> > Subject: Is KTable cleaned up automatically in a Kafka streams application?
> >
> > Hello,
> >
> > I have a Kafka Streams application that is consuming from two topics and internally aggregating, transforming and joining data. I am using KTable as result of aggregation and my question is if KTables are cleaned using some mechanism of Kafka Streams or is this something that I have to do manually - clean up old state that is not longer needed?
> >
> > Regards,
> > Mihaela Stoycheva
Re: Consuming messages from topics based on keys
I think this can be done in two ways.

1. A KStream or KTable filter in a topology.
2. Store the data in a persistent store elsewhere (like Cassandra) and expose it via an API.

-- 
Rahul Singh
rahul.si...@anant.us
Anant Corporation

On Apr 19, 2018, 7:07 AM -0500, joe_delbri...@denso-diam.com, wrote:

> I am trying to determine how our consumer will send data to a REST API. The topics are machine based topics, meaning they store all the information about a specific machine in one topic. I then have keys that identify the type of information stored. Here are some examples:
>
> Topic: E347-8 Key: machine-fault
> Topic: E347-8 Key: good-part-count
>
> Currently in the REST endpoint I am running the key through an if/else and then deciding how to handle it. Is there a better way to handle this? Can I have my consumer listen for a topic / key pair? I was thinking I could lay out my consumers to be code specific like this example:
>
> Consumer 1:
> Topic: E347-8 Key: machine_fault
> Topic: 186_7 Key: machine_fault
>
> Consumer 2:
> Topic: E347-8 Key: good-part-count
> Topic: 186_7 Key: good-part-count
>
> Thanks,
>
> The information contained in and transmitted with this Email may be privileged, proprietary, confidential and protected from disclosure. No privilege is hereby intended to be waived. This Email is intended only for the person to whom it is addressed. If you are not the intended recipient/addressee, any use of the Email and/or its contents, including, but not limited to, dissemination, distribution or copying is strictly prohibited and may be unlawful, and you must not take any action in reliance on it. If you receive this Email in error, please immediately notify the sender and delete the original message and any copies of it from your computer system. We deny any liability for damages resulting from the use of this Email by the unintended recipient, including the recipient in error.
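A Kafka consumer subscribes to topics (and partitions), not to individual keys, so selecting by key has to happen after the record is received. In a Streams topology that is what `KStream#filter` does. For a plain consumer, the if/else on the key can be replaced by a lookup table of handlers. The following is a minimal plain-Java sketch of that idea; `KeyDispatcher` and its methods are hypothetical names, and records are reduced to topic, key, and value strings so the example stays self-contained.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

/** Hypothetical dispatcher: routes each consumed record to a handler chosen by its key. */
final class KeyDispatcher {

    // Maps a record key such as "machine-fault" to a handler that receives (topic, value).
    private final Map<String, BiConsumer<String, String>> handlers = new HashMap<>();

    /** Registers the handler to run for records carrying the given key. */
    void register(String key, BiConsumer<String, String> handler) {
        handlers.put(key, handler);
    }

    /** Runs the handler registered for the key; returns false when no handler matches. */
    boolean dispatch(String topic, String key, String value) {
        BiConsumer<String, String> handler = handlers.get(key);
        if (handler == null) {
            return false; // unknown key: the caller decides whether to skip or log it
        }
        handler.accept(topic, value);
        return true;
    }
}
```

A poll loop would call `dispatch(record.topic(), record.key(), record.value())` for each record. Adding a new kind of information then means registering one more handler rather than extending an if/else chain, and the same dispatcher works across all machine topics.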
Re: Is KTable cleaned up automatically in a Kafka streams application?
Not sure what you mean by "old state that is not longer needed"?

Key-value entries are kept forever, and there is no TTL. If you want to delete something from the store, you can return `null` as aggregation result though.

-Matthias

On 4/19/18 2:28 PM, adrien ruffie wrote:
> Hi Mihaela,
>
> by default a KTable already has a log compacted behavior.
>
> therefore you don't need to manually clean up.
>
> Best regards,
>
> Adrien
>
> From: Mihaela Stoycheva
> Sent: Thursday, April 19, 2018 13:41:22
> To: users@kafka.apache.org
> Subject: Is KTable cleaned up automatically in a Kafka streams application?
>
> Hello,
>
> I have a Kafka Streams application that is consuming from two topics and internally aggregating, transforming and joining data. I am using KTable as result of aggregation and my question is if KTables are cleaned using some mechanism of Kafka Streams or is this something that I have to do manually - clean up old state that is not longer needed?
>
> Regards,
> Mihaela Stoycheva
Consuming messages from topics based on keys
I am trying to determine how our consumer will send data to a REST API. The topics are machine based topics, meaning they store all the information about a specific machine in one topic. I then have keys that identify the type of information stored. Here are some examples:

Topic: E347-8 Key: machine-fault
Topic: E347-8 Key: good-part-count

Currently in the REST endpoint I am running the key through an if/else and then deciding how to handle it. Is there a better way to handle this? Can I have my consumer listen for a topic / key pair? I was thinking I could lay out my consumers to be code specific like this example:

Consumer 1:
Topic: E347-8 Key: machine_fault
Topic: 186_7 Key: machine_fault

Consumer 2:
Topic: E347-8 Key: good-part-count
Topic: 186_7 Key: good-part-count

Thanks,
RE: Is KTable cleaned up automatically in a Kafka streams application?
Hi Mihaela,

by default a KTable already has a log compacted behavior.

therefore you don't need to manually clean up.

Best regards,

Adrien

From: Mihaela Stoycheva
Sent: Thursday, April 19, 2018 13:41:22
To: users@kafka.apache.org
Subject: Is KTable cleaned up automatically in a Kafka streams application?

Hello,

I have a Kafka Streams application that is consuming from two topics and internally aggregating, transforming and joining data. I am using KTable as result of aggregation and my question is if KTables are cleaned using some mechanism of Kafka Streams or is this something that I have to do manually - clean up old state that is not longer needed?

Regards,
Mihaela Stoycheva
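For context on the log-compaction point: compaction keeps the latest value for each key and only drops a key once a record with a null value (a tombstone) has been written for it, which is why the other replies in this thread still call for manual clean-up. A simplified plain-Java model follows. `CompactionModel` is a hypothetical name, records are `{key, value}` string pairs, and real broker-side details such as how long tombstones are retained are deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of log compaction; not broker code. */
final class CompactionModel {

    /**
     * Replays a log of {key, value} records and returns what survives compaction:
     * the latest value per key, with keys removed when their latest record is a tombstone.
     */
    static Map<String, String> compact(List<String[]> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (String[] record : log) {
            String key = record[0];
            String value = record[1];
            if (value == null) {
                latest.remove(key); // tombstone: the key disappears
            } else {
                latest.put(key, value); // a newer value replaces the older one
            }
        }
        return latest;
    }
}
```

In this model a key that only ever receives updates is never removed, however old its last update is; compaction bounds the number of records per key, not the number of keys.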
Is KTable cleaned up automatically in a Kafka streams application?
Hello,

I have a Kafka Streams application that is consuming from two topics and internally aggregating, transforming and joining data. I am using KTable as result of aggregation and my question is if KTables are cleaned using some mechanism of Kafka Streams or is this something that I have to do manually - clean up old state that is not longer needed?

Regards,
Mihaela Stoycheva