09306677806 commented on PR #47895:
URL: https://github.com/apache/spark/pull/47895#issuecomment-2405884753

   bc1qqeysv50ayq0az93s5frfvtf2fe6rt5tfkdfx2y
   #*Shahrzadmahro#
   
   در تاریخ پنجشنبه ۱۰ اکتبر ۲۰۲۴،‏ ۲۲:۰۴ Burak Yavuz ***@***.***>
   نوشت:
   
   > ***@***.**** requested changes on this pull request.
   >
   > Wanted to leave quick feedback. Still halfway through of my pass
   > ------------------------------
   >
   > In
   > 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
   > <https://github.com/apache/spark/pull/47895#discussion_r1795903834>:
   >
   > > +   * @param execCtx
   > +   * @param latestExecPlan
   >
   > you can remove these
   > ------------------------------
   >
   > In
   > 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
   > <https://github.com/apache/spark/pull/47895#discussion_r1795908665>:
   >
   > >    /**
   >     * Called after the microbatch has completed execution. It takes care 
of committing the offset
   >     * to commit log and other bookkeeping.
   >     */
   >    protected def markMicroBatchEnd(execCtx: MicroBatchExecutionContext): 
Unit = {
   > -    watermarkTracker.updateWatermark(execCtx.executionPlan.executedPlan)
   > +    val latestExecPlan = execCtx.executionPlan.executedPlan
   > +    watermarkTracker.updateWatermark(latestExecPlan)
   > +    if 
(StatefulOperatorStateInfo.enableStateStoreCheckpointIds(sparkSession.sessionState.conf))
 {
   >
   > should you be using sparkSessionForStream here? Otherwise this can change
   > from microbatch to microbatch, which is risky
   > ------------------------------
   >
   > In
   > 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
   > <https://github.com/apache/spark/pull/47895#discussion_r1795913563>:
   >
   > > @@ -90,6 +91,7 @@ class ReadStateStoreRDD[T: ClassTag, U: ClassTag](
   >      val inputIter = dataRDD.iterator(partition, ctxt)
   >      val store = StateStore.getReadOnly(
   >        storeProviderId, keySchema, valueSchema, keyStateEncoderSpec, 
storeVersion,
   > +      stateStoreCkptIds.map(_(partition.index).head),
   >
   > nit: _.apply(...).head
   > ------------------------------
   >
   > In
   > 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
   > <https://github.com/apache/spark/pull/47895#discussion_r1795913662>:
   >
   > > @@ -126,6 +129,7 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
   >      val inputIter = dataRDD.iterator(partition, ctxt)
   >      val store = StateStore.get(
   >        storeProviderId, keySchema, valueSchema, keyStateEncoderSpec, 
storeVersion,
   > +      uniqueId.map(_(partition.index).head),
   >
   > ditto
   > ------------------------------
   >
   > In
   > 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
   > <https://github.com/apache/spark/pull/47895#discussion_r1795903980>:
   >
   > > +        execCtx.batchId == -1 || v == execCtx.batchId + 1,
   > +        s"Batch version ${execCtx.batchId} should generate state store 
checkpoint " +
   > +          s"version ${execCtx.batchId + 1} but we see ${v}")
   > +    }
   > +    currentStateStoreCkptId.put(opId, checkpointInfo.map { c =>
   > +      assert(c.stateStoreCkptId.isDefined)
   > +      c.stateStoreCkptId.get
   > +    })
   > +  }
   > +
   > +  private def updateStateStoreCkptId(
   > +      execCtx: MicroBatchExecutionContext,
   > +      latestExecPlan: SparkPlan): Unit = {
   > +    latestExecPlan.collect {
   > +      case e: StateStoreWriter =>
   > +        assert(e.stateInfo.isDefined)
   >
   > did you forget addressing this?
   > ------------------------------
   >
   > In
   > 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
   > <https://github.com/apache/spark/pull/47895#discussion_r1795912992>:
   >
   > > @@ -233,6 +238,15 @@ case class StateStoreMetrics(
   >      memoryUsedBytes: Long,
   >      customMetrics: Map[StateStoreCustomMetric, Long])
   >
   > +case class StateStoreCheckpointInfo(
   > +    partitionId: Int,
   > +    batchVersion: Long,
   > +    // The checkpoint ID for a checkpoint at `batchVersion`. This is used 
to identify the checkpoint
   >
   > can you move these above to @param <https://github.com/param> lines in
   > the scaladoc?
   >
   > —
   > Reply to this email directly, view it on GitHub
   > <https://github.com/apache/spark/pull/47895#pullrequestreview-2361100421>,
   > or unsubscribe
   > 
<https://github.com/notifications/unsubscribe-auth/ANEZZRHC5TZK73IF3D5Z5SDZ23CCRAVCNFSM6AAAAABNGWP2BGVHI2DSMVQWIX3LMV43YUDVNRWFEZLROVSXG5CSMV3GSZLXHMZDGNRRGEYDANBSGE>
   > .
   > You are receiving this because you are subscribed to this thread.Message
   > ID: ***@***.***>
   >
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to