ericm-db commented on code in PR #54373:
URL: https://github.com/apache/spark/pull/54373#discussion_r2830784148


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala:
##########
@@ -206,8 +210,8 @@ class MicroBatchExecution(
         if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
           v2ToRelationMap.getOrElseUpdate(s, {
             // Materialize source to avoid creating it in every batch
-            val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
-            nextSourceId += 1

Review Comment:
   in `getMetadataPath`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala:
##########
@@ -247,17 +252,36 @@ class MicroBatchExecution(
           })
         }
     }
-    sources = _logicalPlan.collect {
+
+    // Extract sources and their sourceIdentifyingName for sourceIdMap mapping
+    val sourcesWithNames = _logicalPlan.collect {
       // v1 source
-      case s: StreamingExecutionRelation => s.source
+      case s: StreamingExecutionRelation => (s.source, s.sourceIdentifyingName)
       // v2 source
-      case r: StreamingDataSourceV2ScanRelation => r.stream
+      case r: StreamingDataSourceV2ScanRelation => (r.stream, r.relation.sourceIdentifyingName)
+    }
+    sources = sourcesWithNames.map(_._1)
+
+    if (enforceNamed) {
+      // When enforcement is enabled, all sources should be named after validation in analysis.
+      // This assertion ensures that the validation in NameStreamingSources worked correctly.
+      assert(sourcesWithNames.forall(s => s._2 != Unassigned),
+        "All sources should be named at this point - validation should have happened in analysis")
+
+      // Create source ID mapping using names (for OffsetMap format)
+      sourceIdMap = sourcesWithNames.map {
+        case (source, UserProvided(name)) => name -> source
+        case (source, FlowAssigned(name)) => name -> source

Review Comment:
   Sure yeah



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to