[jira] [Updated] (NIFI-7889) ConsumeMQTT - use offer instead of add
     [ https://issues.apache.org/jira/browse/NIFI-7889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Peter Turcsanyi updated NIFI-7889:
----------------------------------
    Fix Version/s: 1.13.0
       Resolution: Fixed
           Status: Resolved  (was: Patch Available)

> ConsumeMQTT - use offer instead of add
> --------------------------------------
>
>                 Key: NIFI-7889
>                 URL: https://issues.apache.org/jira/browse/NIFI-7889
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Pierre Villard
>            Assignee: Pierre Villard
>            Priority: Major
>             Fix For: 1.13.0
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> In ConsumeMQTT, we are filling the internal queue of the processor using this
> code:
> {code:java}
> if (mqttQueue.size() >= maxQueueSize){
>     throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
> } else {
>     mqttQueue.add(new MQTTQueueMessage(topic, message));
> }
> {code}
> Instead of throwing an exception when the internal queue is full, we could
> have a blocking call with {{offer()}} to give some time for the queue to be
> drained and then add the message to the queue. If the queue is still full,
> we'd throw the exception which would cause data loss in case the QoS is
> configured to 0.
> Documentation should also be improved around the implications of such
> configuration.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
[jira] [Updated] (NIFI-7889) ConsumeMQTT - use offer instead of add
     [ https://issues.apache.org/jira/browse/NIFI-7889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pierre Villard updated NIFI-7889:
---------------------------------
    Assignee: Pierre Villard
      Status: Patch Available  (was: Open)

> ConsumeMQTT - use offer instead of add
> --------------------------------------
>
>                 Key: NIFI-7889
>                 URL: https://issues.apache.org/jira/browse/NIFI-7889
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Pierre Villard
>            Assignee: Pierre Villard
>            Priority: Major
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> In ConsumeMQTT, we are filling the internal queue of the processor using this
> code:
> {code:java}
> if (mqttQueue.size() >= maxQueueSize){
>     throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
> } else {
>     mqttQueue.add(new MQTTQueueMessage(topic, message));
> }
> {code}
> Instead of throwing an exception when the internal queue is full, we could
> have a blocking call with {{offer()}} to give some time for the queue to be
> drained and then add the message to the queue. If the queue is still full,
> we'd throw the exception which would cause data loss in case the QoS is
> configured to 0.
> Documentation should also be improved around the implications of such
> configuration.
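
For readers of this thread, the behaviour proposed in the description can be sketched roughly as follows. This is only an illustration and not the patch that was merged: the class name BoundedEnqueueSketch, the helper enqueueOrThrow, the String payload and the 100 ms timeout are all hypothetical, and a LinkedBlockingQueue with a fixed capacity stands in for the processor's internal queue and maxQueueSize. The only API relied on is the standard BlockingQueue.offer(E, long, TimeUnit), which waits up to the given time for space and returns false if none became available.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; names are illustrative and not taken from the NiFi code base.
class BoundedEnqueueSketch {

    /**
     * Waits up to timeoutMillis for free space before giving up.
     * Throws IllegalStateException only if the queue is still full after the wait
     * (or if the waiting thread was interrupted).
     */
    static <T> void enqueueOrThrow(BlockingQueue<T> queue, T message, long timeoutMillis) {
        boolean added;
        try {
            // Timed offer: blocks while the queue is full, returns false on timeout.
            added = queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and treat the message as not enqueued.
            Thread.currentThread().interrupt();
            added = false;
        }
        if (!added) {
            throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
        }
    }

    public static void main(String[] args) {
        // Capacity 1 plays the role of maxQueueSize in this sketch.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);

        enqueueOrThrow(queue, "first", 100);      // fits immediately
        try {
            enqueueOrThrow(queue, "second", 100); // nobody drains the queue, so this times out
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        queue.poll();                             // simulate the processor draining one message
        enqueueOrThrow(queue, "third", 100);      // space is available again
        System.out.println("queue size: " + queue.size());
    }
}
```

Compared with the size() check followed by add(), the timed offer gives the consumer a chance to drain the queue before the message is rejected. As the description notes, a rejection still means the message is lost when QoS is 0, so the timeout only narrows that window rather than closing it.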