ericm-db commented on code in PR #54373:
URL: https://github.com/apache/spark/pull/54373#discussion_r2830784148
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala:
##########
@@ -206,8 +210,8 @@ class MicroBatchExecution(
if (!v2Disabled && table.supports(TableCapability.MICRO_BATCH_READ)) {
v2ToRelationMap.getOrElseUpdate(s, {
// Materialize source to avoid creating it in every batch
- val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
- nextSourceId += 1
Review Comment:
in `getMetadataPath`
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/runtime/MicroBatchExecution.scala:
##########
@@ -247,17 +252,36 @@ class MicroBatchExecution(
})
}
}
- sources = _logicalPlan.collect {
+
+ // Extract sources and their sourceIdentifyingName for sourceIdMap mapping
+ val sourcesWithNames = _logicalPlan.collect {
// v1 source
- case s: StreamingExecutionRelation => s.source
+ case s: StreamingExecutionRelation => (s.source, s.sourceIdentifyingName)
// v2 source
- case r: StreamingDataSourceV2ScanRelation => r.stream
+ case r: StreamingDataSourceV2ScanRelation => (r.stream, r.relation.sourceIdentifyingName)
+ }
+ sources = sourcesWithNames.map(_._1)
+
+ if (enforceNamed) {
+ // When enforcement is enabled, all sources should be named after validation in analysis.
+ // This assertion ensures that the validation in NameStreamingSources worked correctly.
+ assert(sourcesWithNames.forall(s => s._2 != Unassigned),
+ "All sources should be named at this point - validation should have happened in analysis")
+
+ // Create source ID mapping using names (for OffsetMap format)
+ sourceIdMap = sourcesWithNames.map {
+ case (source, UserProvided(name)) => name -> source
+ case (source, FlowAssigned(name)) => name -> source
Review Comment:
Sure yeah
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]