Re: records with key as string and value as java ArrayList in topic

2020-05-11 Thread Pushkar Deole
And by the way, Confluent has provided KafkaAvroSerializer/Deserializer. Can't they be used to do the conversion for Java types? On Tue, May 12, 2020 at 10:09 AM Pushkar Deole wrote: > Ok... so jackson json serialization is the way to go for hashmaps as well? > > On Mon, May 11, 2020 at 7:57 PM John
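
A minimal sketch of what the Jackson suggestion in this thread could look like: a Serde for HashMap<String, Object> built from Serdes.serdeFrom(). This is not an official serde; the class name and the bare-bones error handling are illustrative only.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import java.util.HashMap;

// Sketch of a Jackson-based Serde for HashMap<String, Object>.
// configure()/close() and real error handling are omitted for brevity.
public class HashMapSerde {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static Serde<HashMap<String, Object>> instance() {
        Serializer<HashMap<String, Object>> serializer = (topic, map) -> {
            try {
                return map == null ? null : MAPPER.writeValueAsBytes(map);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        Deserializer<HashMap<String, Object>> deserializer = (topic, bytes) -> {
            try {
                return bytes == null ? null
                        : MAPPER.readValue(bytes, new TypeReference<HashMap<String, Object>>() {});
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        return Serdes.serdeFrom(serializer, deserializer);
    }
}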

Re: records with key as string and value as java ArrayList in topic

2020-05-11 Thread Pushkar Deole
Ok... so jackson json serialization is the way to go for hashmaps as well? On Mon, May 11, 2020 at 7:57 PM John Roesler wrote: > Oh, my mistake. I thought this was a different thread :) > > You might want to check, but I don’t think there is a kip for a map serde. > Of course, you’re welcome to

Merging multiple streams into one

2020-05-11 Thread Alessandro Tagliapietra
Hello everyone, we currently use 3 streams (metrics, events, states) and I need to implement a keepalive mechanism so that if the machine doesn't send any data (from a specific list of variables) it'll emit a value that changes the machine state. For example, in machine 1 the list of keepalive
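
This doesn't cover the keepalive logic itself, but for combining the three inputs into one stream, KStream#merge is the usual building block. A minimal sketch; the topic names and String types are placeholders for the project's own serdes and types.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class MergeExample {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> metrics = builder.stream("metrics");
        KStream<String, String> events  = builder.stream("events");
        KStream<String, String> states  = builder.stream("states");

        // merge() interleaves the three streams into a single KStream
        metrics.merge(events).merge(states).to("merged");
        return builder.build();
    }
}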

Offset Management...

2020-05-11 Thread Rajib Deb
Hi, I wanted to know if it is a good practice to develop a custom offset management method while consuming from Kafka. I am thinking of developing it as below. 1. Create a PartitionInfo named tuple as below: PartitionInfo("PartitionInfo", ["header","custom writer","offset"]) 2. Then populate
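
For comparison, the usual shape of external offset management on the Java consumer: store the offset together with the processed result, and seek to it on assignment instead of relying on committed offsets. A sketch only; the in-memory map stands in for whatever store the design above uses, and the topic name is made up.

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ExternalOffsetConsumer {
    // Placeholder for the user's own offset store (file, database, etc.)
    private final Map<TopicPartition, Long> offsetStore = new HashMap<>();

    public void run(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // persist the last processed offsets before losing the partitions
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // resume from the externally stored position instead of the committed one
                for (TopicPartition tp : partitions) {
                    Long offset = offsetStore.get(tp);
                    if (offset != null) {
                        consumer.seek(tp, offset + 1);
                    }
                }
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                // process, then record the offset together with the write
                offsetStore.put(new TopicPartition(record.topic(), record.partition()), record.offset());
            }
        }
    }
}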

Kafka metrics

2020-05-11 Thread Eleanore Jin
Hi community, I just wonder what the difference is between the consumer lag reported by the Kafka client and the consumer lag reported by Burrow? Thanks a lot! Eleanore

Re: how can I manually do a compaction?

2020-05-11 Thread Liam Clarke-Hutchinson
Hi Nicolae, Nope, the log cleaner is a thread that runs in the background periodically. Also be aware that it only compacts log segments that are no longer being written to - and the default size for a log segment and default log roll time are both very high, so you won't be seeing log compaction
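
If the goal is just to watch compaction happen on a laptop, one option is to shrink those defaults on the topic itself so segments roll and become eligible quickly. A sketch using the Java AdminClient; it assumes brokers that support incrementalAlterConfigs (2.3+), and the topic name and values are illustrative.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

public class CompactionTuning {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-compacted-topic");
            // Small segments roll quickly, and a low dirty ratio makes the cleaner pick them up sooner
            admin.incrementalAlterConfigs(Collections.singletonMap(topic, Arrays.asList(
                    new AlterConfigOp(new ConfigEntry("segment.ms", "60000"), AlterConfigOp.OpType.SET),
                    new AlterConfigOp(new ConfigEntry("min.cleanable.dirty.ratio", "0.01"), AlterConfigOp.OpType.SET)
            ))).all().get();
        }
    }
}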

Re: Kafka Streams, WindowBy, Grace Period, Late events, Suppress operation

2020-05-11 Thread Baki Hayat
Hello Scott, Thank you for your response. Actually I was not aware that I should provide a window deserializer... my code did not ask me to add one. In which part should I provide the deserializer? I am just providing the window size in my code. BR On 11 May 2020 Mon at 18:48 Scott

Re: Kafka Streams, WindowBy, Grace Period, Late events, Suppress operation

2020-05-11 Thread Baki Hayat
Hello John, Thank you for your response. I am using a custom time extractor; in the final stage I am persisting the streamed data into a timeseries database, and when I double-checked from there, I confirmed that the time calculation seems correct. What about the warning message that I mentioned? How can be

Kafka upgrade from 0.10 to 2.3.x

2020-05-11 Thread Praveen
Hi folks, I'd like to take downtime to upgrade from 0.10.2.1 to 2.3.x. But the upgrade doc for 2.3.x doesn't say whether I can take downtime to do this; the instructions are for rolling upgrade only. Has anyone tried this? Praveen

how can I manually do a compaction?

2020-05-11 Thread Dumitru-Nicolae Marasoui
Hi, how can I manually do a compaction? (in a local laptop setup) Is there a command line for this, or certain broker settings to look for? Thank you, -- Thank you, Nicolae Marasoiu Scala Engineer Orion, OVO Group

Re: Kafka Streams, WindowBy, Grace Period, Late events, Suppress operation

2020-05-11 Thread Scott Reynolds
Baki, You can get this message "o.a.k.s.state.internals.WindowKeySchema : Warning: window end time was truncated to Long.MAX" when your TimeWindowedDeserializer is created without a windowSize. There are two constructors for a TimeWindowedDeserializer; are you using the one with windowSize?
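
For reference, a minimal sketch of the constructor Scott means, assuming String keys and a 5-minute window; supplying the window size is what stops the window end time from defaulting to Long.MAX_VALUE.

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.Windowed;

import java.time.Duration;

public class WindowedKeyDeserializer {
    // Passing the window size keeps the deserializer from falling back to Long.MAX_VALUE
    // as the window end time, which is what produces the warning in the logs.
    public static Deserializer<Windowed<String>> create() {
        long windowSizeMs = Duration.ofMinutes(5).toMillis();
        return new TimeWindowedDeserializer<>(Serdes.String().deserializer(), windowSizeMs);
    }
}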

Re: Kafka Streams, WindowBy, Grace Period, Late events, Suppress operation

2020-05-11 Thread John Roesler
Hello Baki, It looks like option 2 is really what you want. The purpose of the time window stores is to allow deleting old data when you need to group by a time dimension, which naturally results in an infinite key space. If you don’t want to wait for the final result, can you just not add
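
For context, a minimal sketch of the windowed-count-with-suppression pattern this thread is about. Topic names, window size and grace period are made up, and the windowed key is flattened to a String purely to keep the example short.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

import java.time.Duration;

public class SuppressExample {
    public static void build(StreamsBuilder builder) {
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.Long()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
               // 5-minute windows that accept events arriving up to 1 minute late
               .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
               .count()
               // emit each window only once, after its grace period has passed
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               // flatten the windowed key to a plain String for the output topic
               .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
               .to("output", Produced.with(Serdes.String(), Serdes.Long()));
    }
}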

Re: How the JDBC sink connector actually runs

2020-05-11 Thread Jun Wang
Hi Lei, From my understanding of the GitHub page you provided, the kafka-connect-jdbc project is licensed under the Confluent Community License. The project has different support channels. "For more information, check

Re: records with key as string and value as java ArrayList in topic

2020-05-11 Thread John Roesler
Oh, my mistake. I thought this was a different thread :) You might want to check, but I don’t think there is a kip for a map serde. Of course, you’re welcome to start one. Thanks, John On Mon, May 11, 2020, at 09:14, John Roesler wrote: > Hi Pushkar, > > I don’t think there is. You’re

Re: records with key as string and value as java ArrayList in topic

2020-05-11 Thread John Roesler
Hi Pushkar, I don’t think there is. You’re welcome to start one if you think it would be a useful addition. Before worrying about it further, though, you might want to check the InMemoryKeyValueStore implementation, since my answer was from memory. Thanks, John On Mon, May 11, 2020, at

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-11 Thread John Roesler
Hi Pushkar, I’m glad you’ve been able to work through the issues. The GlobalKTable does store the data in memory (or on disk, depending how you configure it). I think the in-memory version uses a TreeMap, which is logarithmic time access. I think you’ll find it sufficiently fast regardless.
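
A minimal sketch of materializing a GlobalKTable with an explicit in-memory store, as discussed above; topic name, store name and String serdes are illustrative. Omitting the in-memory supplier (or using a persistent one) keeps the data on disk instead.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class AdminDataTable {
    public static GlobalKTable<String, String> build(StreamsBuilder builder) {
        // Stores.inMemoryKeyValueStore(...) keeps the table in memory
        return builder.globalTable(
                "admin-data",
                Consumed.with(Serdes.String(), Serdes.String()),
                Materialized.<String, String>as(Stores.inMemoryKeyValueStore("admin-data-store"))
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.String()));
    }
}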

Re: How to handle RebalanceInProgressException?

2020-05-11 Thread Benoit Delbosc
On 29.04.20 09:18, Benoit Delbosc wrote: >> 2) So in Kafka 2.0 we introduced poll(Duration) which would practically be >> more strict in respecting the passed in timeout. It means, it could return >> while we are still in the middle of a rebalance. At the same time we >> deprecated the old

How can I create a KSQL-DB materialized view that captures the oldest values for each key?

2020-05-11 Thread Dumitru-Nicolae Marasoui
Hello kafka community, The users_table below does not have any records (its topic has no messages starting at offset 0), although the source topic input_topic has messages: CREATE STREAM USERS_STREAM( accountid STRING, migrationDate BIGINT, metadata STRING) WITH (KAFKA_TOPIC='input_topic',

Re: Kafka long running job consumer config best practices and what to do to avoid stuck consumer

2020-05-11 Thread Jason Turim
I am not sure off the top, but since the method is on the consumer my intuition is that it would pause all the partitions the consumer is reading from. I think the best thing to do is write a little test harness app to verify the behavior. On Mon, May 11, 2020 at 7:31 AM Ali Nazemian wrote: >
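
In line with the suggestion to verify it yourself, a small sketch of the pause/resume pattern for long-running work: pause() takes an explicit partition collection, so consumer.assignment() covers everything the consumer currently owns. Commits, shutdown and error handling are omitted.

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PauseResumeLoop {
    // pause(assignment()) stops fetching from every partition the consumer owns,
    // while calling poll() in the meantime keeps the consumer active in the group.
    public static void run(KafkaConsumer<String, String> consumer) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Future<?> job = null;

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));

            if (!records.isEmpty()) {
                consumer.pause(consumer.assignment());
                job = worker.submit(() -> handle(records)); // long-running work off the poll thread
            }
            if (job != null && job.isDone()) {
                consumer.resume(consumer.assignment());
                job = null;
            }
        }
    }

    private static void handle(ConsumerRecords<String, String> records) {
        // application-specific processing
    }
}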

How to config multiple tables in kafka jdbc sink connector?

2020-05-11 Thread wangl...@geekplus.com.cn
As described here: https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/sink_config_options.html connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 topics=orders connection.url=jdbc:xxx connection.user=xxx connection.password=xxx insert.mode=upsert
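
For the multiple-tables part of the question: the sink routes each record by its topic, and table.name.format (default ${topic}) maps the topic name to a table name, so listing several topics is usually enough. A hedged sketch of such a config; topic names, URL, credentials and the primary-key settings are placeholders that depend on the actual key/value schemas.

name=jdbc-sink-multi
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# one connector instance can consume several topics
topics=orders,customers,shipments
connection.url=jdbc:mysql://localhost:3306/mydb
connection.user=xxx
connection.password=xxx
insert.mode=upsert
# primary key handling depends on the record key/value schema
pk.mode=record_key
pk.fields=id
# ${topic} is resolved per record, so each topic lands in its own table
table.name.format=${topic}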

Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-11 Thread Pushkar Deole
John, I think I can get the cache structure modified to make use of GlobalKTable here so the data can be shared across. I could get information that the admin data will be uploaded well in advance before the main events, so the issue with 'missed joins' won't exist since by the time the main events start

Re: Are there any plans for MM2 to expose functionality to mutate records?

2020-05-11 Thread Liam Clarke-Hutchinson
Oh... I just realised it's based on KC, so can just use SMTs. Sweet! Please disregard my last email. Kind regards, Liam Clarke-Hutchinson On Mon, May 11, 2020 at 10:57 PM Liam Clarke-Hutchinson < liam.cla...@adscale.co.nz> wrote: > As per the subject, I've been delving into MM2 and don't see a

Re: Kafka long running job consumer config best practices and what to do to avoid stuck consumer

2020-05-11 Thread Ali Nazemian
Hi Jason, Thank you for the message. It seems quite interesting. Something I am not sure about with "pause" and "resume" is that they work based on partition allocation. What will happen if more partitions are assigned to a single consumer? For example, in the case where we have over-partitioned a Kafka

compacting to keep oldest values?

2020-05-11 Thread Dumitru-Nicolae Marasoui
Hello kafka community, As context, I have to implement a query like select k, min(d) from topic group by k. I am wondering if there is a possibility for compaction to keep the oldest values, not the newest ones. If this is possible I would create a new topic with this custom policy to retain
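
Compaction itself always keeps the newest value per key, so "keep the oldest" has to be computed rather than configured. One option is a small Kafka Streams job that keeps the minimum per key and writes it to a (compacted) output topic; a sketch with made-up topic names, assuming String keys and Long values.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Produced;

public class MinPerKey {
    // Roughly "select k, min(d) from topic group by k":
    // reduce() keeps whichever of the stored and incoming values is smaller.
    public static void build(StreamsBuilder builder) {
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.Long()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
               .reduce((stored, incoming) -> Math.min(stored, incoming))
               .toStream()
               .to("min-output", Produced.with(Serdes.String(), Serdes.Long()));
    }
}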

Are there any plans for MM2 to expose functionality to mutate records?

2020-05-11 Thread Liam Clarke-Hutchinson
As per the subject, I've been delving into MM2 and don't see a similar piece of functionality like the ability in MM1 to specify a message handler. Looks like it'd be reasonably straightforward to incorporate into the MirrorSourceTask, but I'm wondering if it's been discussed and discarded

How the JDBC sink connector actually runs

2020-05-11 Thread wangl...@geekplus.com.cn
doc: https://docs.confluent.io/3.1.1/connect/connect-jdbc/docs/sink_connector.html github code: https://github.com/confluentinc/kafka-connect-jdbc I glanced over the code. It seems the actual worker is JdbcSinkTask. After put(Collection records) it will generate SQL and execute it. I try

Re: Re: Write to database directly by referencing schema registry, no jdbc sink connector

2020-05-11 Thread wangl...@geekplus.com.cn
Hi Robin, It seems I didn't make it clear. Actually we still use the JDBC sink connector, but we want to use the JDBC sink function in our own distributed platform instead of Kafka Connect. I want to consolidate the code here: https://github.com/confluentinc/kafka-connect-jdbc/ Receive kafka
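
A rough, unverified sketch of that idea: drive JdbcSinkTask directly by starting it with the sink config and feeding it SinkRecords built from whatever the platform consumes. Config values, topic, schema and offset below are all made up, and the value must be a Connect Struct for the sink to map fields to columns.

import io.confluent.connect.jdbc.sink.JdbcSinkTask;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class EmbeddedJdbcSink {
    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("connection.url", "jdbc:mysql://localhost:3306/mydb");
        config.put("connection.user", "xxx");
        config.put("connection.password", "xxx");
        config.put("insert.mode", "insert");
        config.put("auto.create", "true");

        JdbcSinkTask task = new JdbcSinkTask();
        task.start(config);

        // In the real platform these records would be built from consumed Kafka messages
        Schema valueSchema = SchemaBuilder.struct().name("order")
                .field("id", Schema.INT64_SCHEMA)
                .field("status", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(valueSchema).put("id", 1L).put("status", "NEW");
        SinkRecord record = new SinkRecord("orders", 0, null, null, valueSchema, value, 42L);

        task.put(Collections.singletonList(record));
        task.stop();
    }
}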

Re: JDBC SINK SCHEMA

2020-05-11 Thread Robin Moffatt
Schema Registry is available as part of Confluent Platform download ( https://www.confluent.io/download/), and install per https://docs.confluent.io/current/schema-registry/installation/index.html The difference is that you just run the Schema Registry part of the stack, and leave the other

Re: JDBC SINK SCHEMA

2020-05-11 Thread vishnu murali
Hi Robin, Is it possible to integrate Apache Kafka with that Confluent Schema Registry like you said? I don't know how to do it; can you give any reference? On Mon, May 11, 2020, 14:09 Robin Moffatt wrote: > You can use Apache Kafka as you are currently using, and just deploy Schema >

Re: records with key as string and value as java ArrayList in topic

2020-05-11 Thread Pushkar Deole
John, is there a KIP in progress for supporting Java HashMap also? On Sun, May 10, 2020, 00:47 John Roesler wrote: > Yes, that’s correct. It’s only for serializing the java type ‘byte[]’. > > On Thu, May 7, 2020, at 10:37, Pushkar Deole wrote: > > Thanks John... I got to finish the work in few

Re: Write to database directly by referencing schema registry, no jdbc sink connector

2020-05-11 Thread Robin Moffatt
> write to target database. I want to use self-written Java code instead of the Kafka JDBC sink connector. Out of interest, why do you want to do this? Why not use the JDBC sink connector (or a fork of it if you need to amend its functionality)? -- Robin Moffatt | Senior Developer Advocate |

Re: JDBC SINK SCHEMA

2020-05-11 Thread Robin Moffatt
You can use Apache Kafka as you are currently using, and just deploy Schema Registry alongside it. -- Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff On Sat, 9 May 2020 at 02:16, Chris Toomey wrote: > You have to either 1) use one of the Confluent serializers > < >

Re: JDBC Sink Connector

2020-05-11 Thread Robin Moffatt
Schema Registry and its serde libraries are part of Confluent Platform, licensed under Confluent Community Licence ( https://www.confluent.io/confluent-community-license-faq/) -- Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff On Fri, 8 May 2020 at 13:39, vishnu

Re: Re: Write to database directly by referencing schema registry, no jdbc sink connector

2020-05-11 Thread wangl...@geekplus.com.cn
Hi Liam, I have consumed the Avro records using this Java code: for (final ConsumerRecord record : records) { final String key = record.key(); final GenericRecord value = record.value(); System.out.println(record.value().getSchema()); System.out.printf("key = %s, value = %s%n",
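
For completeness, a sketch of the consumer setup that code implies, using KafkaAvroDeserializer so values arrive as GenericRecord; bootstrap servers, group id, registry URL and topic name are placeholders.

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class AvroConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-reader");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put("schema.registry.url", "http://localhost:8081");

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-avro-topic"));
            while (true) {
                ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, GenericRecord> record : records) {
                    // the deserializer looks the writer schema up in Schema Registry
                    System.out.printf("key = %s, value = %s, schema = %s%n",
                            record.key(), record.value(), record.value().getSchema());
                }
            }
        }
    }
}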