The only form of rate limiting I have set is *maxOffsetsPerTrigger *and
*fetch.message.max.bytes. *

*"*may be that you are trying to process records that have passed the
retention period within Kafka.*"*
If the above is true then I should have my offsets reset only once ideally
when my application starts. But mu offsets are resetting for every batch.
if my application is using offsets that are no longer available in Kafka it
will reset to earliest or latest offset available in Kafka and the
next request made to Kafka should provide proper data. But in case for all
micro-batches the offsets are getting reseted and the batch is producing no
data.



On Wed, Sep 11, 2019 at 5:12 PM Burak Yavuz <brk...@gmail.com> wrote:

> Do you have rate limiting set on your stream? It may be that you are
> trying to process records that have passed the retention period within
> Kafka.
>
> On Wed, Sep 11, 2019 at 2:39 PM Charles vinodh <mig.flan...@gmail.com>
> wrote:
>
>>
>> Hi,
>>
>> I am trying to run a spark application ingesting data from Kafka using
>> the Spark structured streaming and the spark library
>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1. I am facing a very weird
>> issue where during execution of all my micro-batches the Kafka consumer is
>> not able to fetch the offsets and its having its offsets reset as show
>> below in this log
>>
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-5 to offset 1168959116.
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-1 to offset 1218619371.
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-8 to offset 1157205346.
>> 19/09/11 02:49:42 INFO Fetcher: [Consumer clientId=my_client_id, 
>> groupId=spark-kafka-source-5496988b-3f5c-4342-9361-917e4f3ece51-1340785812-driver-0]
>>  Resetting offset for partition my-topic-21 to offset 1255403059.
>>
>>
>> It is reasonable if this resetting happens once in application due to the
>> fact that the offsets stored in my checkpoint are no longer valid and will
>> have to reset our offsets to a new value. But I am seeing this reset
>> happening for every micro batch execution in my streaming job. In at the
>> end the streaming query progress emits the following
>>
>> 19/09/10 15:55:01 INFO MicroBatchExecution: Streaming query made progress: {
>>   "id" : "90f21e5f-270d-428d-b068-1f1aa0861fb1",
>>   "runId" : "f09f8eb4-8f33-42c2-bdf4-dffeaebf630e",
>>   "name" : null,
>>   "timestamp" : "2019-09-10T15:55:00.000Z",
>>   "batchId" : 189,
>>   "numInputRows" : 0,
>>   "inputRowsPerSecond" : 0.0,
>>   "processedRowsPerSecond" : 0.0,
>>   "durationMs" : {
>>     "addBatch" : 127,
>>     "getBatch" : 0,
>>     "getEndOffset" : 0,
>>     "queryPlanning" : 24,
>>     "setOffsetRange" : 36,
>>     "triggerExecution" : 1859,
>>     "walCommit" : 1032
>>   },
>>   "stateOperators" : [ ],
>>   "sources" : [ {
>>     "description" : "KafkaV2[Subscribe[my_kafka_topic]]",
>>     "startOffset" : {
>>       "my_kafka_topic" : {
>>         "23" : 1206926686,
>>         "8" : 1158514946,
>>         "17" : 1258387219,
>>         "11" : 1263091642,
>>         "2" : 1226741128,
>>         "20" : 1229560889,
>>         "5" : 1170304913,
>>         "14" : 1207333901,
>>         "4" : 1274242728,
>>         "13" : 1336386658,
>>         "22" : 1260210993,
>>         "7" : 1288639296,
>>         "16" : 1247462229,
>>         "10" : 1093157103,
>>         "1" : 1219904858,
>>         "19" : 1116269615,
>>         "9" : 1238935018,
>>         "18" : 1069224544,
>>         "12" : 1256018541,
>>         "3" : 1251150202,
>>         "21" : 1256774117,
>>         "15" : 1170591375,
>>         "6" : 1185108169,
>>         "24" : 1202342095,
>>         "0" : 1165356330
>>       }
>>     },
>>     "endOffset" : {
>>       "my_kafka_topic" : {
>>         "23" : 1206928043,
>>         "8" : 1158516721,
>>         "17" : 1258389219,
>>         "11" : 1263093490,
>>         "2" : 1226743225,
>>         "20" : 1229562962,
>>         "5" : 1170307882,
>>         "14" : 1207335736,
>>         "4" : 1274245585,
>>         "13" : 1336388570,
>>         "22" : 1260213582,
>>         "7" : 1288641384,
>>         "16" : 1247464311,
>>         "10" : 1093159186,
>>         "1" : 1219906407,
>>         "19" : 1116271435,
>>         "9" : 1238936994,
>>         "18" : 1069226913,
>>         "12" : 1256020926,
>>         "3" : 1251152579,
>>         "21" : 1256776910,
>>         "15" : 1170593216,
>>         "6" : 1185110032,
>>         "24" : 1202344538,
>>         "0" : 1165358262
>>       }
>>     },
>>     "numInputRows" : 0,
>>     "inputRowsPerSecond" : 0.0,
>>     "processedRowsPerSecond" : 0.0
>>   } ],
>>   "sink" : {
>>     "description" : "FileSink[s3://my-s3-bucket/data/kafka/my_kafka_topic]"
>>   }
>> }
>>
>>
>> In the above StreamingQueryProgress event the numInputRows fields  is
>> zero and this is the case for all micro batch executions and no data is
>> being produced whatsoever. So basically for each batch my offsets are being
>> reset and each batch is producing zero rows. Since there is no work being
>> done and since dynamic allocation is enabled all my executors killed... I
>> have tried deleting my checkpoint and started my application from scratch
>> and I am still facing the same issue. What could possibly be wrong this?...
>> what lines of investigation should I take?  If you are interested in
>> getting Stackoverflow point you can answer my question in SO here
>> <https://stackoverflow.com/questions/57874681/spark-kafka-streaming-making-progress-but-there-is-no-data-to-be-consumed>.
>>
>>
>> Thanks,
>> Charles
>>
>>
>

Reply via email to