Need help regarding missing kafka producer metrics while upgrading from 1.1.0 to 2.0.0

2020-05-14 Thread Rajkumar Natarajan
Hi Kafka community,

We are currently using Kafka client version 1.1.0 in production. I'm
working on upgrading it to Kafka version 2.0.0.
I see that some metrics which are present in version 1.1.1 have been removed
in Kafka version 2.0.0.

Metrics present in version 1.1.1 but not found in 2.0.0:

1. kafka.producer:type=ProducerRequestMetrics,name=ProducerRequestRateAndTimeMs
2. kafka.producer:type=ProducerRequestMetrics,name=ProducerRequestSize
3. kafka.producer:type=ProducerRequestMetrics,name=ProducerRequestThrottleRateAndTimeMs
4. kafka.producer:type=ProducerTopicMetrics,name=DroppedMessagesPerSec
5. kafka.producer.async:type=ProducerSendThread,name=ProducerQueueSize

Could you please point me to the Kafka metrics documentation or the KIP
that covers the above producer metrics in Kafka 2.0.0?
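
For reference, one way to see exactly which metric names the 2.0.0 producer
still registers is to dump them from a running client. A minimal sketch
(bootstrap server and serializers are placeholders, not from the original
message):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ListProducerMetrics {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Print every metric the producer registers, so the removed MBeans
            // can be compared against what 2.0.0 actually exposes.
            producer.metrics().forEach((name, metric) ->
                System.out.println(name.group() + " / " + name.name()));
        }
    }
}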


Re: KafkaStream groupBy + count on KTable behaviour

2020-05-14 Thread Raffaele Esposito
Thanks a lot Bruno, much clearer now.
It's only my opinion, but since the Topology is a concept of the API, as is
the repartitioning logic, for me this mechanism should also be a bit more
transparent; but it may also be that I'm plain wrong here :)

Thanks !

On Thu, May 14, 2020 at 9:24 PM Bruno Cadonna  wrote:

> Hi Raffaele,
>
> Change is an internal class in Streams and also its SerDes are
> internal. To consume the repartition topic you mention outside of
> Streams you would need to use those internal classes (note: I've never
> tried this). Those classes can change at any time. So consuming from
> repartition topics for other than educational purposes is not a good
> idea.
>
> toStream() only emits the new value of the Change.
>
> Regarding docs, since these are internals, the code is the best
> source. For example:
>
> The Change class:
>
> https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java
>
> Here the Change class is used to first remove the old value from the
> aggregate and then to add the new value to the aggregate:
>
> https://github.com/apache/kafka/blob/873e9446ef8426061e2f1b6cd21815b270e27f03/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L90
>
> Best,
> Bruno
>
> On Thu, May 14, 2020 at 8:50 PM Raffaele Esposito
>  wrote:
> >
> > Hi Bruno,
> > Also when you mention:
> >
> > The record structure key, (oldValue, newValue) is called Change in
> > Kafka Streams and it is used where updates are emitted downstream
> >
> > Does it also mean the same happen when we convert a KTable to a KStream ?
> > Do you know any docs or article about this topics?
> >
> > Thanks again,
> > Raffaele
> >
> >
> >
> > On Thu, May 14, 2020 at 8:39 PM Raffaele Esposito <
> rafaelral...@gmail.com>
> > wrote:
> >
> > > Hi Bruno,
> > > Thanks,
> > > One more thing, As I told you I was consuming the repartitioning topic
> > > created by group by
> > > and I just saw the old and new value, as you are telling me now they
> are
> > > indeed marked as old and new,
> > > is this mark visible somehow consuming the repartitioning topic ?
> > > Raffaele
> > >
> > > On Thu, May 14, 2020 at 7:48 PM Bruno Cadonna 
> wrote:
> > >
> > >> Hi Raffaele,
> > >>
> > >> In your example, Kafka Streams would send the new and the old value
> > >> downstream. More specifically, the groupBy() would send (as you also
> > >> observed)
> > >>
> > >> London, (old value: London, new value: null)
> > >> Berlin, (old value: null, new value: Berlin)
> > >>
> > >> At the count() record London, (old value: London, new value: null)
> > >> would detract 1 from key London and record Berlin, (old value: null,
> > >> new value: Berlin) would add 1 to Berlin.
> > >>
> > >> The record structure key, (oldValue, newValue) is called Change in
> > >> Kafka Streams and it is used where updates are emitted downstream.
> > >>
> > >> Best,
> > >> Bruno
> > >>
> > >> On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito
> > >>  wrote:
> > >> >
> > >> > I m trying to better understand KTable and I have encountered a
> > >> behaviour I
> > >> > cannot wrap my mind around it.
> > >> >
> > >> > So* groupByKey()* can only be applied to KStream and not to KTable,
> > >> that's
> > >> > because of the nature of KTable that and its UPSERT logic.
> > >> > What I don't understand correctly and therefore ask your help for
> that
> > >> is
> > >> > how *groupBy()* can actually be applied on KTable, the documentation
> > >> says
> > >> > that:
> > >> >
> > >> > groupBy() is a shorthand for selectKey(...).groupByKey()
> > >> >
> > >> > But both these operations can only be applied to KStreams.
> > >> >
> > >> > The documentation also says:
> > >> >
> > >> > Because a new key is selected, an internal repartitioning topic
> will be
> > >> > created in Kafka ... All data of this KTable will be redistributed
> > >> through
> > >> > the repartitioning topic by writing all update records to and
> rereading
> > >> all
> > >> > update records from it, such that the resulting KGroupedTable is
> > >> > partitioned on the new key.
> > >> >
> > >> > Now assume we want to count the favourite cities of users:
> > >> >
> > >> > We have a key table like:
> > >> >
> > >> > Mike,London
> > >> > Alice,Paris
> > >> > Fred,Rome
> > >> > Mike,Berlin (changed his mind)
> > >> >
> > >> > I would use:
> > >> >
> > >> > KTable<String, String> usersAndCitiesTable =
> > >> > builder.table("user-keys-and-cities");
> > >> >
> > >> >  KTable<String, Long> favouriteCities =
> > >> > usersAndCitiesTable.groupBy((user,city)->new KeyValue<>(city, city))
> > >> >  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByCity"));
> > >> >
> > >> > I took a look at the repartitioning topic created because of the
> > >> groupBy,
> > >> > and can see the record mapped using the KeyValueMapper provided
> > >> >
> > >> > I noticed that table generates two entries when Mike changes his
> mind,
> > >> one
> > >> > for London (the old city) and one for Berlin (the new city)
> > >> >
> > 

Re: KafkaStream groupBy + count on KTable behaviour

2020-05-14 Thread Bruno Cadonna
Hi Raffaele,

Change is an internal class in Streams and also its SerDes are
internal. To consume the repartition topic you mention outside of
Streams you would need to use those internal classes (note: I've never
tried this). Those classes can change at any time. So consuming from
repartition topics for other than educational purposes is not a good
idea.

toStream() only emits the new value of the Change.
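
(As a minimal illustration, not from the original mail; topic name and serdes
are placeholders, and favouriteCities is the KTable<String, Long> from the
example below:

favouriteCities.toStream()
    .to("counts-by-city", Produced.with(Serdes.String(), Serdes.Long()));

Only the new count per city ends up in the output topic.)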

Regarding docs, since these are internals, the code is the best
source. For example:

The Change class:
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java

Here the Change class is used to first remove the old value from the
aggregate and then to add the new value to the aggregate:
https://github.com/apache/kafka/blob/873e9446ef8426061e2f1b6cd21815b270e27f03/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java#L90

Best,
Bruno

On Thu, May 14, 2020 at 8:50 PM Raffaele Esposito
 wrote:
>
> Hi Bruno,
> Also when you mention:
>
> The record structure key, (oldValue, newValue) is called Change in
> Kafka Streams and it is used where updates are emitted downstream
>
> Does it also mean the same happen when we convert a KTable to a KStream ?
> Do you know any docs or article about this topics?
>
> Thanks again,
> Raffaele
>
>
>
> On Thu, May 14, 2020 at 8:39 PM Raffaele Esposito 
> wrote:
>
> > Hi Bruno,
> > Thanks,
> > One more thing, As I told you I was consuming the repartitioning topic
> > created by group by
> > and I just saw the old and new value, as you are telling me now they are
> > indeed marked as old and new,
> > is this mark visible somehow consuming the repartitioning topic ?
> > Raffaele
> >
> > On Thu, May 14, 2020 at 7:48 PM Bruno Cadonna  wrote:
> >
> >> Hi Raffaele,
> >>
> >> In your example, Kafka Streams would send the new and the old value
> >> downstream. More specifically, the groupBy() would send (as you also
> >> observed)
> >>
> >> London, (old value: London, new value: null)
> >> Berlin, (old value: null, new value: Berlin)
> >>
> >> At the count() record London, (old value: London, new value: null)
> >> would detract 1 from key London and record Berlin, (old value: null,
> >> new value: Berlin) would add 1 to Berlin.
> >>
> >> The record structure key, (oldValue, newValue) is called Change in
> >> Kafka Streams and it is used where updates are emitted downstream.
> >>
> >> Best,
> >> Bruno
> >>
> >> On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito
> >>  wrote:
> >> >
> >> > I m trying to better understand KTable and I have encountered a
> >> behaviour I
> >> > cannot wrap my mind around it.
> >> >
> >> > So* groupByKey()* can only be applied to KStream and not to KTable,
> >> that's
> >> > because of the nature of KTable that and its UPSERT logic.
> >> > What I don't understand correctly and therefore ask your help for that
> >> is
> >> > how *groupBy()* can actually be applied on KTable, the documentation
> >> says
> >> > that:
> >> >
> >> > groupBy() is a shorthand for selectKey(...).groupByKey()
> >> >
> >> > But both these operations can only be applied to KStreams.
> >> >
> >> > The documentation also says:
> >> >
> >> > Because a new key is selected, an internal repartitioning topic will be
> >> > created in Kafka ... All data of this KTable will be redistributed
> >> through
> >> > the repartitioning topic by writing all update records to and rereading
> >> all
> >> > update records from it, such that the resulting KGroupedTable is
> >> > partitioned on the new key.
> >> >
> >> > Now assume we want to count the favourite cities of users:
> >> >
> >> > We have a key table like:
> >> >
> >> > Mike,London
> >> > Alice,Paris
> >> > Fred,Rome
> >> > Mike,Berlin (changed his mind)
> >> >
> >> > I would use:
> >> >
> >> > KTable<String, String> usersAndCitiesTable =
> >> > builder.table("user-keys-and-cities");
> >> >
> >> >  KTable<String, Long> favouriteCities =
> >> > usersAndCitiesTable.groupBy((user,city)->new KeyValue<>(city, city))
> >> >  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByCity"));
> >> >
> >> > I took a look at the repartitioning topic created because of the
> >> groupBy,
> >> > and can see the record mapped using the KeyValueMapper provided
> >> >
> >> > I noticed that table generates two entries when Mike changes his mind,
> >> one
> >> > for London (the old city) and one for Berlin (the new city)
> >> >
> >> > Are this entries marked somehow?  if yes, how ?
> >> >
> >> > How does Kafka make sure that on London count is applied a -1 and the
> >> > Berlin count a +1 when the new record with Mike's new favorite city
> >> arrives.
> >> >
> >> >
> >> > Any help or suggestion is highly appreciated !
> >> >
> >> > Thanks
> >>
> >


Re: KafkaStream groupBy + count on KTable behaviour

2020-05-14 Thread Raffaele Esposito
Hi Bruno,
Also when you mention:

The record structure key, (oldValue, newValue) is called Change in
Kafka Streams and it is used where updates are emitted downstream

Does it also mean the same happens when we convert a KTable to a KStream?
Do you know of any docs or articles about these topics?

Thanks again,
Raffaele



On Thu, May 14, 2020 at 8:39 PM Raffaele Esposito 
wrote:

> Hi Bruno,
> Thanks,
> One more thing, As I told you I was consuming the repartitioning topic
> created by group by
> and I just saw the old and new value, as you are telling me now they are
> indeed marked as old and new,
> is this mark visible somehow consuming the repartitioning topic ?
> Raffaele
>
> On Thu, May 14, 2020 at 7:48 PM Bruno Cadonna  wrote:
>
>> Hi Raffaele,
>>
>> In your example, Kafka Streams would send the new and the old value
>> downstream. More specifically, the groupBy() would send (as you also
>> observed)
>>
>> London, (old value: London, new value: null)
>> Berlin, (old value: null, new value: Berlin)
>>
>> At the count() record London, (old value: London, new value: null)
>> would detract 1 from key London and record Berlin, (old value: null,
>> new value: Berlin) would add 1 to Berlin.
>>
>> The record structure key, (oldValue, newValue) is called Change in
>> Kafka Streams and it is used where updates are emitted downstream.
>>
>> Best,
>> Bruno
>>
>> On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito
>>  wrote:
>> >
>> > I m trying to better understand KTable and I have encountered a
>> behaviour I
>> > cannot wrap my mind around it.
>> >
>> > So* groupByKey()* can only be applied to KStream and not to KTable,
>> that's
>> > because of the nature of KTable that and its UPSERT logic.
>> > What I don't understand correctly and therefore ask your help for that
>> is
>> > how *groupBy()* can actually be applied on KTable, the documentation
>> says
>> > that:
>> >
>> > groupBy() is a shorthand for selectKey(...).groupByKey()
>> >
>> > But both these operations can only be applied to KStreams.
>> >
>> > The documentation also says:
>> >
>> > Because a new key is selected, an internal repartitioning topic will be
>> > created in Kafka ... All data of this KTable will be redistributed
>> through
>> > the repartitioning topic by writing all update records to and rereading
>> all
>> > update records from it, such that the resulting KGroupedTable is
>> > partitioned on the new key.
>> >
>> > Now assume we want to count the favourite cities of users:
>> >
>> > We have a key table like:
>> >
>> > Mike,London
>> > Alice,Paris
>> > Fred,Rome
>> > Mike,Berlin (changed his mind)
>> >
>> > I would use:
>> >
>> > KTable<String, String> usersAndCitiesTable =
>> > builder.table("user-keys-and-cities");
>> >
>> >  KTable<String, Long> favouriteCities =
>> > usersAndCitiesTable.groupBy((user,city)->new KeyValue<>(city, city))
>> >  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByCity"));
>> >
>> > I took a look at the repartitioning topic created because of the
>> groupBy,
>> > and can see the record mapped using the KeyValueMapper provided
>> >
>> > I noticed that table generates two entries when Mike changes his mind,
>> one
>> > for London (the old city) and one for Berlin (the new city)
>> >
>> > Are this entries marked somehow?  if yes, how ?
>> >
>> > How does Kafka make sure that on London count is applied a -1 and the
>> > Berlin count a +1 when the new record with Mike's new favorite city
>> arrives.
>> >
>> >
>> > Any help or suggestion is highly appreciated !
>> >
>> > Thanks
>>
>


Re: KafkaStream groupBy + count on KTable behaviour

2020-05-14 Thread Raffaele Esposito
Hi Bruno,
Thanks,
One more thing: as I told you, I was consuming the repartitioning topic
created by groupBy
and I just saw the old and new values. Since you are telling me now that they
are indeed marked as old and new,
is this mark somehow visible when consuming the repartitioning topic?
Raffaele

On Thu, May 14, 2020 at 7:48 PM Bruno Cadonna  wrote:

> Hi Raffaele,
>
> In your example, Kafka Streams would send the new and the old value
> downstream. More specifically, the groupBy() would send (as you also
> observed)
>
> London, (old value: London, new value: null)
> Berlin, (old value: null, new value: Berlin)
>
> At the count() record London, (old value: London, new value: null)
> would detract 1 from key London and record Berlin, (old value: null,
> new value: Berlin) would add 1 to Berlin.
>
> The record structure key, (oldValue, newValue) is called Change in
> Kafka Streams and it is used where updates are emitted downstream.
>
> Best,
> Bruno
>
> On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito
>  wrote:
> >
> > I m trying to better understand KTable and I have encountered a
> behaviour I
> > cannot wrap my mind around it.
> >
> > So* groupByKey()* can only be applied to KStream and not to KTable,
> that's
> > because of the nature of KTable that and its UPSERT logic.
> > What I don't understand correctly and therefore ask your help for that is
> > how *groupBy()* can actually be applied on KTable, the documentation says
> > that:
> >
> > groupBy() is a shorthand for selectKey(...).groupByKey()
> >
> > But both these operations can only be applied to KStreams.
> >
> > The documentation also says:
> >
> > Because a new key is selected, an internal repartitioning topic will be
> > created in Kafka ... All data of this KTable will be redistributed
> through
> > the repartitioning topic by writing all update records to and rereading
> all
> > update records from it, such that the resulting KGroupedTable is
> > partitioned on the new key.
> >
> > Now assume we want to count the favourite cities of users:
> >
> > We have a key table like:
> >
> > Mike,London
> > Alice,Paris
> > Fred,Rome
> > Mike,Berlin (changed his mind)
> >
> > I would use:
> >
> > KTable<String, String> usersAndCitiesTable =
> > builder.table("user-keys-and-cities");
> >
> >  KTable<String, Long> favouriteCities =
> > usersAndCitiesTable.groupBy((user,city)->new KeyValue<>(city, city))
> >  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByCity"));
> >
> > I took a look at the repartitioning topic created because of the groupBy,
> > and can see the record mapped using the KeyValueMapper provided
> >
> > I noticed that table generates two entries when Mike changes his mind,
> one
> > for London (the old city) and one for Berlin (the new city)
> >
> > Are this entries marked somehow?  if yes, how ?
> >
> > How does Kafka make sure that on London count is applied a -1 and the
> > Berlin count a +1 when the new record with Mike's new favorite city
> arrives.
> >
> >
> > Any help or suggestion is highly appreciated !
> >
> > Thanks
>


Re: data structures used by GlobalKTable, KTable

2020-05-14 Thread Matthias J. Sax
Yeah, the current API doesn't make it very clear how to do it. You can
set an in-memory store like this:

> builder.globalTable("topic", 
> Materialized.as(Stores.inMemoryKeyValueStore("store-name")));


We are already working on an improved API via KIP-591:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-591%3A+Add+Kafka+Streams+config+to+set+default+store+type
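
For reference, a slightly fuller sketch of the same idea with explicit serdes
(topic name, store name and String serdes are placeholders, not from the
thread; the usual org.apache.kafka.streams imports are assumed):

GlobalKTable<String, String> table = builder.globalTable(
    "topic",
    Materialized.<String, String>as(Stores.inMemoryKeyValueStore("store-name"))
        .withKeySerde(Serdes.String())      // key serde for the store
        .withValueSerde(Serdes.String()));  // value serde for the store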



-Matthias


On 5/13/20 3:40 AM, Pushkar Deole wrote:
> Matthias,
> 
> For GlobalKTable, I am looking at the APIs provided by StreamsBuilder and I
> don't see any option to specify an in-memory store there: all of these APIs'
> documentation states that the resulting GlobalKTable will be materialized in
> a local KeyValueStore with an internal store name. It doesn't give an option
> to choose whether it is in-memory or backed by a DB.
> 
> globalTable(String topic)
> globalTable(String topic, Consumed<K, V> consumed,
>     Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized)
> 
> On Tue, May 12, 2020 at 11:07 PM Matthias J. Sax  wrote:
> 
>> By default, RocksDB is used. You can also change it to use an in-memory
>> store that is basically a HashMap.
>>
>>
>> -Matthias
>>
>> On 5/12/20 10:16 AM, Pushkar Deole wrote:
>>> Thanks Liam!
>>>
>>> On Tue, May 12, 2020, 15:12 Liam Clarke-Hutchinson <
>>> liam.cla...@adscale.co.nz> wrote:
>>>
 Hi Pushkar,

 GlobalKTables and KTables can have whatever data structure you like, if
>> you
 provide the appropriate deserializers - for example, an Kafka Streams
>> app I
 maintain stores model data (exported to a topic per entity from Postgres
 via Kafka Connect's JDBC Source) as a GlobalKTable of Jackson
>> ObjectNode's
 keyed by entity id

 If you're worried about efficiency, just treat KTables/GlobalKTables as a
 HashMap and you're pretty much there. In terms of efficiency,
 we're joining model  data to about 7 - 10 TB of transactional data a
>> day,
 and on average, run about 5 - 10 instances of our enrichment app with
>> about
 2GB max heap.

 Kind regards,

 Liam "Not a part of the Confluent team, but happy to help"
 Clarke-Hutchinson

 On Tue, May 12, 2020 at 9:35 PM Pushkar Deole 
 wrote:

> Hello confluent team,
>
> Could you provide some information on what data structures are used
> internally by GlobalKTable and KTables. The application that I am
>> working
> on has a requirement to read cached data from GlobalKTable on every
> incoming event, so the reads from GlobalKTable need to be efficient.
>

>>>
>>
>>
> 





Re: KafkaStream groupBy + count on KTable behaviour

2020-05-14 Thread Bruno Cadonna
Hi Raffaele,

In your example, Kafka Streams would send the new and the old value
downstream. More specifically, the groupBy() would send (as you also
observed)

London, (old value: London, new value: null)
Berlin, (old value: null, new value: Berlin)

At the count(), the record London, (old value: London, new value: null)
would subtract 1 from key London, and the record Berlin, (old value: null,
new value: Berlin) would add 1 to key Berlin.

The record structure key, (oldValue, newValue) is called Change in
Kafka Streams and it is used where updates are emitted downstream.
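
As a conceptual sketch only (this is an illustration, not the actual Streams
internals), the subtract/add step for a count can be pictured like this:

// Illustration of how a Change-style (oldValue, newValue) pair adjusts a count.
static long applyChange(long currentCount, String oldValue, String newValue) {
    if (oldValue != null) {
        currentCount -= 1;  // e.g. London, (old: London, new: null): London count - 1
    }
    if (newValue != null) {
        currentCount += 1;  // e.g. Berlin, (old: null, new: Berlin): Berlin count + 1
    }
    return currentCount;
}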

Best,
Bruno

On Thu, May 14, 2020 at 12:17 PM Raffaele Esposito
 wrote:
>
> I m trying to better understand KTable and I have encountered a behaviour I
> cannot wrap my mind around it.
>
> So* groupByKey()* can only be applied to KStream and not to KTable, that's
> because of the nature of KTable that and its UPSERT logic.
> What I don't understand correctly and therefore ask your help for that is
> how *groupBy()* can actually be applied on KTable, the documentation says
> that:
>
> groupBy() is a shorthand for selectKey(...).groupByKey()
>
> But both these operations can only be applied to KStreams.
>
> The documentation also says:
>
> Because a new key is selected, an internal repartitioning topic will be
> created in Kafka ... All data of this KTable will be redistributed through
> the repartitioning topic by writing all update records to and rereading all
> update records from it, such that the resulting KGroupedTable is
> partitioned on the new key.
>
> Now assume we want to count the favourite cities of users:
>
> We have a key table like:
>
> Mike,London
> Alice,Paris
> Fred,Rome
> Mike,Berlin (changed his mind)
>
> I would use:
>
> KTable<String, String> usersAndCitiesTable =
> builder.table("user-keys-and-cities");
>
>  KTable<String, Long> favouriteCities =
> usersAndCitiesTable.groupBy((user,city)->new KeyValue<>(city, city))
>  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByCity"));
>
> I took a look at the repartitioning topic created because of the groupBy,
> and can see the record mapped using the KeyValueMapper provided
>
> I noticed that table generates two entries when Mike changes his mind, one
> for London (the old city) and one for Berlin (the new city)
>
> Are this entries marked somehow?  if yes, how ?
>
> How does Kafka make sure that on London count is applied a -1 and the
> Berlin count a +1 when the new record with Mike's new favorite city arrives.
>
>
> Any help or suggestion is highly appreciated !
>
> Thanks


InvalidRecordException: Inner record LegacyRecordBatch

2020-05-14 Thread Gérald Quintana
Hello,

Since we upgraded from Kafka 2.3 to 2.4 we have a large number of log entries saying:

[2020-05-14 19:34:27,623] ERROR [ReplicaManager broker=1] Error processing
append operation on partition ourtopic-1 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: Inner record
LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE,
crc=1669125658, CreateTime=1589477470936, key=0 bytes, value=657 bytes))
inside the compressed record batch does not have incremental offsets,
expected offset is 1 in topic partition ourtopic-1.
[2020-05-14 19:34:27,623] ERROR [ReplicaManager broker=1] Error processing
append operation on partition ourtopic-7 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: Inner record
LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE,
crc=2652410808, CreateTime=1589472087000, key=0 bytes, value=729 bytes))
inside the compressed record batch does not have incremental offsets,
expected offset is 1 in topic partition ourtopic-7.
[2020-05-14 19:34:27,624] ERROR [ReplicaManager broker=1] Error processing
append operation on partition ourtopic-4 (kafka.server.ReplicaManager)
org.apache.kafka.common.InvalidRecordException: Inner record
LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE,
crc=188264511, CreateTime=1589472086000, key=0 bytes, value=730 bytes))
inside the compressed record batch does not have incremental offsets,
expected offset is 1 in topic partition ourtopic-4.

We found the origin in Kafka's source code, but we don't understand what the
error message means.
Does it mean that very old Kafka clients are connected to our Kafka brokers?
How dangerous is it? What can we do to remove those logs?

Thanks for your help,
Gérald


ACL/SSL Issue with Configuration = ssl.principal.mapping.rules

2020-05-14 Thread Jonathan Goings
I'm working through a Kafka implementation. I'm having issues with the
ssl.principal.mapping.rules configuration.

I've successfully started up Kafka with SSL encryption/authentication in place
and I've successfully set up the super user using the full principal name. The
issue arises when I turn on the ssl.principal.mapping.rules. As soon as I
implement this I can no longer access my topics as a super user. When I disable
the ssl.principal.mapping.rules and go back to the full principal name I cannot
view my topics. Below are my config and the error I'm seeing. This was working fine
until I added the ssl.principal.mapping.rules=RULE:^CN=(.*?)$/$1/U,DEFAULT
section. Yes, the cert CN is 'CN=TESTINSTANCE'.

#CONFIG
broker.id=1

# SOCKET SERVER SETTINGS
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# TLS
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners=INTERNAL://kaf1:9092,EXTERNAL://kaf1pub:9093
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL
inter.broker.listener.name=INTERNAL
ssl.endpoint.identification.algorithm=
ssl.client.auth=required
ssl.keystore.location=/directory/to/key.jks
ssl.keystore.password=
ssl.key.password=
ssl.truststore.location=/directory/to/trust.jks
ssl.truststore.password=

# LOG BASICS
log.dirs=/directory/to/log
num.partitions=1
num.recovery.threads.per.data.dir=1

# INTERNAL TOPIC SETTINGS
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
auto.create.topics.enable=false
delete.topic.enable=true

# LOG RETENTION POLICY
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=30

# TLS KAFKA to ZOOKEEPER
zookeeper.connect=testzoo:2182/chroot
zookeeper.connection.timeout.ms=6000
zookeeper.ssl.client.enable=true
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
zookeeper.ssl.keystore.location=/directory/to/key.jks
zookeeper.ssl.keystore.password=
zookeeper.ssl.truststore.location=/directory/to/trust.jks
zookeeper.ssl.truststore.password=

# GROUP COORDINATOR SETTINGS
group.initial.rebalance.delay.ms=0

# ACL SETTINGS
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# For name mapping on principal
super.users=User:TESTINSTANCE
allow.everyone.if.no.acl.found=false
ssl.principal.mapping.rules=RULE:^CN=(.*?)$/$1/U,DEFAULT

Error Msg:
Error while executing topic command : 
org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, 
deadlineMs=1589471487877) timed out at 1589471487878 after 1 attempt(s)
[2020-05-14 15:51:27,882] ERROR java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, 
deadlineMs=1589471487877) timed out at 1589471487878 after 1 attempt(s)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
kafka.admin.TopicCommand$AdminClientTopicService.getTopics(TopicCommand.scala:333)
at 
kafka.admin.TopicCommand$AdminClientTopicService.listTopics(TopicCommand.scala:252)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:66)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: 
Call(callName=listTopics, deadlineMs=1589471487877) timed out at 1589471487878 
after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting 
for a node assignment.
(kafka.admin.TopicCommand$)
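
As a quick sanity check of what that rule is intended to do (applied by hand
here with plain java.util.regex, not through Kafka itself; the CN value is the
one from the message above):

import java.util.regex.Pattern;

public class PrincipalRuleCheck {
    public static void main(String[] args) {
        // RULE:^CN=(.*?)$/$1/U should turn "CN=TESTINSTANCE" into "TESTINSTANCE"
        // (the /U flag upper-cases the result), matching super.users=User:TESTINSTANCE.
        String dn = "CN=TESTINSTANCE";
        String mapped = Pattern.compile("^CN=(.*?)$")
                               .matcher(dn)
                               .replaceAll("$1")
                               .toUpperCase();
        System.out.println(mapped);  // expected: TESTINSTANCE
    }
}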

Jonathan Goings | Database Administrator, Adv



Re: JDBC source connector

2020-05-14 Thread Robin Moffatt
If you just want it once, then delete the connector once it has processed all
the data.
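
A sketch of that one-off cleanup via the Connect REST API (worker address and
connector name are placeholders):

curl -X DELETE http://localhost:8083/connectors/my-jdbc-source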


-- 

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff


On Thu, 14 May 2020 at 16:14, vishnu murali 
wrote:

> Thanks Liam
>
> But I am asking like assume  I am having 10.
>
> Using JDBC source I need to push that once..
>
> No more additional data will be added in future in that table.
>
> In that case i need to push that only once not more than one...
>
> For this scenario I am asking!!
>
> On Thu, May 14, 2020, 19:20 Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > Why not use autoincrement? It'll only emit new records on subsequent
> polls
> > then.
> >
> > On Thu, 14 May 2020, 11:15 pm vishnu murali,  >
> > wrote:
> >
> > > Hi Guys,
> > >
> > > I am using the mode *bulk  *and poll.interval.ms *10* in the
> Source
> > > connector configuration.
> > >
> > > But I don't need to load data another time.?
> > >
> > > I need to load the data only once ??
> > >
> > > How can I able to do this ?
> > >
> >
>


Re: JDBC source connector

2020-05-14 Thread vishnu murali
Thanks Liam

But what I am asking is: assume I have 10.

Using the JDBC source I need to push that once.

No more data will be added to that table in the future.

In that case I need to push it only once, not more than once.

That is the scenario I am asking about!

On Thu, May 14, 2020, 19:20 Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> Why not use autoincrement? It'll only emit new records on subsequent polls
> then.
>
> On Thu, 14 May 2020, 11:15 pm vishnu murali, 
> wrote:
>
> > Hi Guys,
> >
> > I am using the mode *bulk  *and poll.interval.ms *10* in the Source
> > connector configuration.
> >
> > But I don't need to load data another time.?
> >
> > I need to load the data only once ??
> >
> > How can I able to do this ?
> >
>


Re: What to use to copy data from one kafka to another ?

2020-05-14 Thread JP MB
Yes, I saw that it would fit my use case, but that brings another question:
is MM1 deprecated, or will it be discontinued?

Regards

Em qui., 14 de mai. de 2020 às 14:46, Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> escreveu:

> You could use Mirror Maker 1, it's a basic "consume from topic A in cluster
> X, produce to topic A in cluster Y" app.
>
> On Fri, 15 May 2020, 12:13 am JP MB,  wrote:
>
> > Hi guys,
> > My use case is simply copying data from one Kafka to another. When
> > searching on google, the immediate answer seems to be Mirror Maker, so we
> > jumped to the most recent version MM2.
> >
> > The thing is I don't want active/active replication, the consumers from
> > cluster A will be different from the consumers from cluster B. MM2
> creates
> > several topics in the source cluster(heartbeats, mm2-offsets, mm2-status,
> > mm2-configs), which is undesired for us because we don't want to pollute
> > the source.
> >
> > Is there a way to disable this? If no, what should I use?
> >
> > Regards,
> > José Brandão
> >
>


Re: JDBC source connector

2020-05-14 Thread Liam Clarke-Hutchinson
Why not use autoincrement? It'll only emit new records on subsequent polls
then.
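
For reference, a rough sketch of an incrementing-mode source connector config
(connector name, connection URL, column name and topic prefix are placeholders):

name=my-jdbc-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://localhost:3306/mydb
mode=incrementing
incrementing.column.name=id
topic.prefix=mydb-
# With incrementing mode, rows that were already emitted are not re-read on later polls.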

On Thu, 14 May 2020, 11:15 pm vishnu murali, 
wrote:

> Hi Guys,
>
> I am using the mode *bulk  *and poll.interval.ms *10* in the Source
> connector configuration.
>
> But I don't need to load data another time.?
>
> I need to load the data only once ??
>
> How can I able to do this ?
>


Re: What to use to copy data from one kafka to another ?

2020-05-14 Thread Liam Clarke-Hutchinson
You could use Mirror Maker 1, it's a basic "consume from topic A in cluster
X, produce to topic A in cluster Y" app.
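
A bare-bones MM1 invocation looks roughly like this (config file names and the
topic whitelist are placeholders):

bin/kafka-mirror-maker.sh \
  --consumer.config source-cluster-consumer.properties \
  --producer.config target-cluster-producer.properties \
  --whitelist "topicA"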

On Fri, 15 May 2020, 12:13 am JP MB,  wrote:

> Hi guys,
> My use case is simply copying data from one Kafka to another. When
> searching on google, the immediate answer seems to be Mirror Maker, so we
> jumped to the most recent version MM2.
>
> The thing is I don't want active/active replication, the consumers from
> cluster A will be different from the consumers from cluster B. MM2 creates
> several topics in the source cluster(heartbeats, mm2-offsets, mm2-status,
> mm2-configs), which is undesired for us because we don't want to pollute
> the source.
>
> Is there a way to disable this? If no, what should I use?
>
> Regards,
> José Brandão
>


Re: [External] kafka connector distributed mode balance strategy

2020-05-14 Thread Tauzell, Dave
When clients connect, the Kafka broker assigns each client a subset of the topic
partitions. When a client becomes unresponsive, the broker assigns those
partitions to another client.

The state of your connector task isn't going to be transferred, but another
connector task will eventually get the messages that the other task (the one
that fails) doesn't acknowledge.
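
On the REST part of the original question: any worker in the same Connect
cluster can serve the REST API (requests that need the leader are forwarded to
it), so for example (connector name is a placeholder):

curl http://192.168.10.2:8083/connectors
curl http://192.168.10.3:8083/connectors/<connector-name>/status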

-Dave

On 5/13/20, 10:42 PM, "wangl...@geekplus.com.cn"  
wrote:


I want to know how a Kafka connector in distributed mode balances its tasks.

For example, I have two connector instances: 192.168.10.2:8083 and
192.168.10.3:8083.
If one is killed, can the tasks be transferred to the other automatically without
any data loss?

When I use the REST API, e.g. curl "192.168.10.x:8083/.", are the two instances'
APIs completely equivalent? Or do I need a CNAME for these two instances?

Thanks,
Lei



wangl...@geekplus.com.cn



What to use to copy data from one kafka to another ?

2020-05-14 Thread JP MB
Hi guys,
My use case is simply copying data from one Kafka to another. When
searching on google, the immediate answer seems to be Mirror Maker, so we
jumped to the most recent version MM2.

The thing is I don't want active/active replication, the consumers from
cluster A will be different from the consumers from cluster B. MM2 creates
several topics in the source cluster (heartbeats, mm2-offsets, mm2-status,
mm2-configs), which is undesired for us because we don't want to pollute
the source.

Is there a way to disable this? If no, what should I use?

Regards,
José Brandão


JDBC source connector

2020-05-14 Thread vishnu murali
Hi Guys,

I am using the mode bulk and poll.interval.ms 10 in the Source
connector configuration.

But I don't need to load the data another time.

I need to load the data only once.

How can I do this?


KafkaStream groupBy + count on KTable behaviour

2020-05-14 Thread Raffaele Esposito
I'm trying to better understand KTable and I have encountered a behaviour I
cannot wrap my mind around.

So groupByKey() can only be applied to a KStream and not to a KTable, because
of the nature of KTable and its UPSERT logic.
What I don't understand, and therefore ask your help with, is how groupBy()
can actually be applied to a KTable; the documentation says that:

groupBy() is a shorthand for selectKey(...).groupByKey()

But both these operations can only be applied to KStreams.

The documentation also says:

Because a new key is selected, an internal repartitioning topic will be
created in Kafka ... All data of this KTable will be redistributed through
the repartitioning topic by writing all update records to and rereading all
update records from it, such that the resulting KGroupedTable is
partitioned on the new key.

Now assume we want to count the favourite cities of users:

We have a key table like:

Mike,London
Alice,Paris
Fred,Rome
Mike,Berlin (changed his mind)

I would use:

KTable<String, String> usersAndCitiesTable =
builder.table("user-keys-and-cities");

 KTable<String, Long> favouriteCities =
usersAndCitiesTable.groupBy((user,city)->new KeyValue<>(city, city))
 .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByCity"));

I took a look at the repartitioning topic created because of the groupBy,
and can see the record mapped using the KeyValueMapper provided

I noticed that the table generates two entries when Mike changes his mind, one
for London (the old city) and one for Berlin (the new city).

Are these entries marked somehow? If yes, how?

How does Kafka make sure that -1 is applied to the London count and +1 to the
Berlin count when the record with Mike's new favourite city arrives?


Any help or suggestion is highly appreciated !

Thanks