gaborgsomogyi commented on issue #24738: [WIP][SPARK-23098][SQL] Migrate Kafka Batch source to v2. URL: https://github.com/apache/spark/pull/24738#issuecomment-497063432

@cloud-fan @gatorsmile you're driving the DSv2 effort and have a lot of experience, so I would like to ask whether some parts are missing from the framework on the sink side, since the following exception is thrown:
```
sbt.ForkMain$ForkError: org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table 'KafkaTable':
- Cannot find data for output column 'key'
- Cannot safely cast 'value': StringType to BinaryType
- Cannot find data for output column 'partition'
- Cannot find data for output column 'offset'
- Cannot find data for output column 'timestamp'
- Cannot find data for output column 'timestampType';
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation$.resolveOutputColumns(Analyzer.scala:2353)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation$$anonfun$apply$26.applyOrElse(Analyzer.scala:2283)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation$$anonfun$apply$26.applyOrElse(Analyzer.scala:2280)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:108)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:108)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:73)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:72)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation$.apply(Analyzer.scala:2280)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation$.apply(Analyzer.scala:2279)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:109)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:89)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:106)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:98)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:98)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:136)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:130)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:96)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:77)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:77)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:114)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:113)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:62)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:60)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:60)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:66)
    at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:65)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:72)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:72)
    at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:71)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:79)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:75)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:85)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:85)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:729)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
    at org.apache.spark.sql.kafka010.KafkaSinkBatchSuiteBase.$anonfun$new$31(KafkaSinkSuite.scala:375)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
    at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
    at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
    at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
    at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
    at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
    at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
    at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
    at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
    at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:56)
    at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
    at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
    at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
    at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
    at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
    at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
    at org.scalatest.Suite.run(Suite.scala:1147)
    at org.scalatest.Suite.run$(Suite.scala:1129)
    at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
    at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
    at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
    at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
    at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
    at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
    at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
    at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
    at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:56)
    at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
    at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:507)
    at sbt.ForkMain$Run$2.call(ForkMain.java:296)
    at sbt.ForkMain$Run$2.call(ForkMain.java:286)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
```
It's clear that the mapping is missing, but I haven't found any place where I can add it.
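For context, a minimal sketch of the write pattern that hits this, modeled on the batch write in `KafkaSinkSuite` (the bootstrap servers and topic below are placeholders, not the actual test values):

```scala
// Sketch only: a batch write providing just a string `value` column.
// The v1 Kafka sink accepted this, but on the v2 path the table seems to
// report its full read schema (key, value, topic, partition, offset,
// timestamp, timestampType), so ResolveOutputRelation can't match the
// output columns and raises the AnalysisException above.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("repro").getOrCreate()
import spark.implicits._

Seq("1", "2", "3").toDF("value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
  .option("topic", "testtopic")                        // placeholder
  .save()
```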
