Hello, any ideas?
On Mon, Jul 24, 2023 at 11:46, nick toker <nick.toker....@gmail.com> wrote:
> Hi
>
> 1. We use Flink 1.17.1.
> 2. There is no traffic and no data in the topics when it happens.
> 3
>
> Log excerpt with the error:
>
> 2023-07-24 08:41:30,334 DEBUG org.apache.flink.connector.kafka.sink.
> FlinkKafkaInternalProducer [] - commitTransaction telephony-decoder-
> ORCHESTRATOR-MULTIMEDIA-IN-8-10
> 2023-07-24 08:41:30,378 ERROR org.apache.flink.connector.kafka.sink.
> KafkaCommitter [] - Unable to commit transaction
> (org.apache.flink.streaming.runtime.operators.sink.committables.
> CommitRequestImpl@24e0e43c) because it's in an invalid state. Most likely
> the transaction has been aborted for some reason. Please check the Kafka
> logs for more details.
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.
> 2023-07-24 08:41:30,378 ERROR org.apache.flink.connector.kafka.sink.
> KafkaCommitter [] - Unable to commit transaction
> (org.apache.flink.streaming.runtime.operators.sink.committables.
> CommitRequestImpl@55c55f14) because it's in an invalid state. Most likely
> the transaction has been aborted for some reason. Please check the Kafka
> logs for more details.
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.
> 2023-07-24 08:41:30,380 DEBUG org.apache.flink.runtime.state.
> TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
> remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
> OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
> StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
> delegateStateHandle=ByteStreamStateHandle{handleName=
> 'file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/238f2a1e-aa7e-4218-843a-b469a5dcbad8',
> dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
> keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
> StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
> checkpointedSize=291} from job manager and local state alternatives []
> from local state store org.apache.flink.runtime.state.
> NoOpTaskLocalStateStoreImpl@42b00150.
> 2023-07-24 08:41:30,380 DEBUG org.apache.flink.runtime.state.
> TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
> remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
> OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
> StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
> delegateStateHandle=ByteStreamStateHandle{handleName=
> 'file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/e2d3916f-f9f9-40e4-89bc-bc193fbd5373',
> dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
> keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
> StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
> checkpointedSize=291} from job manager and local state alternatives []
> from local state store org.apache.flink.runtime.state.
> NoOpTaskLocalStateStoreImpl@32734da0.
> 2023-07-24 08:41:30,380 DEBUG org.apache.flink.streaming.api.operators.
> BackendRestorerProcedure [] - Creating operator state backend for
> SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(12/20) and restoring
> with state from alternative (1/1).
> 2023-07-24 08:41:30,381 DEBUG org.apache.flink.streaming.api.operators.
> BackendRestorerProcedure [] - Creating operator state backend for
> SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(14/20) and restoring
> with state from alternative (1/1).
> 2023-07-24 08:41:30,381 ERROR org.apache.flink.connector.kafka.sink.
> KafkaCommitter [] - Unable to commit transaction
> (org.apache.flink.streaming.runtime.operators.sink.committables.
> CommitRequestImpl@75581079) because it's in an invalid state. Most likely
> the transaction has been aborted for some reason. Please check the Kafka
> logs for more details.
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.
> 2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.state.
> TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
> remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
> OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
> StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
> delegateStateHandle=ByteStreamStateHandle{handleName=
> 'file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/66a3e58e-0f8c-4c09-9fc3-04be0d3388dc',
> dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
> keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
> StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
> checkpointedSize=291} from job manager and local state alternatives []
> from local state store org.apache.flink.runtime.state.
> NoOpTaskLocalStateStoreImpl@5c241b41.
> 2023-07-24 08:41:30,382 DEBUG org.apache.flink.streaming.api.operators.
> BackendRestorerProcedure [] - Creating operator state backend for
> SinkWriterOperator_b2a2e541ae5a67fa81a2d8b387225849_(5/20) and restoring
> with state from alternative (1/1).
> 2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsOutErrors.
> 2023-07-24 08:41:30,382 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsOutErrors.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSendErrors.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsOutErrors.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSendErrors.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSend.
> 2023-07-24 08:41:30,383 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSend.
> 2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numBytesSend.
> 2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSendErrors.
> 2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numRecordsSend.
> 2023-07-24 08:41:30,384 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numBytesSend.
> 2023-07-24 08:41:30,385 ERROR org.apache.flink.connector.kafka.sink.
> KafkaCommitter [] - Unable to commit transaction
> (org.apache.flink.streaming.runtime.operators.sink.committables.
> CommitRequestImpl@47835f95) because it's in an invalid state. Most likely
> the transaction has been aborted for some reason. Please check the Kafka
> logs for more details.
> org.apache.kafka.common.errors.InvalidTxnStateException: The producer
> attempted a transactional operation in an invalid state.
> 2023-07-24 08:41:30,385 DEBUG org.apache.flink.runtime.metrics.
> MetricRegistryImpl [] - Registering metric
> taskmanager.job.task.operator.numBytesSend.
> 2023-07-24 08:41:30,385 DEBUG org.apache.flink.runtime.state.
> TaskStateManagerImpl [] - Operator b2a2e541ae5a67fa81a2d8b387225849 has
> remote state SubtaskState{operatorStateFromBackend=StateObjectCollection{[
> OperatorStateHandle{stateNameToPartitionOffsets={writer_raw_states=
> StateMetaInfo{offsets=[233], distributionMode=SPLIT_DISTRIBUTE}},
> delegateStateHandle=ByteStreamStateHandle{handleName=
> 'file:/opt/flink/shared/savepoints/savepoint-000000-fcc29062a521/e4aa5b34-3b1f-4f6b-b9a0-6dacb4d5408b',
> dataBytes=291}}]}, operatorStateFromStream=StateObjectCollection{[]},
> keyedStateFromBackend=StateObjectCollection{[]}, keyedStateFromStream=
> StateObjectCollection{[]}, inputChannelState=StateObjectCollection{[]},
> resultSubpartitionState=StateObjectCollection{[]}, stateSize=291,
> checkpointedSize=291} from job manager and local state alternatives []
> from local state store org.apache.flink.runtime.state.
> NoOpTaskLocalStateStoreImpl@157a2949.
>
>
> On Mon, Jul 24, 2023 at 5:31, Shammon FY <zjur...@gmail.com> wrote:
>
>> Hi nick,
>>
>> Is there any error log? That may help to analyze the root cause.
>>
>> On Sun, Jul 23, 2023 at 9:53 PM nick toker <nick.toker....@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> We replaced the deprecated Kafka producer with the Kafka sink, and
>>> from time to time, when we submit a job, it gets stuck for 5 minutes in
>>> the initializing state (on the sink operators).
>>> We verified that the transaction prefix is unique.
>>>
>>> This did not happen when we used the Kafka producer.
>>>
>>> What could be the reason?
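
A note for anyone reading the log excerpt in this thread: the ID in the commitTransaction line, telephony-decoder-ORCHESTRATOR-MULTIMEDIA-IN-8-10 once the wrapped line is rejoined, looks like prefix-subtaskId-checkpointOffset. That layout is an assumption about how the Kafka sink builds transactional IDs, not something the log itself confirms. Under that assumption, a minimal sketch for splitting such an ID (the class TransactionalIdParser is hypothetical and uses only the Java standard library) could help when checking which prefix, subtask, and checkpoint a failed commit belongs to:

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper; assumes IDs look like <prefix>-<subtaskId>-<checkpointOffset>. */
public class TransactionalIdParser {

    /** The three parts of a transactional ID under the assumed layout. */
    public static final class Parts {
        public final String prefix;
        public final long subtaskId;
        public final long checkpointOffset;

        Parts(String prefix, long subtaskId, long checkpointOffset) {
            this.prefix = prefix;
            this.subtaskId = subtaskId;
            this.checkpointOffset = checkpointOffset;
        }
    }

    // Greedy prefix (may itself contain '-'), then two trailing numeric fields.
    private static final Pattern ID = Pattern.compile("^(.+)-(\\d{1,18})-(\\d{1,18})$");

    /** Returns the parsed parts, or empty if the ID does not match the assumed layout. */
    public static Optional<Parts> parse(String transactionalId) {
        Matcher m = ID.matcher(transactionalId.trim());
        if (!m.matches()) {
            return Optional.empty();
        }
        return Optional.of(new Parts(
                m.group(1),
                Long.parseLong(m.group(2)),
                Long.parseLong(m.group(3))));
    }

    public static void main(String[] args) {
        // ID taken from the commitTransaction log line, with the email line wrap removed.
        parse("telephony-decoder-ORCHESTRATOR-MULTIMEDIA-IN-8-10").ifPresent(p ->
                System.out.println("prefix=" + p.prefix
                        + " subtask=" + p.subtaskId
                        + " checkpoint=" + p.checkpointOffset));
    }
}
```

Grouping the IDs from the failing commits by prefix is one way to double-check that no other job shares the same prefix; it does not by itself explain the InvalidTxnStateException.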