Re: Kafka - FindCoordinator error

2020-05-07 Thread Liam Clarke-Hutchinson
Hi Rajib,

We can't see the args you're passing to the consumer, and the error message
indicates the consumer can't find the cluster.

Thanks,

Liam Clarke-Hutchinson

On Fri, 8 May 2020, 3:04 pm Rajib Deb,  wrote:

> I wanted to check if anyone has faced this issue
>
> Thanks
> Rajib
>
> From: Rajib Deb
> Sent: Sunday, May 3, 2020 9:51 AM
> To: users@kafka.apache.org
> Subject: Kafka - FindCoordinator error
>
> Hi
> I have written a Python consumer using the confluent-kafka package. After a
> few hours of running, the consumer dies with the below error
>
> cimpl.KafkaException:
> KafkaError{code=_TIMED_OUT,val=-185,str="FindCoordinator response
> error: Local: Timed out"}
>
> Can anyone please help me understand why this is happening
> **
> Below is a portion of the code
> **
> producer_conf = {
>     'bootstrap.servers': 'xxx',
>     'security.protocol': 'SASL_SSL',
>     'sasl.mechanisms': 'PLAIN',
>     'sasl.username': 'x',
>     'sasl.password': '',
>     'ssl.ca.location': '',
>     'ssl.certificate.location': '',
>     'queue.buffering.max.messages': 10,
>     'queue.buffering.max.ms': 1000,
>     'batch.num.messages': 500
> }
>
> p = Producer(**producer_conf)
> target_topic='xx'
>
> c = Consumer(kwargs)
> source_topic=''
> c.subscribe([source_topic])
> while True:
>
>     msg = c.poll(100)  # I am consuming from a topic
>
>     if msg is None:
>         continue
>     if msg.error():
>         logging.error("error occurred during polling topic")
>         logging.error(msg.error())
>         raise KafkaException(msg.error())
>         continue
>
>     # logging.info("input msg form topic: ")
>     # logging.info(msg.value())
>     # msgDict = json.loads(msg.value())  # taking msg into dictionary
>     try:
>         p.produce(target_topic, msg.value(), callback=delivery_callback)
>         # the message from the consumed topic is pushed to the target topic
>         c.commit()  # disabled auto commit, manually committing only when
>                     # message pushed to the target topic
>     except BufferError:
>         sys.stderr.write('%% Local producer queue is full (%d messages '
>                          'awaiting delivery): try again\n' % len(p))
>     except Exception as e:
>         print(e)
>
>     p.poll(0)
>     # sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
>     p.flush()
>
> Thanks
> Rajib
>


Re: Is committing offset required for Consumer

2020-05-07 Thread Boyuan Zhang
Thanks! That helps a lot.

On Thu, May 7, 2020 at 8:10 PM Chris Toomey  wrote:

> Right -- offset storage is an optional feature with Kafka, you can always
> choose to not use it and instead manage offsets yourself.
>
>
> On Thu, May 7, 2020 at 8:07 PM Boyuan Zhang  wrote:
>
> > Thanks for the pointer! Does that mean I don't need to commit the offset
> > when managing partitions and offsets manually?
> >
> > On Thu, May 7, 2020 at 8:02 PM Chris Toomey  wrote:
> >
> > > If you choose to manually assign topic partitions, then you won't be
> > using
> > > the group protocol to dynamically manage partition assignments and thus
> > > don't have a need to poll or heartbeat at any interval. See "Manual
> > > Partition Assignment" in
> > >
> > >
> >
> https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > >  .
> > >
> > > On Thu, May 7, 2020 at 3:52 PM Boyuan Zhang 
> wrote:
> > >
> > > > Hi team,
> > > >
> > > > I'm building an application which uses Kafka Consumer APIs to read
> > > messages
> > > > from topics. I plan to manually assign TopicPartitions to my consumer
> > and
> > > > seek a certain offset before starting to read. I'll also materialize
> > the
> > > > last read offset and reuse it when creating the consumer later.
> > > >
> > > > Within my usage, I'm curious whether I need to commit offsets
> > automatically
> > > > or manually. While going through the doc, it seems like committing
> > offset
> > > > is only important to dynamic assignment.
> > > >
> > > > Another question around manual assignment is, is it still true that I
> > > need
> > > > to call poll() continuously to keep the consumer in the group
> described
> > > as
> > > > below?
> > > >
> > > > > It is also possible that the consumer could encounter a "livelock"
> > > > > situation where it is continuing to send heartbeats, but no
> progress
> > is
> > > > > being made. To prevent the consumer from holding onto its
> partitions
> > > > > indefinitely in this case, we provide a liveness detection
> mechanism
> > > > using
> > > > > the max.poll.interval.ms setting. Basically if you don't call poll
> > at
> > > > > least as frequently as the configured max interval, then the client
> > > will
> > > > > proactively leave the group so that another consumer can take over
> > its
> > > > > partitions. When this happens, you may see an offset commit failure
> > (as
> > > > > indicated by a CommitFailedException
> > > > > <
> > > >
> > >
> >
> https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/CommitFailedException.html
> > > >
> > > > thrown
> > > > > from a call to commitSync()
> > > > > <
> > > >
> > >
> >
> https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync--
> > > > >).
> > > > > This is a safety mechanism which guarantees that only active
> members
> > of
> > > > the
> > > > > group are able to commit offsets. So to stay in the group, you must
> > > > > continue to call poll.
> > > >
> > > > What will happen to poll() with manual assignment if the consumer is
> > > > removed from the group?
> > > >
> > > > Thanks for your help!
> > > >
> > > > Boyuan
> > > >
> > >
> >
>


Re: Is committing offset required for Consumer

2020-05-07 Thread Chris Toomey
Right -- offset storage is an optional feature with Kafka, you can always
choose to not use it and instead manage offsets yourself.


On Thu, May 7, 2020 at 8:07 PM Boyuan Zhang  wrote:

> Thanks for the pointer! Does that mean I don't need to commit the offset
> when managing partitions and offsets manually?
>
> On Thu, May 7, 2020 at 8:02 PM Chris Toomey  wrote:
>
> > If you choose to manually assign topic partitions, then you won't be
> using
> > the group protocol to dynamically manage partition assignments and thus
> > don't have a need to poll or heartbeat at any interval. See "Manual
> > Partition Assignment" in
> >
> >
> https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> >  .
> >
> > On Thu, May 7, 2020 at 3:52 PM Boyuan Zhang  wrote:
> >
> > > Hi team,
> > >
> > > I'm building an application which uses Kafka Consumer APIs to read
> > messages
> > > from topics. I plan to manually assign TopicPartitions to my consumer
> and
> > > seek a certain offset before starting to read. I'll also materialize
> the
> > > last read offset and reuse it when creating the consumer later.
> > >
> > > Within my usage, I'm curious whether I need to commit offsets
> automatically
> > > or manually. While going through the doc, it seems like committing
> offset
> > > is only important to dynamic assignment.
> > >
> > > Another question around manual assignment is, is it still true that I
> > need
> > > to call poll() continuously to keep the consumer in the group described
> > as
> > > below?
> > >
> > > > It is also possible that the consumer could encounter a "livelock"
> > > > situation where it is continuing to send heartbeats, but no progress
> is
> > > > being made. To prevent the consumer from holding onto its partitions
> > > > indefinitely in this case, we provide a liveness detection mechanism
> > > using
> > > > the max.poll.interval.ms setting. Basically if you don't call poll
> at
> > > > least as frequently as the configured max interval, then the client
> > will
> > > > proactively leave the group so that another consumer can take over
> its
> > > > partitions. When this happens, you may see an offset commit failure
> (as
> > > > indicated by a CommitFailedException
> > > > <
> > >
> >
> https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/CommitFailedException.html
> > >
> > > thrown
> > > > from a call to commitSync()
> > > > <
> > >
> >
> https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync--
> > > >).
> > > > This is a safety mechanism which guarantees that only active members
> of
> > > the
> > > > group are able to commit offsets. So to stay in the group, you must
> > > > continue to call poll.
> > >
> > > What will happen to poll() with manual assignment if the consumer is
> > > removed from the group?
> > >
> > > Thanks for your help!
> > >
> > > Boyuan
> > >
> >
>


Re: Kafka long running job consumer config best practices and what to do to avoid stuck consumer

2020-05-07 Thread Chris Toomey
You really have to decide what behavior it is you want when one of your
consumers gets "stuck". If you don't like the way the group protocol
dynamically manages topic partition assignments or can't figure out an
appropriate set of configuration settings that achieve your goal, you can
always elect to not use the group protocol and instead manage topic
partition assignment yourself. As I just replied to another post, there's a
nice writeup of this under  "Manual Partition Assignment" in
https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
 .

Chris
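
For illustration, one common way to keep a long-running job inside the group
protocol is to pause the assigned partitions and keep calling poll() while the
work runs on another thread, then resume and commit. A rough sketch (topic
name, group id and process() are placeholders; error and rebalance handling
omitted):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LongRunningWorker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "long-running-jobs");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) {
                    continue;
                }
                // Hand the batch to another thread, then pause the partitions so that
                // poll() can keep being called without fetching more records.
                Future<?> job = executor.submit(() -> process(records));
                Set<TopicPartition> assigned = consumer.assignment();
                consumer.pause(assigned);
                while (!job.isDone()) {
                    consumer.poll(Duration.ofMillis(500)); // keeps the consumer alive in the group
                }
                consumer.resume(assigned);
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecords<String, String> records) {
        // the long-running work goes here
    }
}

While the partitions are paused, poll() returns nothing but still counts as a
poll, so max.poll.interval.ms is not exceeded however long the job takes.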


On Thu, May 7, 2020 at 12:37 AM Ali Nazemian  wrote:

> To help with understanding my case in more detail, the error I can see
> constantly is the consumer losing its heartbeat, and hence apparently the
> group gets rebalanced, based on the log I can see from the Kafka side:
>
> [GroupCoordinator 11]: Member
> consumer-3-f46e14b4-5998-4083-b7ec-bed4e3f374eb in group foo has failed,
> removing it from the group
>
> Thanks,
> Ali
>
> On Thu, May 7, 2020 at 2:38 PM Ali Nazemian  wrote:
>
> > Hi,
> >
> > With the emergence of Apache Kafka for event-driven architecture, one
> > thing that has become important is how to tune the Apache Kafka consumer to
> > manage long-running jobs. The main issue arises when we set a relatively
> > large value for "max.poll.interval.ms". Setting this value will, of
> > course, resolve the issue of repetitive rebalance, but creates another
> > operational issue. I am looking for some sort of golden strategy to deal
> > with long-running jobs with Apache Kafka.
> >
> > If the consumer hangs for whatever reason, there is no easy way of getting
> > past that stage. It can easily block the pipeline, and you cannot do much
> > about it. Therefore, it came to my mind that I am probably missing something
> > here. What are the expectations? Is it not valid to use Apache Kafka for
> > long-lived jobs? Are there any other parameters that need to be set, and is
> > the issue of a consumer being stuck caused by misconfiguration?
> >
> > I can see that a lot of the same issues have been raised regarding
> > "the consumer is stuck" and usually, the answer has been "yeah, that's
> > because you have a long-running job, etc.". I have seen different
> > suggestions:
> >
> > - Avoid using long-running jobs. Read the message, submit it into another
> > thread, and let the consumer move on. Obviously this can cause data loss,
> > and it would be a difficult problem to handle. It might be better to avoid
> > using Kafka in the first place for these types of requests.
> >
> > - Avoid using Apache Kafka for long-running requests
> >
> > - Workaround-based approaches: if the consumer is blocked, try to use
> > another consumer group and set the offset to the current value for the
> new
> > consumer group, etc.
> >
> > There might be other suggestions I have missed here, but that is not the
> > point of this email. What I am looking for is what is the best practice
> for
> > dealing with long-running jobs with Apache Kafka. I cannot easily avoid
> > using Kafka because it plays a critical part in our application and data
> > pipeline. On the other hand, we have had so many challenges keeping the
> > long-running jobs stable operationally. So I would appreciate it if
> someone
> > can help me to understand what approach can be taken to deal with these
> > jobs with Apache Kafka as a message broker.
> >
> > Thanks,
> > Ali
> >
>
>
> --
> A.Nazemian
>


RE: Kafka - FindCoordinator error

2020-05-07 Thread Rajib Deb
I wanted to check if anyone has faced this issue

Thanks
Rajib

From: Rajib Deb
Sent: Sunday, May 3, 2020 9:51 AM
To: users@kafka.apache.org
Subject: Kafka - FindCoordinator error

Hi
I have written a Python consumer using the confluent-kafka package. After a few hours
of running, the consumer dies with the below error

cimpl.KafkaException:
KafkaError{code=_TIMED_OUT,val=-185,str="FindCoordinator response
error: Local: Timed out"}

Can anyone please help me understand why this is happening
**
Below is a portion of the code
**
producer_conf = {
    'bootstrap.servers': 'xxx',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'x',
    'sasl.password': '',
    'ssl.ca.location': '',
    'ssl.certificate.location': '',
    'queue.buffering.max.messages': 10,
    'queue.buffering.max.ms': 1000,
    'batch.num.messages': 500
}

p = Producer(**producer_conf)
target_topic='xx'

c = Consumer(kwargs)
source_topic=''
c.subscribe([source_topic])
while True:

    msg = c.poll(100)  # I am consuming from a topic

    if msg is None:
        continue
    if msg.error():
        logging.error("error occurred during polling topic")
        logging.error(msg.error())
        raise KafkaException(msg.error())
        continue

    # logging.info("input msg form topic: ")
    # logging.info(msg.value())
    # msgDict = json.loads(msg.value())  # taking msg into dictionary
    try:
        p.produce(target_topic, msg.value(), callback=delivery_callback)
        # the message from the consumed topic is pushed to the target topic
        c.commit()  # disabled auto commit, manually committing only when
                    # message pushed to the target topic
    except BufferError:
        sys.stderr.write('%% Local producer queue is full (%d messages '
                         'awaiting delivery): try again\n' % len(p))
    except Exception as e:
        print(e)

    p.poll(0)
    # sys.stderr.write('%% Waiting for %d deliveries\n' % len(p))
    p.flush()

Thanks
Rajib


Re: Is committing offset required for Consumer

2020-05-07 Thread Chris Toomey
If you choose to manually assign topic partitions, then you won't be using
the group protocol to dynamically manage partition assignments and thus
don't have a need to poll or heartbeat at any interval. See "Manual
Partition Assignment" in
https://kafka.apache.org/24/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
 .
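
For illustration, a minimal sketch of that approach (topic, partition and the
external offset store are placeholders; error handling omitted). Because no
group is used and nothing is committed to Kafka, there are no rebalances and
no poll-interval deadlines to worry about:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ManualAssignmentExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // never commit to Kafka
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // No group.id is needed when only assign()/seek() are used.

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, loadOffsetFromMyStore(tp)); // resume where we left off

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    handle(record);
                    saveOffsetToMyStore(tp, record.offset() + 1); // next offset to read
                }
            }
        }
    }

    // Hypothetical helpers standing in for whatever external storage is used.
    private static long loadOffsetFromMyStore(TopicPartition tp) { return 0L; }
    private static void saveOffsetToMyStore(TopicPartition tp, long offset) { }
    private static void handle(ConsumerRecord<String, String> record) { }
}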

On Thu, May 7, 2020 at 3:52 PM Boyuan Zhang  wrote:

> Hi team,
>
> I'm building an application which uses Kafka Consumer APIs to read messages
> from topics. I plan to manually assign TopicPartitions to my consumer and
> seek a certain offset before starting to read. I'll also materialize the
> last read offset and reuse it when creating the consumer later.
>
> Within my usage, I'm curious whether I need to commit offsets automatically
> or manually. While going through the doc, it seems like committing offset
> is only important to dynamic assignment.
>
> Another question around manual assignment is, is it still true that I need
> to call poll() continuously to keep the consumer in the group described as
> below?
>
> > It is also possible that the consumer could encounter a "livelock"
> > situation where it is continuing to send heartbeats, but no progress is
> > being made. To prevent the consumer from holding onto its partitions
> > indefinitely in this case, we provide a liveness detection mechanism
> using
> > the max.poll.interval.ms setting. Basically if you don't call poll at
> > least as frequently as the configured max interval, then the client will
> > proactively leave the group so that another consumer can take over its
> > partitions. When this happens, you may see an offset commit failure (as
> > indicated by a CommitFailedException
> > <
> https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/CommitFailedException.html>
> thrown
> > from a call to commitSync()
> > <
> https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync--
> >).
> > This is a safety mechanism which guarantees that only active members of
> the
> > group are able to commit offsets. So to stay in the group, you must
> > continue to call poll.
>
> What will happen to poll() with manual assignment if the consumer is
> removed from the group?
>
> Thanks for your help!
>
> Boyuan
>


Re: Kafka Connect SMT to insert key into message

2020-05-07 Thread Liam Clarke-Hutchinson
So looking at the code of InsertField, it looks like there isn't an obvious
way, unless there's some way of chaining SMTs to achieve it.

Question then is, is it worth adding it to the InsertField SMT? The change
looks reasonably straightforward, and I'm happy to do a PR if it fits with
the aims of the project.

My use case was records keyed on the topic by user id, which I wanted to
persist into S3 with the user id as part of the one-JSON-object-per-line
data.

Thanks,

Liam Clarke-Hutchinson
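
For anyone following along, this is roughly the shape such a transform could
take for schemaless (Map) values. It is only a sketch: the field name is
hypothetical, and a real version would need Struct/schema handling and a
proper ConfigDef.

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.HashMap;
import java.util.Map;

public class InsertKeyIntoValue<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final String FIELD_NAME = "key"; // hypothetical; would normally be configurable

    @Override
    public R apply(R record) {
        if (!(record.value() instanceof Map)) {
            return record; // pass through anything this sketch doesn't understand
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
        value.put(FIELD_NAME, record.key());
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(),
                null, value, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}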

On Fri, May 8, 2020 at 12:14 PM Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> Hi all,
>
> I've been double checking the docs, and before I write a custom transform,
> am I correct that there's no supported way in the InsertField SMT to insert
> the key as a field of the value?
>
> Cheers,
>
> Liam Clarke-Hutchinson
>


Kafka Connect SMT to insert key into message

2020-05-07 Thread Liam Clarke-Hutchinson
Hi all,

I've been double checking the docs, and before I write a custom transform,
am I correct that there's no supported way in the InsertField SMT to insert
the key as a field of the value?

Cheers,

Liam Clarke-Hutchinson


Is committing offset required for Consumer

2020-05-07 Thread Boyuan Zhang
Hi team,

I'm building an application which uses Kafka Consumer APIs to read messages
from topics. I plan to manually assign TopicPartitions to my consumer and
seek a certain offset before starting to read. I'll also materialize the
last read offset and reuse it when creating the consumer later.

Within my usage, I'm curious whether I need to commit offsets automatically
or manually. While going through the doc, it seems like committing offsets
is only important for dynamic assignment.

Another question around manual assignment is, is it still true that I need
to call poll() continuously to keep the consumer in the group described as
below?

> It is also possible that the consumer could encounter a "livelock"
> situation where it is continuing to send heartbeats, but no progress is
> being made. To prevent the consumer from holding onto its partitions
> indefinitely in this case, we provide a liveness detection mechanism using
> the max.poll.interval.ms setting. Basically if you don't call poll at
> least as frequently as the configured max interval, then the client will
> proactively leave the group so that another consumer can take over its
> partitions. When this happens, you may see an offset commit failure (as
> indicated by a CommitFailedException
> <https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/CommitFailedException.html>
> thrown
> from a call to commitSync()
> <https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#commitSync-->).
> This is a safety mechanism which guarantees that only active members of the
> group are able to commit offsets. So to stay in the group, you must
> continue to call poll.

What will happen to poll() with manual assignment if the consumer is
removed from the group?

Thanks for your help!

Boyuan


Re: records with key as string and value as java ArrayList in topic

2020-05-07 Thread Pushkar Deole
Thanks John... I have to finish the work in a few days and need something
quick, so I'm looking for something ready-made. I will take a look at
Jackson JSON.

By the way, what is the ByteArraySerializer? As the name suggests, it is
for byte arrays, so it won't work for a Java ArrayList, right?

On Thu, May 7, 2020 at 8:44 PM John Roesler  wrote:

> Hi Pushkar,
>
> If you’re not too concerned about compactness, I think Jackson json
> serialization is the easiest way to serialize complex types.
>
> There’s also a kip in progress to add a list serde. You might take a look
> at that proposal for ideas to write your own.
>
> Thanks,
> John
>
> On Thu, May 7, 2020, at 08:17, Nicolas Carlot wrote:
> > I won't say it's a good idea to use Java-serialized classes for messages,
> > but you should use a ByteArraySerializer if you want to do such things.
> >
> > On Thu, May 7, 2020 at 2:32 PM Pushkar Deole  wrote:
> >
> > > Hi All,
> > >
> > > I have a requirement to store a record with key as java String and
> value as
> > > java's ArrayList in the kafka topic. Kafka has by default provided a
> > > StringSerializer and StringDeserializer, however for java ArrayList,
> how
> > > can I get a serializer? Do I need to write my own? Can someone share if
> someone
> > > already has written one?
> > >
> >
> >
> > --
> > *Nicolas Carlot*
> >
> > Lead dev
> > |  | nicolas.car...@chronopost.fr
> >
> >
> > *Please note that as of 20 May, the Chronopost head office is moving. The
> > new address is: 3 boulevard Romain Rolland 75014 Paris*
> >
> >
>


Re: records with key as string and value as java ArrayList in topic

2020-05-07 Thread John Roesler
Hi Pushkar,

If you’re not too concerned about compactness, I think Jackson json 
serialization is the easiest way to serialize complex types. 

There’s also a kip in progress to add a list serde. You might take a look at 
that proposal for ideas to write your own. 

Thanks,
John
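
As a starting point, a bare-bones Jackson-based serializer could look roughly
like this (a sketch only, not the list serde proposed in the KIP; the matching
deserializer is the mirror image using mapper.readValue):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.List;

public class JsonListSerializer implements Serializer<List<String>> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, List<String> data) {
        if (data == null) {
            return null;
        }
        try {
            // Writes the list as a JSON array, e.g. ["a","b","c"]
            return mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Failed to serialize list for topic " + topic, e);
        }
    }
}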

On Thu, May 7, 2020, at 08:17, Nicolas Carlot wrote:
> I won't say it's a good idea to use Java-serialized classes for messages, but
> you should use a ByteArraySerializer if you want to do such things.
> 
> On Thu, May 7, 2020 at 2:32 PM Pushkar Deole  wrote:
> 
> > Hi All,
> >
> > I have a requirement to store a record with key as java String and value as
> > java's ArrayList in the kafka topic. Kafka has by default provided a
> > StringSerializer and StringDeserializer, however for java ArrayList, how
> > can I get a serializer? Do I need to write my own? Can someone share if someone
> > already has written one?
> >
> 
> 
> -- 
> *Nicolas Carlot*
> 
> Lead dev
> |  | nicolas.car...@chronopost.fr
> 
> 
> *Please note that as of 20 May, the Chronopost head office is moving. The
> new address is: 3 boulevard Romain Rolland 75014 Paris*
> 
>


Re: can kafka state stores be used as a application level cache by application to modify it from outside the stream topology?

2020-05-07 Thread John Roesler
Hi Pushkar,

To answer your question about tuning the global store latency, I think the
biggest-impact thing you can do is to configure the consumer that loads the
data for global stores. You can pass configs specifically to the global
consumer with the prefix “global.consumer.”.
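
For example, something along these lines (a sketch; the topic name and the
fetch settings are illustrative only, not recommendations):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class GlobalConsumerTuning {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Only the global consumer (the one keeping GlobalKTables up to date) sees these.
        props.put("global.consumer." + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 100);
        props.put("global.consumer." + ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);

        StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("admin-topic"); // hypothetical topic backing the global store

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}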

Regarding the larger situation, it seems like the global table and a 
distributed cache would display the same basic behavior in terms of the 
potential for missed joins. Then, it probably makes sense to go for the option 
with fewer components to implement and maintain, which to me points to the 
global KTable. 

Since you can anticipate that missed joins can be a problem, you can build in 
some metrics and reporting for how many misses you actually observe, and 
potentially redesign the app if it’s actually a problem. 

I hope this helps!
-John 

On Tue, May 5, 2020, at 01:23, Pushkar Deole wrote:
> Thanks John... appreciate your inputs and suggestions. I have been assigned
> recently to this task (of persisting the cache) and haven't been involved
> in original design and architecture and agree with all the issues you have
> highlighted.
> However, at this point, i don't think the application can be converted to
> streams since the design is not flexible and it would require lot of
> rewrite of code plus subsequent testing.
> 
> My first thought was to use external database only,  preferably the
> distributed caching systems like Apache Ignite since it will have least
> impact on performance. Going to database for every event would impact the
> throughput a lot. Probably having distributed caching (key/value pairs)
> would have comparatively lesser impact.
> Second choice is to go for GlobalKTable however this needs to be done very
> carefully.
> 
> Thanks again!
> 
> On Mon, May 4, 2020 at 11:18 PM Pushkar Deole  wrote:
> 
> > Thanks John... what parameters would affect the latency in case
> > GlobalKTable will be used and is there any configurations that could be
> > tuned to minimize the latency of sync with input topic?
> >
> > On Mon, May 4, 2020 at 10:20 PM John Roesler  wrote:
> >
> >> Hello Pushkar,
> >>
> >> Yes, that’s correct. The operation you describe is currently not
> >> supported. If you want to keep the structure you described in place, I’d
> >> suggest using an external database for the admin objects. I’ll give another
> >> idea below.
> >>
> >> With your current architecture, I’m a little concerned about data races.
> >> From what I saw, nothing would prevent processing stream records with agent
> >> 10 before you process the admin record with agent 10. This problem will
> >> persist no matter where you locate the cache.
> >>
> >> GlobalKTable would no doubt make it worse, since it increases the latency
> >> before admin record 10 is queriable everywhere.
> >>
> >> I think you’ll want to make a call between architecture simplicity
> >> (remote cache or global KTable) vs the probability of missed joins.
> >>
> >> I think the “best” way to solve this problem (that comes to mind anyway)
> >> might be to
> >> 1. Repartition the stream to be co-partitioned with the admin records.
> >> 2. Do a local (not global) stream-table join
> >> 3. Enable task idling
> >>
> >> You can do the repartition today with a ‘map’ or ‘selectKey’ to make the
> >> agent Id the new key of the stream, and then use ‘through’, (where the
> >> intermediate topic has the same number of partitions as the admin topic) to
> >> do the repartitioning. In 2.6, there is a “repartition” operator that will
> >> make this easier.
> >>
> >> The repartition ensures that all stream records with agent id 10 will be
> >> processed by the same thread that processes the admin records with agent id
> >> 10, hence it will be able to find agent 10 in the local KTable store.
> >>
> >> Task idling will minimize your chances of missing any enrichments. When a
> >> task has two inputs (E.g., your repartitioned stream joining with the admin
> >> table), it makes Streams wait until both inputs are buffered before
> >> processing, so it can do a better job of processing in timestamp order.
> >>
> >> I hope this helps!
> >> -John
> >>
> >> On Mon, May 4, 2020, at 05:30, Pushkar Deole wrote:
> >> > If i understand correctly, Kafka is not designed to provide replicated
> >> > caching mechanism wherein the updates to cache will be synchronous
> >> across
> >> > multiple cache instances.
> >> >
> >> > On Sun, May 3, 2020 at 10:49 PM Pushkar Deole 
> >> wrote:
> >> >
> >> > > Thanks John.
> >> > >
> >> > > Actually, this is a normal consumer-producer application wherein
> >> there are
> >> > > 2 consumers (admin consumer and main consumer) consuming messages
> >> from 2
> >> > > different topics.
> >> > > One of the consumers consumes messages from a admin topic and
> >> populates
> >> > > data in a cache e.g. lets say agent with agent id 10 for which the
> >> first
> >> > > name and last name is received is populated in cache. When the other
> >> > > consumer consumes 

Re: Identity Mirroring

2020-05-07 Thread Ryanne Dolan
Hey Henry, this was done with MM1 at LinkedIn at one point, but it requires
support for shallow iteration in KafkaConsumer, which was removed from
Apache Kafka a long time ago. Given recent talk of breaking changes in
Kafka 3.0, this might be a good time to revisit this.

Ryanne


On Thu, May 7, 2020, 2:46 AM Henry Cai  wrote:

> I saw this feature mentioned in the cloudera blog post:
> https://blog.cloudera.com/a-look-inside-kafka-mirrormaker-2/
>
> High Throughput Identity Mirroring
> The batch does not need to be decompressed and compressed and deserialized
> and serialized if nothing had to be changed.  Identity mirroring can have a
> much higher throughput than the traditional approach. This is another
> feature that will be coming soon in MM2.
>
> Is this feature already implemented in MM2?  Is there a KIP or JIRA
> associated with the feature?
>


Re: records with key as string and value as java ArrayList in topic

2020-05-07 Thread Nicolas Carlot
I won't say it's a good idea to use Java-serialized classes for messages, but
you should use a ByteArraySerializer if you want to do such things.
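
To spell that out, a sketch of what it could look like on the producing side
(broker address and topic are placeholders; as noted, Java serialization is
usually a poor wire format compared to JSON or Avro, and the consumer has to
deserialize the bytes the same way):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Properties;

public class ArrayListProducerSketch {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        ArrayList<String> value = new ArrayList<>();
        value.add("a");
        value.add("b");

        // Java-serialize the list into a byte[] before handing it to the producer.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(value);
        }

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "my-key", bos.toByteArray()));
        }
    }
}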

On Thu, May 7, 2020 at 2:32 PM Pushkar Deole  wrote:

> Hi All,
>
> I have a requirement to store a record with key as java String and value as
> java's ArrayList in the kafka topic. Kafka has by default provided a
> StringSerializer and StringDeserializer, however for java ArrayList, how
> can I get a serializer? Do I need to write my own? Can someone share if someone
> already has written one?
>


-- 
*Nicolas Carlot*

Lead dev
|  | nicolas.car...@chronopost.fr


*Please note that as of 20 May, the Chronopost head office is moving. The
new address is: 3 boulevard Romain Rolland 75014 Paris*



records with key as string and value as java ArrayList in topic

2020-05-07 Thread Pushkar Deole
Hi All,

I have a requirement to store a record with key as java String and value as
java's ArrayList in the kafka topic. Kafka has by default provided a
StringSerializer and StringDeserializer, however for java ArrayList, how
can I get a serializer? Do I need to write my own? Can someone share if someone
already has written one?


Re: JDBC Sink Connector

2020-05-07 Thread Robin Moffatt
If you don't want to send the schema each time then serialise your data
using Avro (or Protobuf), and then the schema is held in the Schema
Registry. See https://www.youtube.com/watch?v=b-3qN_tlYR4=981s

If you want to update a record instead of inserting it, you can use the upsert
mode. See https://www.youtube.com/watch?v=b-3qN_tlYR4=627s
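
For reference, the relevant settings for the Confluent JDBC sink look roughly
like this (a sketch; connection details, topic name and key column are
placeholders):

# Sketch of a JDBC sink configuration using Avro + Schema Registry and upsert mode
name=jdbc-sink-example
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=my-topic
connection.url=jdbc:mysql://localhost:3306/mydb
connection.user=user
connection.password=password

# The schema lives in the Schema Registry instead of being embedded in every message
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# Update-or-insert keyed on the record key
insert.mode=upsert
pk.mode=record_key
pk.fields=id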


-- 

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff


On Thu, 7 May 2020 at 06:48, vishnu murali 
wrote:

> Hey Guys,
>
> I am working on the JDBC Sink Connector to take data from a Kafka topic to
> MySQL.
>
> I have 2 questions.
>
> I am using plain Apache Kafka 2.5, not the Confluent version.
>
> 1) For inserting data, we currently need to include the schema along with
> every record. How can I overcome this? I want to send only the data.
>
> 2) Sometimes I need to update an existing record rather than adding a new
> one. How can I achieve this?
>


Re: JDBC Sink Connector

2020-05-07 Thread Liam Clarke-Hutchinson
Hi Vishnu,

I wrote an implementation of org.apache.kafka.connect.storage.Converter,
included it in the KC worker classpath (then set it with the property
value.converter) to provide the schema that the JDBC sink needs.

That approach may work for 1).
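
A stripped-down sketch of that kind of converter is below. The schema fields
here are purely illustrative; a real implementation would build the schema
from configuration rather than hard-coding it.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.storage.Converter;

import java.util.Map;

public class FixedSchemaJsonConverter implements Converter {

    // Hard-coded schema for illustration only
    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .field("id", Schema.INT64_SCHEMA)
            .field("name", Schema.STRING_SCHEMA)
            .build();

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        // Only needed for source connectors; this sketch is sink-only
        throw new UnsupportedOperationException("sink-only converter sketch");
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        try {
            JsonNode node = mapper.readTree(value);
            Struct struct = new Struct(VALUE_SCHEMA)
                    .put("id", node.get("id").asLong())
                    .put("name", node.get("name").asText());
            return new SchemaAndValue(VALUE_SCHEMA, struct);
        } catch (Exception e) {
            throw new DataException("Failed to convert JSON for topic " + topic, e);
        }
    }
}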

For 2) KC can use upsert if your DB supports it, based on the PK you
configure. But I've found in the past that it's not possible to reference
values already in the DB, so if key X had count = 5 in the DB already, and
the JDBC sink had a record with key X and count = 10, then it'll overwrite
instead of accumulating, so after the update count in the DB will be 10,
not 15.


Kind regards,

Liam Clarke-Hutchinson
On Thu, 7 May 2020, 5:48 pm vishnu murali, 
wrote:

> Hey Guys,
>
> I am working on the JDBC Sink Connector to take data from a Kafka topic to
> MySQL.
>
> I have 2 questions.
>
> I am using plain Apache Kafka 2.5, not the Confluent version.
>
> 1) For inserting data, we currently need to include the schema along with
> every record. How can I overcome this? I want to send only the data.
>
> 2) Sometimes I need to update an existing record rather than adding a new
> one. How can I achieve this?
>


Identity Mirroring

2020-05-07 Thread Henry Cai
I saw this feature mentioned in the cloudera blog post:
https://blog.cloudera.com/a-look-inside-kafka-mirrormaker-2/

High Throughput Identity Mirroring
The batch does not need to be decompressed and compressed and deserialized
and serialized if nothing had to be changed.  Identity mirroring can have a
much higher throughput than the traditional approach. This is another
feature that will be coming soon in MM2.

Is this feature already implemented in MM2?  Is there a KIP or JIRA
associated with the feature?


Re: Kafka long running job consumer config best practices and what to do to avoid stuck consumer

2020-05-07 Thread Ali Nazemian
To help with understanding my case in more detail, the error I can see
constantly is the consumer losing its heartbeat, and hence apparently the
group gets rebalanced, based on the log I can see from the Kafka side:

[GroupCoordinator 11]: Member
consumer-3-f46e14b4-5998-4083-b7ec-bed4e3f374eb in group foo has failed,
removing it from the group

Thanks,
Ali

On Thu, May 7, 2020 at 2:38 PM Ali Nazemian  wrote:

> Hi,
>
> With the emergence of Apache Kafka for event-driven architecture, one
> thing that has become important is how to tune the Apache Kafka consumer to
> manage long-running jobs. The main issue arises when we set a relatively
> large value for "max.poll.interval.ms". Setting this value will, of
> course, resolve the issue of repetitive rebalance, but creates another
> operational issue. I am looking for some sort of golden strategy to deal
> with long-running jobs with Apache Kafka.
>
> If the consumer hangs for whatever reason, there is no easy way of getting
> past that stage. It can easily block the pipeline, and you cannot do much about
> it. Therefore, it came to my mind that I am probably missing something
> here. What are the expectations? Is it not valid to use Apache Kafka for
> long-lived jobs? Are there any other parameters that need to be set, and is
> the issue of a consumer being stuck caused by misconfiguration?
>
> I can see that a lot of the same issues have been raised regarding
> "the consumer is stuck" and usually, the answer has been "yeah, that's
> because you have a long-running job, etc.". I have seen different
> suggestions:
>
> - Avoid using long-running jobs. Read the message, submit it into another
> thread, and let the consumer move on. Obviously this can cause data loss and
> it would be a difficult problem to handle. It might be better to avoid
> using Kafka in the first place for these types of requests.
>
> - Avoid using Apache Kafka for long-running requests
>
> - Workaround-based approaches: if the consumer is blocked, try to use
> another consumer group and set the offset to the current value for the new
> consumer group, etc.
>
> There might be other suggestions I have missed here, but that is not the
> point of this email. What I am looking for is what is the best practice for
> dealing with long-running jobs with Apache Kafka. I cannot easily avoid
> using Kafka because it plays a critical part in our application and data
> pipeline. On the other hand, we have had so many challenges keeping the
> long-running jobs stable operationally. So I would appreciate it if someone
> can help me to understand what approach can be taken to deal with these
> jobs with Apache Kafka as a message broker.
>
> Thanks,
> Ali
>


-- 
A.Nazemian


Re: Kafka consumer

2020-05-07 Thread vishnu murali
Thanks Chris

But it won't work; I tried that also.

I found the solution.

The default behavior of @KafkaListener itself is to take the data one by
one.
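
For anyone doing the same with a plain KafkaConsumer rather than Spring's
@KafkaListener, the equivalent of Chris's suggestion looks roughly like this
(a sketch; topic, group id and process() are placeholders):

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class OneAtATimeConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "one-at-a-time");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1"); // at most one record per poll()
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    process(record);       // long work still has to finish within max.poll.interval.ms
                    consumer.commitSync(); // commit after each record is handled
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) { }
}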





On Thu, May 7, 2020, 11:28 Chris Toomey  wrote:

> You can set the max.poll.records config setting to 1 in order to pull down
> and process 1 record at a time.
>
> See https://kafka.apache.org/documentation/#consumerconfigs .
>
> On Mon, May 4, 2020 at 1:04 AM vishnu murali 
> wrote:
>
> > Hey Guys,
> >
> > I have a topic, and in that topic I have 3000 messages.
> >
> > In my Spring Boot application I want to consume the data using
> > @KafkaListener(), and one message at a time, because I need to do some
> > tedious processing on that data and it may take some time.
> >
> > So within this time I don't want to consume any more data.
> >
> > Only after the processing is finished do I want to consume more data from
> > the topic.
> >
> > How can I do this?
> >
> > Any ideas?
> >
>