Yes. That looks promising to me. Feel free to open a PR after we have a
JIRA -- or just create the JIRA right away.

-Matthias

On 4/4/18 2:57 PM, Ted Yu wrote:
> How about the following change?
> 
> diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
> index 8d6e56a..92bedad 100644
> --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
> +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
> @@ -482,7 +482,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
>              if (eosEnabled) {
>                  if (!clean) {
>                      try {
> -                        if (!isZombie) {
> +                        if (!isZombie && transactionInFlight) {
>                              producer.abortTransaction();
>                          }
>                          transactionInFlight = false;
> 
> On Wed, Apr 4, 2018 at 2:02 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Thanks for reporting this.
>>
>> It's indeed a bug in Kafka Streams. It's related to this fix:
>> https://issues.apache.org/jira/browse/KAFKA-6634 -- the corresponding PR
>> introduces the issue.
>>
>> Because we now initialize the TX with a delay, in your case the TX is
>> never initialized, and thus aborting the TX fails.
>>
>> Please open a JIRA for the issue.
>>
>> -Matthias
>>
>> On 4/4/18 9:32 AM, Ted Yu wrote:
>>> Looking at isTransitionValid():
>>>
>>>                 case ABORTING_TRANSACTION:
>>>
>>>                     return source == IN_TRANSACTION || source == ABORTABLE_ERROR;
>>>
>>> The source state is not supposed to be READY.
>>>
>>> I don't see READY in the log you posted.
>>>
>>>
>>> Please consider logging a JIRA where you can attach logs.
>>>
>>>
>>> Cheers
>>>
>>>
>>> On Wed, Apr 4, 2018 at 2:49 AM, Frederic Arno <frederica...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> I am running tests against kafka-streams 1.1 and get the following stack
>>>> trace (everything was working alright using kafka-streams 1.0):
>>>>
>>>> ERROR org.apache.kafka.streams.processor.internals.AssignedStreamsTasks -
>>>> stream-thread [feedBuilder-XXX-StreamThread-4] Failed to close stream
>>>> task, 0_2
>>>> org.apache.kafka.common.KafkaException: TransactionalId feedBuilder-0_2:
>>>> Invalid transition attempted from state READY to state ABORTING_TRANSACTION
>>>>         at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:757)
>>>>         at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:751)
>>>>         at org.apache.kafka.clients.producer.internals.TransactionManager.beginAbort(TransactionManager.java:230)
>>>>         at org.apache.kafka.clients.producer.KafkaProducer.abortTransaction(KafkaProducer.java:660)
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.closeSuspended(StreamTask.java:486)
>>>>         at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:546)
>>>>         at org.apache.kafka.streams.processor.internals.AssignedTasks.closeNonRunningTasks(AssignedTasks.java:166)
>>>>         at org.apache.kafka.streams.processor.internals.AssignedTasks.suspend(AssignedTasks.java:151)
>>>>         at org.apache.kafka.streams.processor.internals.TaskManager.suspendTasksAndState(TaskManager.java:242)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:291)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:414)
>>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
>>>>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
>>>>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1149)
>>>>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:827)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:784)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
>>>>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)
>>>>
>>>>
>>>> This happens when starting the same stream-processing application on 3
>>>> JVMs, all running on the same Linux box. The JVMs are named JVM-[2-4].
>>>> All 3 instances use a separate stream state.dir. No record is ever
>>>> processed because the input Kafka topics are empty at this stage.
>>>>
>>>> JVM-2 starts first, joined shortly after by JVM-4 and JVM-3. The state
>>>> transition logs are below. The above stack trace is from JVM-4.
>>>>
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> JVM-4 crashes here with above stacktrace
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to ERROR
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from ERROR to PENDING_SHUTDOWN
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to REBALANCING
>>>> [JVM-2] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from REBALANCING to RUNNING
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from RUNNING to PENDING_SHUTDOWN
>>>> [JVM-3] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>> [JVM-4] stream-client [feedBuilder-XXX] State transition from PENDING_SHUTDOWN to NOT_RUNNING
>>>>
>>>>
>>>> What should I do with that?
>>>>
>>>> Thanks, Fred
>>>>
>>>
>>
>>
> 
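
For readers following the thread, the effect of the guard proposed in the
quoted diff can be sketched with a small stand-alone model. This is only an
illustration under assumptions: ToyProducer, closeUnguarded and closeGuarded
are hypothetical names, the state machine reproduces only the transition rule
quoted from isTransitionValid(), and it throws IllegalStateException where the
real client throws KafkaException. It is not the actual Kafka Streams code.

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified, hypothetical model of the producer's transaction states.
// It is NOT the real Kafka TransactionManager; only the transition rule
// quoted in the thread is reproduced.
class TxGuardSketch {

    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR, ABORTING_TRANSACTION }

    static final class ToyProducer {
        private State state = State.READY;

        void beginTransaction() {
            state = State.IN_TRANSACTION;
        }

        void abortTransaction() {
            // Mirrors the quoted rule: ABORTING_TRANSACTION is reachable
            // only from IN_TRANSACTION or ABORTABLE_ERROR.
            Set<State> validSources =
                    EnumSet.of(State.IN_TRANSACTION, State.ABORTABLE_ERROR);
            if (!validSources.contains(state)) {
                throw new IllegalStateException(
                        "Invalid transition attempted from state " + state
                                + " to state " + State.ABORTING_TRANSACTION);
            }
            state = State.ABORTING_TRANSACTION;
            state = State.READY; // toy model: the abort completes immediately
        }

        State state() {
            return state;
        }
    }

    // Close path without the guard: aborts whenever the task is not a zombie.
    static void closeUnguarded(ToyProducer producer, boolean isZombie) {
        if (!isZombie) {
            producer.abortTransaction();
        }
    }

    // Close path with the proposed guard: aborts only if a transaction
    // was actually started.
    static void closeGuarded(ToyProducer producer, boolean isZombie,
                             boolean transactionInFlight) {
        if (!isZombie && transactionInFlight) {
            producer.abortTransaction();
        }
    }

    public static void main(String[] args) {
        // No record was processed, so no transaction was ever begun.
        ToyProducer idle = new ToyProducer();
        try {
            closeUnguarded(idle, false);
        } catch (IllegalStateException e) {
            System.out.println("unguarded close failed: " + e.getMessage());
        }

        // With the guard, the abort is skipped and the close succeeds.
        closeGuarded(idle, false, false);
        System.out.println("guarded close left state " + idle.state());
    }
}
```

In this model the unguarded path fails from READY, which matches the reported
exception message, while the guarded path skips the abort when no transaction
is in flight and still aborts normally once beginTransaction() has been called.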
