gaborgsomogyi commented on issue #24738: [WIP][SPARK-23098][SQL] Migrate Kafka 
Batch source to v2.
URL: https://github.com/apache/spark/pull/24738#issuecomment-497063432
 
 
   @cloud-fan @gatorsmile you're driving the DSv2 effort and having lot of 
experience, so I would like to ask whether some parts are missing from the 
framework in the sink side since the following exception is coming:
   ```
   sbt.ForkMain$ForkError: org.apache.spark.sql.AnalysisException: Cannot write 
incompatible data to table 'KafkaTable':
   - Cannot find data for output column 'key'
   - Cannot safely cast 'value': StringType to BinaryType
   - Cannot find data for output column 'partition'
   - Cannot find data for output column 'offset'
   - Cannot find data for output column 'timestamp'
   - Cannot find data for output column 'timestampType';
        at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation$.resolveOutputColumns(Analyzer.scala:2353)
        at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation$$anonfun$apply$26.applyOrElse(Analyzer.scala:2283)
        at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation$$anonfun$apply$26.applyOrElse(Analyzer.scala:2280)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$2(AnalysisHelper.scala:108)
        at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDown$1(AnalysisHelper.scala:108)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:194)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown(AnalysisHelper.scala:106)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDown$(AnalysisHelper.scala:104)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDown(LogicalPlan.scala:29)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:73)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:72)
        at 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:29)
        at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation$.apply(Analyzer.scala:2280)
        at 
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveOutputRelation$.apply(Analyzer.scala:2279)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:109)
        at 
scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at 
scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:89)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:106)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:98)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:98)
        at 
org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:136)
        at 
org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:130)
        at 
org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:96)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:77)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at 
org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:77)
        at 
org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:114)
        at 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:201)
        at 
org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:113)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:62)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at 
org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:60)
        at 
org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:60)
        at 
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
        at 
org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:66)
        at 
org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:65)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:72)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:72)
        at 
org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:71)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:79)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:75)
        at 
org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
        at 
org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:85)
        at 
org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:85)
        at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
        at 
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:98)
        at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
        at 
org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:729)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
        at 
org.apache.spark.sql.kafka010.KafkaSinkBatchSuiteBase.$anonfun$new$31(KafkaSinkSuite.scala:375)
        at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
        at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
        at 
org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
        at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
        at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
        at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
        at 
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
        at 
org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
        at 
org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
        at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:56)
        at 
org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
        at 
org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
        at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
        at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
        at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
        at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
        at org.scalatest.Suite.run(Suite.scala:1147)
        at org.scalatest.Suite.run$(Suite.scala:1129)
        at 
org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
        at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
        at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
        at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
        at 
org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
        at 
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
        at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
        at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
        at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:56)
        at 
org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
        at 
org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:507)
        at sbt.ForkMain$Run$2.call(ForkMain.java:296)
        at sbt.ForkMain$Run$2.call(ForkMain.java:286)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   ```
   It's clear that the mapping is missing but haven't found any place where I 
can add it.
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to