Pierre Villard resolved NIFI-3378.
----------------------------------
    Resolution: Won't Fix

PutKafka is deprecated as it is designed for old versions of Kafka.

> PutKafka retries flowfiles that are too large instead of routing them to failure
> --------------------------------------------------------------------------------
>
>                 Key: NIFI-3378
>                 URL: https://issues.apache.org/jira/browse/NIFI-3378
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.0.0
>            Reporter: Ryan Persaud
>            Priority: Major
>
> When using PutKafka, if the content size exceeds 1048576 bytes, an uncaught exception is thrown by org.apache.nifi.stream.io.util.StreamDemarcator.fill() (see stack trace below). This results in the offending flowfile being retried repeatedly instead of being routed to failure.
>
> The exception is thrown because maxRequestSize in PublishingContext is hardcoded to 1048576, the "Kafka default." In actuality, I believe the default should be 1000000 for Kafka 0.8 (see message.max.bytes at https://kafka.apache.org/08/documentation.html#brokerconfigs), but that's another issue. If we know that any content larger than maxRequestSize is always going to cause an exception, would it make sense to check the fileSize early in PutKafka and avoid many needless function calls, exceptions, and retries?
> For example, something like:
> {code}
> @Override
> protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException {
>     boolean processed = false;
>     FlowFile flowFile = session.get();
>     if (flowFile != null) {
>         // Route oversized flowfiles straight to failure instead of letting
>         // StreamDemarcator.fill() throw later.
>         if (flowFile.getSize() > 1048576) {
>             session.transfer(session.penalize(flowFile), REL_FAILURE);
>         } else {
>             flowFile = this.doRendezvousWithKafka(flowFile, context, session);
>             if (!this.isFailedFlowFile(flowFile)) {
>                 session.getProvenanceReporter().send(flowFile,
>                         context.getProperty(SEED_BROKERS).getValue() + "/"
>                                 + context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue());
>                 session.transfer(flowFile, REL_SUCCESS);
>             } else {
>                 session.transfer(session.penalize(flowFile), REL_FAILURE);
>             }
>         }
>         processed = true;
>     }
>     return processed;
> }
> {code}
> Thoughts? A RouteOnAttribute processor that examines the fileSize attribute could be used to 'protect' PutKafka, but that seems rather cumbersome.
>
> 2017-01-20 02:48:12,008 ERROR [Timer-Driven Process Thread-6] o.apache.nifi.processors.kafka.PutKafka
> java.lang.IllegalStateException: Maximum allowed data size of 1048576 exceeded.
> 	at org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153) ~[nifi-utils-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
> 	at org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105) ~[nifi-utils-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
> 	at org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:126) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
> 	at org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:313) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
> 	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1880) ~[nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
> 	at org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851) ~[nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
> 	at org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:309) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
> 	at org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:285) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
> 	at org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76) ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
> 	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
> 	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
> 	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
> 	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132) [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_77]
> 	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_77]
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_77]
> 	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_77]
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_77]
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_77]
> 	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
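As background on the trace above: the root cause is that the demarcator enforces its size limit with an unchecked IllegalStateException rather than a catchable IOException, so PutKafka's normal failure handling never sees it. The following is a minimal standalone sketch of that behavior (an illustrative assumption, not NiFi's actual StreamDemarcator implementation; class and field names are hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Sketch of a demarcator-style reader: it buffers stream content, doubling
// its buffer as needed, but refuses to grow past maxDataSize and throws an
// unchecked IllegalStateException at that point.
public class DemarcatorSketch {
    private final InputStream is;
    private final int maxDataSize;
    private byte[] buffer;
    private int index;

    DemarcatorSketch(InputStream is, int initialSize, int maxDataSize) {
        this.is = is;
        this.maxDataSize = maxDataSize;
        this.buffer = new byte[initialSize];
    }

    // Read the whole stream into the buffer. Note the failure mode: exceeding
    // the cap is reported as IllegalStateException, not IOException.
    void fill() throws IOException {
        int b;
        while ((b = is.read()) != -1) {
            if (index >= buffer.length) {
                int newSize = buffer.length * 2;
                if (newSize > maxDataSize) {
                    throw new IllegalStateException(
                            "Maximum allowed data size of " + maxDataSize + " exceeded.");
                }
                byte[] grown = new byte[newSize];
                System.arraycopy(buffer, 0, grown, 0, index);
                buffer = grown;
            }
            buffer[index++] = (byte) b;
        }
    }

    public static void main(String[] args) throws IOException {
        // 2048 bytes of content against a 1024-byte cap: fill() throws the
        // unchecked exception, mirroring the behavior reported in the issue.
        DemarcatorSketch d = new DemarcatorSketch(
                new ByteArrayInputStream(new byte[2048]), 1024, 1024);
        try {
            d.fill();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
            // prints: Maximum allowed data size of 1024 exceeded.
        }
    }
}
```

This illustrates why an upfront `flowFile.getSize()` check, as proposed in the issue, avoids the retry loop: the size is known before any read, so the oversized flowfile can be routed to failure without ever reaching the demarcator.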