[ 
https://issues.apache.org/jira/browse/NIFI-3378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierre Villard resolved NIFI-3378.
----------------------------------
    Resolution: Won't Fix

PutKafka is deprecated as it is designed for old versions of Kafka.

> PutKafka retries flowfiles that are too large instead of routing them to 
> failure
> --------------------------------------------------------------------------------
>
>                 Key: NIFI-3378
>                 URL: https://issues.apache.org/jira/browse/NIFI-3378
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Core Framework
>    Affects Versions: 1.0.0
>            Reporter: Ryan Persaud
>            Priority: Major
>
> When using PutKafka, if the content size exceeds 1048576, an uncaught 
> exception is thrown by org.apache.nifi.stream.io.util.StreamDemarcator.fill() 
> (see stack trace below).  This results in the offending flowfile being 
> retried repeatedly instead of being routed to failure.
> The exception is thrown because maxRequestSize in PublishingContext is 
> hardcoded to 1048576 which is the "kafka default."  In actuality, I believe 
> the default should be 1000000 for Kafka 0.8 (see message.max.bytes at 
> https://kafka.apache.org/08/documentation.html#brokerconfigs), but that's 
> another issue. If we know that any content larger than maxRequestSize is 
> always going to cause an exception, would it make sense to check the fileSize 
> early in PutKafka and avoid many needless function calls, exceptions and 
> retries?  For example, something like:
> {code}
>     @Override
>     protected boolean rendezvousWithKafka(ProcessContext context, 
> ProcessSession session) throws ProcessException {
>         boolean processed = false;
>         FlowFile flowFile = session.get();
>         if (flowFile != null) {
>             if (flowFile.getSize() > 1048576) {
>                 session.transfer(session.penalize(flowFile), REL_FAILURE);
>             }
>             else {
>                 flowFile = this.doRendezvousWithKafka(flowFile, context, 
> session);
>                 if (!this.isFailedFlowFile(flowFile)) {
>                     session.getProvenanceReporter().send(flowFile,
>                             context.getProperty(SEED_BROKERS).getValue() + "/"
>                                     + 
> context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue());
>                     session.transfer(flowFile, REL_SUCCESS);
>                 } else {
>                     session.transfer(session.penalize(flowFile), REL_FAILURE);
>                 }
>             }
>             processed = true;
>         }
>         return processed;
>     }
> {code}
> Thoughts? A RouteOnAttribute processor that examines the fileSize attribute 
> could be used to 'protect' PutKafka, but that seems rather cumbersome.
> 2017-01-20 02:48:12,008 ERROR [Timer-Driven Process Thread-6] 
> o.apache.nifi.processors.kafka.PutKafka 
> java.lang.IllegalStateException: Maximum allowed data size of 1048576 
> exceeded.
>         at 
> org.apache.nifi.stream.io.util.StreamDemarcator.fill(StreamDemarcator.java:153)
>  ~[nifi-utils-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at 
> org.apache.nifi.stream.io.util.StreamDemarcator.nextToken(StreamDemarcator.java:105)
>  ~[nifi-utils-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at 
> org.apache.nifi.processors.kafka.KafkaPublisher.publish(KafkaPublisher.java:126)
>  ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at 
> org.apache.nifi.processors.kafka.PutKafka$1.process(PutKafka.java:313) 
> ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at 
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1880)
>  ~[nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at 
> org.apache.nifi.controller.repository.StandardProcessSession.read(StandardProcessSession.java:1851)
>  ~[nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at 
> org.apache.nifi.processors.kafka.PutKafka.doRendezvousWithKafka(PutKafka.java:309)
>  ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at 
> org.apache.nifi.processors.kafka.PutKafka.rendezvousWithKafka(PutKafka.java:285)
>  ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at 
> org.apache.nifi.processors.kafka.AbstractKafkaProcessor.onTrigger(AbstractKafkaProcessor.java:76)
>  ~[nifi-kafka-0-8-processors-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at 
> org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1064)
>  [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
>  [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at 
> org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
>  [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at 
> org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:132)
>  [nifi-framework-core-1.0.0.2.0.0.0-579.jar:1.0.0.2.0.0.0-579]
>         at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
> [na:1.8.0_77]
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) 
> [na:1.8.0_77]
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  [na:1.8.0_77]
>         at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  [na:1.8.0_77]
>         at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  [na:1.8.0_77]
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  [na:1.8.0_77]
>         at java.lang.Thread.run(Thread.java:745) [na:1.8.0_77]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to