[jira] [Commented] (KAFKA-9909) Kafka Streams : offset control to Streams API

2020-05-13 Thread Gopikrishna (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106198#comment-17106198 ]

Gopikrishna commented on KAFKA-9909:


Appreciate your response, [~guozhang]. There are two cases we are talking about:
 # Ill-formatted message: this can be handled easily and is pretty 
straightforward. My example code was only meant to reproduce the issue, not to 
suggest that as the intended usage. 
 # Message received correctly, but the application could not process it due to 
dependencies on other DBs/microservices. We retry multiple times and also 
process it later by storing the partition and offset details. That is the 
process I follow. 

 

Checkpointing would help with the second scenario above. The code I shared 
illustrates the first scenario, but the second is similar: since I am unable to 
process the message right away, I want a checkpoint so that I can revisit it 
later. 

Yes, I agree we can resend the message if it is not processed, but that 
inflates the count and makes it difficult to know the exact number of messages 
that were processed successfully. Hope this is clear. 
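
As an illustration of the second case, here is a minimal sketch using a plain 
KafkaConsumer rather than Streams (the topic name and the processing call are 
hypothetical): commit the offset only after the message is processed 
successfully, and otherwise record the partition/offset for a later retry.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ManualCommitExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-demo");
        // we decide when to commit, not the client
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    if (process(record)) {
                        // commit this record's offset only on success
                        consumer.commitSync(Collections.singletonMap(
                                new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1)));
                    } else {
                        // store the partition/offset somewhere durable for a later retry
                        System.err.printf("deferred %s-%d@%d%n",
                                record.topic(), record.partition(), record.offset());
                    }
                }
            }
        }
    }

    // placeholder: real processing may fail on DB/microservice dependencies
    private static boolean process(ConsumerRecord<String, String> record) {
        return true;
    }
}
{code}

This is the degree of control the plain consumer API already gives; the ask in 
this ticket is the equivalent from within Streams.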

 

> Kafka Streams : offset control to Streams API
> ----------------------------------------------
>
> Key: KAFKA-9909
> URL: https://issues.apache.org/jira/browse/KAFKA-9909
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
> Environment: All
>Reporter: Gopikrishna
>Priority: Minor
>  Labels: Offset, commit
>
> Hello team, really inspired by the way the Streams API runs today. I would 
> like a feature that gives flexibility over offsets. When we write against the 
> Processor API, the ProcessorContext object can be used to commit the offset, 
> but this is not effective: Streams itself controls the offset. The moment the 
> process method has executed or a scheduled window has completed, the offset 
> is committed automatically by Streams internally. 
> Like a traditional Kafka consumer, the context object should ideally have 
> complete control over whether or not to commit the offset. This would give 
> the API more control over failovers; in particular, when a message cannot be 
> processed, the context should not commit the offset. I would appreciate it if 
> this could be implemented. 
>  
> h4. enable.auto.commit is false by default, but Streams commits the offset 
> automatically. 
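
For reference, a minimal sketch of the Processor API behavior being described 
(the processor body is hypothetical): ProcessorContext#commit() only requests 
a commit, and Streams still decides when the commit actually happens.

{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;

public class CommitRequestingProcessor extends AbstractProcessor<String, String> {

    @Override
    public void process(String key, String value) {
        // ... process the record (hypothetical business logic) ...

        // Only signals that a commit is desired; Streams commits on its own
        // schedule, which is the behavior this ticket asks to make controllable.
        context().commit();
    }
}
{code}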





[jira] [Commented] (KAFKA-9953) support multiple consumerGroupCoordinators in TransactionManager

2020-05-13 Thread Joost van de Wijgerd (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106190#comment-17106190 ]

Joost van de Wijgerd commented on KAFKA-9953:
---------------------------------------------

Hi [~guozhang], thanks for the information. Indeed, I see that the SerDes are 
passed the topic name, so that can be used to select the proper deserializer in 
that case. I guess the type parameter on the Consumer and ConsumerRecords 
interfaces would have to be Object though, as I am mixing different Java class 
types.  
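
As a minimal sketch of that topic-based dispatch (the topic names and the 
delegate deserializers here are hypothetical placeholders):

{code:java}
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TopicDispatchingDeserializer implements Deserializer<Object> {

    // hypothetical mapping from topic name to a concrete deserializer
    private final Map<String, Deserializer<?>> byTopic = Map.of(
            "domain-events", new StringDeserializer(),
            "audit-log", new StringDeserializer());

    @Override
    public Object deserialize(String topic, byte[] data) {
        // pick the deserializer based on the topic the record came from
        Deserializer<?> delegate = byTopic.getOrDefault(topic, new StringDeserializer());
        return delegate.deserialize(topic, data);
    }
}
{code}

The consumer would then be typed as Consumer<Object, Object>, as noted above.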

The other situation (b) is a bit more problematic for us. We have a constant 
stream of DomainEvents which are processed exactly once. If we wanted to 
process an event from that stream (one that was already there but that we 
weren't interested in before), rewinding the existing consumer would be hard 
(not impossible, of course), and it would block processing of newer events 
until we caught up again. In this scenario, adding a new consumer (group) is 
much simpler to implement (at the expense of network traffic, of course, but 
for us this is not an issue).

Arguably this could all be solved with a single consumer -> producer mapping, 
but unfortunately this is not the route we took. Since we are processing 
financial transactions with this system, I am very wary of rewriting our 
consumer logic: we would have to drop our current consumer groups and create a 
new one (for the whole service), and if we are not careful we might miss 
transactions. Another solution would be to pair a producer with each consumer, 
but that adds extra overhead as well. 

Maybe our use case / implementation is a bit out of the ordinary, but the fact 
is that the kafka-clients library allows this construct, and it works, except 
that a latency of around 105ms is introduced with the standard settings because 
the TransactionManager has to keep refreshing the groupCoordinator node. So, 
best practices notwithstanding, I think it makes sense for the kafka-clients 
library to support many-to-one consumer -> producer mappings in a performant 
way. If you decide not to, then maybe at least add a warn log when this 
situation is detected inside the TransactionManager, to alert developers that 
they are doing something that will hurt the performance of their app.

Obviously my preference is to have this fix in the kafka-clients library ;)

> support multiple consumerGroupCoordinators in TransactionManager
> -----------------------------------------------------------------
>
> Key: KAFKA-9953
> URL: https://issues.apache.org/jira/browse/KAFKA-9953
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Joost van de Wijgerd
>Priority: Major
> Attachments: KAFKA-9953.patch
>
>
> We are using Kafka with a transactional producer and have the following use 
> case:
> 3 KafkaConsumers (each with its own ConsumerGroup) polled by the same thread, 
> and 1 transactional Kafka producer. When we add the offsets to the 
> transaction, we run into the following problem: 
> The TransactionManager only keeps track of 1 consumerGroupCoordinator, but 
> some consumerGroupCoordinators may live on other nodes. We then constantly 
> see the TransactionManager switching between nodes, with the overhead of 1 
> failing _TxnOffsetCommitRequest_ and 1 unnecessary 
> _FindCoordinatorRequest_ each time.
> Also, with _retry.backoff.ms_ set to 100 by default, this causes a pause of 
> 100ms for every other transaction (depending on which KafkaConsumer 
> triggered the transaction, of course).
> If the TransactionManager could keep track of coordinator nodes per 
> consumerGroupId, this problem would be solved. 
> I already have a patch for this but still need to test it. I will add it to 
> the ticket when that is done.
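
For reference, a minimal sketch of the many-to-one mapping described above 
(group ids, topic names, and the record handling are hypothetical; the 
consumers and the transactional producer are assumed to be constructed 
elsewhere, with initTransactions() already called):

{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class MultiGroupTransactionLoop {

    // Called in turn for each consumer, e.g. ("group-a", consumerA),
    // ("group-b", consumerB), ("group-c", consumerC)
    static void pollAndForward(KafkaConsumer<String, String> consumer,
                               String groupId,
                               KafkaProducer<String, String> producer) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
        if (records.isEmpty()) {
            return;
        }

        producer.beginTransaction();
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> r : records) {
            producer.send(new ProducerRecord<>("out", r.key(), r.value())); // hypothetical topic
            offsets.put(new TopicPartition(r.topic(), r.partition()),
                        new OffsetAndMetadata(r.offset() + 1));
        }

        // Each call may target a different group coordinator; with a single
        // cached coordinator in the TransactionManager, alternating groups is
        // what triggers the failing TxnOffsetCommitRequest plus the extra
        // FindCoordinatorRequest described above.
        producer.sendOffsetsToTransaction(offsets, groupId);
        producer.commitTransaction();
    }
}
{code}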





[jira] [Commented] (KAFKA-9914) Mirror Maker 2 creates heartbeats kafka topics recursively

2020-05-13 Thread victor (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106186#comment-17106186 ]

victor commented on KAFKA-9914:
-------------------------------

Did you use the same cluster alias, kafka, for both MM2 instances at the same 
time?

As follows:

mm2-1.properties
kafka.bootstrap.servers = aa.com:9092

mm2-2.properties
kafka.bootstrap.servers = bb.com:9092

> Mirror Maker 2 creates heartbeats kafka topics recursively
> -----------------------------------------------------------
>
> Key: KAFKA-9914
> URL: https://issues.apache.org/jira/browse/KAFKA-9914
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.0.0
>Reporter: azher khan
>Priority: Major
>
> Hi Team,
> I configured Mirror Maker 2 to run on a Pod, configuring the Kafka brokers in 
> our environments and marking the '10.X.X.YY' Kafka broker as primary and 
> '10.X.Y.XY' as backup.
> +*Mirror Maker mm2.properties*+ 
> {code:java}
> clusters=kafka,backup
> kafka.bootstrap.servers=10.X.X.YY:9092
> backup.bootstrap.servers=10.X.Y.XY:9092
> kafka->backup.enabled=true
> kafka->backup.topics=az.*
> backup->kafka.enabled=false
> sync.topic.acls.enabled=true
> {code}
>  
> I was able to run Mirror Maker 2 successfully and to take a backup of the 
> identified topics (those starting with 'az').
>  
> However, I can see many Kafka topics with the suffix 'heartbeats' created 
> recursively (see below the list of topics matching 'kafka.*.heartbeats'). 
> This could be because I triggered 'connect-mirror-maker.sh' several times to 
> test backing up other topics.
>  
> I have 2 queries:
>  # How do we avoid the 'kafka.*.heartbeats' topics being created when using 
> Mirror Maker 2?
>  # Once Mirror Maker 2 has backed up a topic, say 'azherf1test', what is the 
> best way to roll back the changes made by Mirror Maker (i.e. delete all the 
> topics it created, 'kafka.azherf1test' and the supporting topics) while 
> ensuring the stability of the existing/source topics and Kafka broker?
> We are testing Mirror Maker and want to ensure we can roll back the changes 
> without affecting the Kafka topics/brokers.
> +*Kafka Topics list output:*+
> {code:java}
> azherf1test
> heartbeats
> kafka-client-topic
> mm2-configs.backup.internal
> mm2-configs.kafka.internal
> mm2-offset-syncs.backup.internal
> mm2-offsets.backup.internal
> mm2-offsets.kafka.internal
> mm2-status.backup.internal
> mm2-status.kafka.internal
> kafka.azherf1test
> kafka.checkpoints.internal
> kafka.heartbeats
> kafka.kafka.heartbeats
> kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.kafka.heartbeats
> {code}

[jira] [Commented] (KAFKA-9459) MM2 sync topic config does not work

2020-05-13 Thread victor (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106182#comment-17106182 ]

victor commented on KAFKA-9459:
-------------------------------

https://issues.apache.org/jira/browse/KAFKA-9981

> MM2 sync topic config does not work
> -----------------------------------
>
> Key: KAFKA-9459
> URL: https://issues.apache.org/jira/browse/KAFKA-9459
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Badai Aqrandista
>Priority: Major
>
> I have MM2 configured as follow:
> {code:java}
> {
> "name": "mm2-from-1-to-2",
> "config": {
>   
> "connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
>   "topics":"foo",
>   "key.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter",
>   "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter",
>   "sync.topic.configs.enabled":"true",
>   "sync.topic.configs.interval.seconds": 60,
>   "sync.topic.acls.enabled": "false",
>   "replication.factor": 1,
>   "offset-syncs.topic.replication.factor": 1,
>   "heartbeats.topic.replication.factor": 1,
>   "checkpoints.topic.replication.factor": 1,
>   "target.cluster.alias":"dest",
>   "target.cluster.bootstrap.servers":"dest.example.com:9092",
>   "source.cluster.alias":"src",
>   "source.cluster.bootstrap.servers":"src.example.com:9092",
>   "tasks.max": 1}
> }
> {code}
> Topic "foo" is configured with "cleanup.policy=compact". But after waiting 
> for 15 minutes, I still don't see that "src.foo" in the destination cluster 
> has "cleanup.policy=compact".
> I set the Connect node to run at TRACE level and could not find any calls to 
> describeConfigs 
> (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L327).
> This implies it never actually gets the list of topics whose configs it 
> needs to fetch.
> And I suspect this code always returns an empty Set 
> (https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L214-L220):
> {code:java}
> private Set<String> topicsBeingReplicated() {
> return knownTopicPartitions.stream()
> .map(x -> x.topic())
> .distinct()
> .filter(x -> knownTargetTopics.contains(formatRemoteTopic(x)))
> .collect(Collectors.toSet());
> }
> {code}
> knownTopicPartitions contains topic-partitions from the source cluster.
> knownTargetTopics contains topic names from the target cluster, which 
> already include the source alias.
> So why is topicsBeingReplicated (the topics from the source cluster) being 
> filtered using knownTargetTopics (the topic names from the target cluster)?
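
To make the filter concrete, here is a worked trace of topicsBeingReplicated 
under the aliases in the config above, assuming the two sets are populated as 
the description says:

{code:java}
// knownTopicPartitions (from the source cluster) -> topics: {"foo"}
// formatRemoteTopic("foo") with source alias "src" -> "src.foo"
// knownTargetTopics (from the target cluster) -> {"src.foo"}
//   => "foo" passes the filter and its configs would be synced.
//
// But if knownTargetTopics has not (yet) been populated with the remote topic
// names, the filter drops every source topic, the method returns an empty set,
// and no describeConfigs call is ever made; this matches the TRACE observation.
{code}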





[jira] [Commented] (KAFKA-9827) MM2 doesnt replicate data

2020-05-13 Thread victor (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-9827?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106178#comment-17106178 ]

victor commented on KAFKA-9827:
-------------------------------

1. Are you running a dedicated MM2 cluster with more than one node?

2. Is a topic for the unsynchronized data created after the MM2 cluster is 
started?

> MM2 doesnt replicate data
> -------------------------
>
> Key: KAFKA-9827
> URL: https://issues.apache.org/jira/browse/KAFKA-9827
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0, 2.4.1
>Reporter: Dmitry
>Priority: Major
> Attachments: mm2.log
>
>
> I have 2 clusters with different node counts:
>  # version 2.4.0, 3 nodes, broker port plaintext:9090
>  # version 2.4.0, 1 node, broker port plaintext:
> I want to transfer data from cluster 1 -> cluster 2 with Mirror Maker 2.
> MM2 configuration:
>  
> {code:java}
> name = mm2-backupFlow
> topics = .*
> groups = .*
> # specify any number of cluster aliases
> clusters = m1-source, m1-backup
> # connection information for each cluster
> m1-source.bootstrap.servers = 172.17.165.49:9090, 172.17.165.50:9090, 
> 172.17.165.51:9090
> m1-backup.bootstrap.servers = 172.17.165.52:
> # enable and configure individual replication flows
> m1-source->m1-backup.enabled = true
> m1-soruce->m1-backup.emit.heartbeats.enabled = false
> {code}
>  
> When I start MM2, these topics are created on each cluster:
>  * heartbeat
>  * *.internal
> And that's all; nothing else happens.
> I see the following ERRORs:
> {code:java}
> [2020-04-07 07:19:16,821] ERROR Plugin class loader for connector: 
> 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector' was not found. 
> Returning: 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@1e5f4170 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> {code}
> {code:java}
> [2020-04-07 07:19:18,312] ERROR Plugin class loader for connector: 
> 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' was not found. 
> Returning: 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@1e5f4170 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> {code}
> {code:java}
> [2020-04-07 07:19:16,807] ERROR Plugin class loader for connector: 
> 'org.apache.kafka.connect.mirror.MirrorSourceConnector' was not found. 
> Returning: 
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@1e5f4170 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165){code}
> The full log is attached.
> What did I miss? Please help.





[jira] [Created] (KAFKA-9986) Checkpointing API for State Stores

2020-05-13 Thread Nikolay Izhikov (Jira)
Nikolay Izhikov created KAFKA-9986:
-----------------------------------

 Summary: Checkpointing API for State Stores
 Key: KAFKA-9986
 URL: https://issues.apache.org/jira/browse/KAFKA-9986
 Project: Kafka
  Issue Type: New Feature
Reporter: Nikolay Izhikov


The parent ticket is KAFKA-3184.

The goal of this ticket is to provide a general checkpointing API for state 
stores in Streams (not only for in-memory but also for persistent stores), 
where the checkpoint location can be either local disk or remote storage. 

Design scope is primarily:

  # the API design, both for checkpointing and for loading checkpoints into 
the local state stores (a hypothetical sketch follows below)
  # the mechanics of checkpointing, e.g. whether it should be async, whether 
it should run on separate threads, etc. 
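
A purely hypothetical sketch of what the API shape in item 1 could look like 
(the names and signatures are illustrative only, not an agreed design; the 
CompletableFuture return type surfaces the async question from item 2):

{code:java}
import java.net.URI;
import java.util.concurrent.CompletableFuture;

public interface CheckpointableStore {

    // opaque handle identifying a written checkpoint
    final class CheckpointId {
        public final String value;
        public CheckpointId(String value) { this.value = value; }
    }

    /** Asynchronously write a consistent snapshot to local disk or remote storage. */
    CompletableFuture<CheckpointId> checkpoint(URI location);

    /** Load a previously written checkpoint into the local state store. */
    void loadCheckpoint(URI location, CheckpointId id);
}
{code}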





[jira] [Commented] (KAFKA-3184) Add Checkpoint for In-memory State Store

2020-05-13 Thread Nikolay Izhikov (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-3184?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17106046#comment-17106046 ]

Nikolay Izhikov commented on KAFKA-3184:


[~guozhang] Thanks for the feedback and detailed explanation. 
I'm very interested and will create a separate ticket for Checkpointing API.

> Add Checkpoint for In-memory State Store
> ----------------------------------------
>
> Key: KAFKA-3184
> URL: https://issues.apache.org/jira/browse/KAFKA-3184
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Nikolay Izhikov
>Priority: Major
>  Labels: user-experience
>
> Currently Kafka Streams does not checkpoint the persistent state store upon 
> committing, since that would be expensive: it "stops the world" and writes to 
> disk (for example, RocksDB would naively require copying the file directory 
> to make a copy). 
> However, for in-memory stores checkpointing may be doable asynchronously and 
> hence quickly. The benefit of having an intermediate checkpoint is avoiding 
> a restore from scratch when standby tasks are not present.
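
A minimal sketch of that asynchronous idea for an in-memory store (the store 
layout and the persist step are hypothetical): the cheap part, copying the 
in-memory map, happens on the commit path, while writing the copy out happens 
off-thread.

{code:java}
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

public class InMemoryStoreCheckpoint {

    private final TreeMap<String, byte[]> store = new TreeMap<>();

    /**
     * Copy-on-checkpoint: taking the copy is the only "stop the world" step,
     * and it is cheap for in-memory data; persisting runs asynchronously.
     */
    public CompletableFuture<Void> checkpointAsync() {
        Map<String, byte[]> snapshot = new TreeMap<>(store); // taken on the caller's thread
        return CompletableFuture.runAsync(() -> persist(snapshot));
    }

    private void persist(Map<String, byte[]> snapshot) {
        // hypothetical: write the snapshot to disk or remote storage
    }
}
{code}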




