Hello,

Any idea?


On Mon, Jul 24, 2023 at 11:46, nick toker <nick.toker....@gmail.com> wrote:

> Hi
>
> 1. We use Flink 1.17.1.
> 2. There is no traffic or data in the topics when this happens.
> 3. The part of the log with the error:
>
> 2023-07-24 08:41:30,334 DEBUG org.apache.flink.connector.kafka.sink.
> FlinkKafkaInternalProducer [] - commitTransaction telephony-decoder-
> ORCHESTRATOR-MULTIMEDIA-IN-8-10
> 2023-07-24 08:41:30,378 ERROR org.apache.flink.connector.kafka.sink.
> KafkaCommitter [] - Unable to commit transaction
> (org.apache.flink.streaming.runtime.operators.sink.committables.
> CommitRequestImpl@24e0e43c) because it's in an invalid state. Most likely
> the transaction has been aborted for some reason. Please check the Kafka
> logs for more details.
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.
> 2023-07-24 08:41:30,378 ERROR org.apache.flink.connector.kafka.sink.
> KafkaCommitter [] - Unable to commit transaction
> (org.apache.flink.streaming.runtime.operators.sink.committables.
> CommitRequestImpl@55c55f14) because it's in an invalid state. Most likely
> the transaction has been aborted for some reason. Please check the Kafka
> logs for more details.
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.
> 2023-07-24 08:41:30,380 DEBUG org.apache.flink.runtime.state.
> TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
> remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
> OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
> StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
> delegateStateHandle=ByteStreamStateHandle{handleName=
> 'file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/238f2a1e-aa7e-4218-843a-b469a5dcbad8',
> dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
> keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
> StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
> checkpointedSize=291} from job manager and local state alternatives []
> from local state store org.apache.flink.runtime.state.
> NoOpTaskLocalStateStoreImpl@42b00150.
> 2023-07-24 08:41:30,380 DEBUG org.apache.flink.runtime.state.
> TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
> remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
> OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
> StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
> delegateStateHandle=ByteStreamStateHandle{handleName=
> 'file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/e2d3916f-f9f9-40e4-89bc-bc193fbd5373',
> dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
> keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
> StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
> checkpointedSize=291} from job manager and local state alternatives []
> from local state store org.apache.flink.runtime.state.
> NoOpTaskLocalStateStoreImpl@32734da0.
> 2023-07-24 08:41:30,380 DEBUG org.apache.flink.streaming.api.operators.
> BackendRestorerProcedure [] - Creating operator state backend for
> SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(12/20) and restoring
> with state from alternative (1/1).
> 2023-07-24 08:41:30,381 DEBUG org.apache.flink.streaming.api.operators.
> BackendRestorerProcedure [] - Creating operator state backend for
> SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(14/20) and restoring
> with state from alternative (1/1).
> 2023-07-24 08:41:30,381 ERROR org.apache.flink.connector.kafka.sink.
> KafkaCommitter [] - Unable to commit transaction
> (org.apache.flink.streaming.runtime.operators.sink.committables.
> CommitRequestImpl@75581079) because it's in an invalid state. Most likely
> the transaction has been aborted for some reason. Please check the Kafka
> logs for more details.
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.
> 2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.state.
> TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
> remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
> OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
> StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
> delegateStateHandle=ByteStreamStateHandle{handleName=
> 'file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/66a3e58e-0f8c-4c09-9fc3-04be0d3388dc',
> dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
> keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
> StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
> checkpointedSize=291} from job manager and local state alternatives []
> from local state store org.apache.flink.runtime.state.
> NoOpTaskLocalStateStoreImpl@5c241b41.
> 2023-07-24 08:41:30,382 DEBUG org.apache.flink.streaming.api.operators.
> BackendRestorerProcedure [] - Creating operator state backend for
> SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(5/20) and restoring
> with state from alternative (1/1).
> 2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsOutErrors.
> 2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsOutErrors.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSendErrors.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsOutErrors.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSendErrors.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSend.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSend.
> 2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numBytesSend.
> 2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSendErrors.
> 2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSend.
> 2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numBytesSend.
> 2023-07-24 08:41:30,385 ERROR org.apache.flink.connector.kafka.sink.
> KafkaCommitter [] - Unable to commit transaction
> (org.apache.flink.streaming.runtime.operators.sink.committables.
> CommitRequestImpl@47835f95) because it's in an invalid state. Most likely
> the transaction has been aborted for some reason. Please check the Kafka
> logs for more details.
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.
> 2023-07-24 08:41:30,385 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numBytesSend.
> 2023-07-24 08:41:30,385 DEBUG org.apache.flink.runtime.state.
> TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
> remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
> OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
> StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
> delegateStateHandle=ByteStreamStateHandle{handleName=
> 'file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/e4aa5b34-3b1f-4f6b-b9a0-6dacb4d5408b',
> dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
> keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
> StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
> checkpointedSize=291} from job manager and local state alternatives []
> from local state store org.apache.flink.runtime.state.
> NoOpTaskLocalStateStoreImpl@157a2949.
>
>
> On Mon, Jul 24, 2023 at 5:31, Shammon FY <zjur...@gmail.com> wrote:
>
>> Hi nick,
>>
>> Is there any error log? That may help to analyze the root cause.
>>
>> On Sun, Jul 23, 2023 at 9:53 PM nick toker <nick.toker....@gmail.com>
>> wrote:
>>
>>> hello
>>>
>>>
>>> We replaced the deprecated Kafka producer with the Kafka sink,
>>> and from time to time when we submit a job it gets stuck for 5 minutes
>>> in initializing (on the sink operators).
>>> We verified that the transaction prefix is unique.
>>>
>>> This did not happen when we used the Kafka producer.
>>>
>>> What could be the reason?
>>>
>>>
