[jira] [Commented] (ATLAS-3305) Unable to scale atlas kafka consumers
[ https://issues.apache.org/jira/browse/ATLAS-3305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986690#comment-16986690 ]

Adam Rempter commented on ATLAS-3305:
-------------------------------------

[~madhan], [~sarath], could you have a look at the patch I attached and suggest whether it needs any more improvements?

By default the patch sets one consumer per topic, which is the same as the current behaviour. Maybe we should add clearer docs, as [~bolke] suggested, to make sure the admin is aware of how Atlas will process hook messages?

Atlas now supports multiple topics, so that could be another way to improve the performance of the Atlas consumer. Btw, I think it also relaxes consistency... And consistency can be guaranteed on the producer side, e.g.: [https://www.javaworld.com/article/3066873/big-data-messaging-with-kafka-part-2.html]

Thanks,
Adam

> Unable to scale atlas kafka consumers
> -------------------------------------
>
>                 Key: ATLAS-3305
>                 URL: https://issues.apache.org/jira/browse/ATLAS-3305
>             Project: Atlas
>          Issue Type: Bug
>          Components: atlas-core, atlas-intg
>    Affects Versions: 1.1.0, 2.0.0
>            Reporter: Adam Rempter
>            Priority: Major
>              Labels: performance
>         Attachments: ATLAS-3305_multiple_kafka_consumers.patch, multiple_consumers_perf.png
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> We wanted to scale Kafka consumers for Atlas, as we are getting many lineage messages and processing them with just one consumer is not enough.
>
> There is a parameter, atlas.notification.hook.numthreads, to scale consumers in NotificationHookConsumer.
> But the method:
>
> notificationInterface.createConsumers(NotificationType.HOOK, numThreads)
>
> always returns a one-element list, which effectively always starts a single consumer:
> List<NotificationConsumer<T>> consumers = Collections.singletonList(kafkaConsumer);
>
> The log incorrectly says that the requested number of consumers has been created:
> LOG.info("<== KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled)

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
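
For readers following the thread, the behaviour described in the issue can be sketched in plain Java. This is only an illustration, not the attached patch and not Atlas code: the class ConsumerFactorySketch, the HookConsumer interface and both method names are hypothetical stand-ins, and no Kafka client is involved. It contrasts a factory that ignores the requested count with one that honours it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ConsumerFactorySketch {
    /** Hypothetical stand-in for an Atlas notification consumer. */
    interface HookConsumer {
        String id();
    }

    /** Mirrors the reported behaviour: numConsumers is accepted but ignored. */
    static List<HookConsumer> createSingle(int numConsumers) {
        HookConsumer only = () -> "consumer-1";
        return Collections.singletonList(only);
    }

    /** Sketch of the expected behaviour: one consumer per requested thread. */
    static List<HookConsumer> createMany(int numConsumers) {
        List<HookConsumer> consumers = new ArrayList<>();
        for (int i = 1; i <= numConsumers; i++) {
            String id = "consumer-" + i; // effectively final, captured by the lambda
            consumers.add(() -> id);
        }
        return consumers;
    }

    public static void main(String[] args) {
        System.out.println("singleton variant: " + createSingle(4).size());
        System.out.println("scaled variant:    " + createMany(4).size());
    }
}
```

With four threads requested, the first variant still yields a list of size one, which is why only one consumer ever starts regardless of atlas.notification.hook.numthreads.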
[jira] [Commented] (ATLAS-3305) Unable to scale atlas kafka consumers
[ https://issues.apache.org/jira/browse/ATLAS-3305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16986051#comment-16986051 ]

Bolke de Bruin commented on ATLAS-3305:
---------------------------------------

I suggest using a different topic for this (ATLAS_PARTIONED), so that administrators know what is expected of them (making sure that the messages are atomic and/or partitioned by key).
[jira] [Commented] (ATLAS-3305) Unable to scale atlas kafka consumers
[ https://issues.apache.org/jira/browse/ATLAS-3305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16985964#comment-16985964 ]

Adam Rempter commented on ATLAS-3305:
-------------------------------------

Updated the patch against the latest master.

In our case this change really allowed Atlas to keep up with the growing delta of changes. Please see the Grafana screenshot for details.

!multiple_consumers_perf.png!
[jira] [Commented] (ATLAS-3305) Unable to scale atlas kafka consumers
[ https://issues.apache.org/jira/browse/ATLAS-3305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903621#comment-16903621 ]

Adam Rempter commented on ATLAS-3305:
-------------------------------------

Yes, exactly, I was thinking of something like this.
[jira] [Commented] (ATLAS-3305) Unable to scale atlas kafka consumers
[ https://issues.apache.org/jira/browse/ATLAS-3305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16903328#comment-16903328 ]

Bolke de Bruin commented on ATLAS-3305:
---------------------------------------

Why not use partitions with hashed ordering by qualified name? Or is this what you meant, [~arempter]?

Can we reach consensus here, [~sarath.ku...@gmail.com]? This scalability issue is blocking deployment for us at the moment.
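
The idea of hashing by qualified name can be illustrated with a small, self-contained sketch. Everything here is an assumption made for illustration: the class and method names are hypothetical, the qualified names are made-up examples, and Arrays.hashCode is a simplified stand-in for the hash used by Kafka's own partitioner. The only point is that a deterministic hash sends every message for one entity to the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class QualifiedNamePartitionerSketch {
    /**
     * Maps an entity's qualified name to a partition index in [0, numPartitions).
     * Arrays.hashCode is a simplified stand-in for a real partitioner's hash.
     */
    static int partitionFor(String qualifiedName, int numPartitions) {
        byte[] keyBytes = qualifiedName.getBytes(StandardCharsets.UTF_8);
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        // Illustrative qualified names only.
        String[] names = {"default.t1@cluster1", "default.t2@cluster1", "sales.orders@cluster1"};
        for (String name : names) {
            System.out.println(name + " -> partition " + partitionFor(name, 4));
        }
    }
}
```

Because the mapping depends only on the key and the partition count, changing the number of partitions later would move keys between partitions, which is something an administrator would need to plan for.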
[jira] [Commented] (ATLAS-3305) Unable to scale atlas kafka consumers
[ https://issues.apache.org/jira/browse/ATLAS-3305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16875965#comment-16875965 ]

Adam Rempter commented on ATLAS-3305:
-------------------------------------

Yes, that's true, it just spawns multiple consumers.

Atlas is using Kafka, which is a distributed message broker, and by design there is no real way (at least for now?) to guarantee global consistency.

One way to mitigate this would be for the producer to use a message key, so that at least order is preserved within a partition. The key could be either a userId (one user per type of service, e.g. hive) or the entity name.

In a stricter mode (a configuration option?), the Atlas consumer could then check whether a message has a key and discard the message if it does not?
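
The "stricter mode" floated above could look roughly like the following. This is a hedged sketch only: the HookMessage class, the accept method and the strictMode flag are all hypothetical, and nothing in the thread confirms that Atlas has or adopted such an option. It simply drops messages whose key is missing when strict mode is on.

```java
import java.util.ArrayList;
import java.util.List;

class StrictKeyFilterSketch {
    /** Hypothetical message shape; key is null when the producer set none. */
    static final class HookMessage {
        final String key;
        final String payload;

        HookMessage(String key, String payload) {
            this.key = key;
            this.payload = payload;
        }
    }

    /** Returns the messages a consumer would process; unkeyed ones are dropped in strict mode. */
    static List<HookMessage> accept(List<HookMessage> batch, boolean strictMode) {
        List<HookMessage> accepted = new ArrayList<>();
        for (HookMessage message : batch) {
            boolean hasKey = message.key != null && !message.key.isEmpty();
            if (strictMode && !hasKey) {
                continue; // discard: ordering cannot be tied to a partition without a key
            }
            accepted.add(message);
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<HookMessage> batch = new ArrayList<>();
        batch.add(new HookMessage("hive", "CREATE HIVE TABLE 't1'"));
        batch.add(new HookMessage(null, "UPDATE HIVE TABLE 't2'"));
        System.out.println("strict:  " + accept(batch, true).size());
        System.out.println("lenient: " + accept(batch, false).size());
    }
}
```

Silently discarding messages loses metadata, so a real implementation would probably want to log or count the rejected messages; that choice is left open here.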
[jira] [Commented] (ATLAS-3305) Unable to scale atlas kafka consumers
[ https://issues.apache.org/jira/browse/ATLAS-3305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16875949#comment-16875949 ]

Sarath Subramanian commented on ATLAS-3305:
-------------------------------------------

[~arempter], your solution of spawning multiple threads for NotificationHookConsumer doesn't consider the order of messages arriving on the Kafka topic *ATLAS_HOOK*. For example, if we have the following messages:
 * *m1*: CREATE HIVE TABLE 't1'
 * *m2*: UPDATE HIVE TABLE 't1'
 * *m3*: UPDATE HIVE TABLE 't1'
 * *m4*: DROP HIVE TABLE 't1'

The update message (*m2*) should not be processed before the create message (*m1*), and the drop message (*m4*) should not be processed until *m1*, *m2* and *m3* are processed.

Do you have a solution which addresses the above case?
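
The ordering concern with m1 to m4 can be made concrete with a small simulation. It is a sketch under stated assumptions rather than Atlas or Kafka code: the Msg class and both methods are hypothetical, String.hashCode stands in for a real partitioner hash, and each inner list models one partition read sequentially by exactly one consumer. If every message for table 't1' is keyed by the table name, all four land in one partition in arrival order, so a single consumer sees CREATE, UPDATE, UPDATE, DROP in that order even when other tables are interleaved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PerKeyOrderingSketch {
    /** Hypothetical hook message: the table name doubles as the message key. */
    static final class Msg {
        final String table;
        final String op;

        Msg(String table, String op) {
            this.table = table;
            this.op = op;
        }
    }

    /** Appends each message to the partition chosen from its key, preserving arrival order. */
    static List<List<Msg>> route(List<Msg> arrivals, int numPartitions) {
        List<List<Msg>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (Msg m : arrivals) {
            int p = Math.floorMod(m.table.hashCode(), numPartitions);
            partitions.get(p).add(m);
        }
        return partitions;
    }

    /** Operations for one table in the order its partition's consumer would process them. */
    static List<String> opsFor(String table, List<List<Msg>> partitions) {
        List<String> ops = new ArrayList<>();
        for (List<Msg> partition : partitions) {
            for (Msg m : partition) {
                if (m.table.equals(table)) {
                    ops.add(m.op);
                }
            }
        }
        return ops;
    }

    public static void main(String[] args) {
        List<Msg> arrivals = Arrays.asList(
                new Msg("t1", "CREATE"), new Msg("t2", "CREATE"),
                new Msg("t1", "UPDATE"), new Msg("t1", "UPDATE"),
                new Msg("t2", "DROP"), new Msg("t1", "DROP"));
        System.out.println(opsFor("t1", route(arrivals, 4)));
    }
}
```

This only protects ordering per key. Messages that touch several entities at once, or that arrive without a key, would still need separate handling.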
[jira] [Commented] (ATLAS-3305) Unable to scale atlas kafka consumers
[ https://issues.apache.org/jira/browse/ATLAS-3305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16873114#comment-16873114 ]

Adam Rempter commented on ATLAS-3305:
-------------------------------------

With the change I can see that Atlas is able to spawn consumers correctly:

TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST        CLIENT-ID
ATLAS_HOOK  2          -               0               -    consumer-2-51e593e8-549b-47b7-a6d0-0f543ffaa306  /127.0.0.1  consumer-2
ATLAS_HOOK  3          -               0               -    consumer-2-51e593e8-549b-47b7-a6d0-0f543ffaa306  /127.0.0.1  consumer-2
ATLAS_HOOK  0          -               1               -    consumer-1-542ae27b-98be-4ee6-a206-5445950b94ff  /127.0.0.1  consumer-1
ATLAS_HOOK  1          -               0               -    consumer-1-542ae27b-98be-4ee6-a206-5445950b94ff  /127.0.0.1  consumer-1