Flink 1.11.2 with Scala 2.12

Error:

[info] JobScalaTest:
[info] - dummy *** FAILED ***
[info]   org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink do not match.
[info]   Query schema: [user: BIGINT, product: ROW<`name` VARCHAR(2147483647), `id` BIGINT>, amount: INT]
[info]   Sink schema: [user: BIGINT, product: LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem, rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJAgAAeHIANm9yZy5hcGFjaGUuZmxpbmsuYXBpLnNjYWxhLnR5cGV1dGlscy5DYXNlQ2xhc3NUeXBlSW5mb46d1-iqONqQAgAMTAARUEFUVEVSTl9JTlRfRklFTER0ABlMamF2YS91dGlsL3JlZ2V4L1BhdHRlcm47TAAVUEFUVEVSTl9ORVNURURfRklFTERTcQB-AAJMAB5QQVRURVJOX05FU1RFRF9GSUVMRFNfV0lMRENBUkRxAH4AAkwAC1JFR0VYX0ZJRUxEdAASTGphdmEvbGFuZy9TdHJpbmc7TAAPUkVHRVhfSU5UX0ZJRUxEcQB-AANMABNSRUdFWF9ORVNURURfRklFTERTcQB-AANMABxSRUdFWF9ORVNURURfRklFTERTX1dJTERDQVJEcQB-AANMAA9SRUdFWF9TVFJfRklFTERxAH4AA0wABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAApmaWVsZE5hbWVzdAAWTHNjYWxhL2NvbGxlY3Rpb24vU2VxO0wACmZpZWxkVHlwZXNxAH4ABVsAEnR5cGVQYXJhbVR5cGVJbmZvc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGVpbmZvL1R5cGVJbmZvcm1hdGlvbjt4cgA1b3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMuVHVwbGVUeXBlSW5mb0Jhc2UAAAAAAAAAAQIAAkkAC3RvdGFsRmllbGRzWwAFdHlwZXNxAH4ABnhyADNvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLkNvbXBvc2l0ZVR5cGUAAAAAAAAAAQIAAUwACXR5cGVDbGFzc3EAfgAEeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIAC1Byb2R1Y3RJdGVtBwFtETzcflcCAAJKAAJpZEwABG5hbWVxAH4AA3hwAAAAAnVyADdbTG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb247uKBspBqaFLYCAAB4cAAAAAJzcgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6cQB-AARMAA9jb21wYXJhdG9yQ2xhc3NxAH4ABFsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hxAH4ACXZyABBqYXZhLmxhbmcuU3RyaW5noPCkOHo7s0ICAAB4cHZyADtvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuU3RyaW5nQ29tcGFyYXRvcgAAAAAAAAABAgAAeHIAPm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CYXNpY1R5cGVDb21wYXJhdG9yAAAAAAAAAAECAAJaABNhc2NlbmRpbmdDb21wYXJpc29uWwALY29tcGFyYXRvcnN0ADdbTG9yZy9hcGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZUNvbXBhcmF0b3I7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZUNvbXBhcmF0b3IAAAAAAAAAAQIAAHhwdXIAEltMamF2YS5sYW5nLkNsYXNzO6sW167LzVqZAgAAeHAAAAAAc3IAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5TdHJpbmdTZXJpYWxpemVyAAAAAAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJpYWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkludGVnZXJUeXBlSW5mb5AFxEVpQEqVAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5OdW1lcmljVHlwZUluZm-tmMZzMAKMFgIAAHhxAH4AD3ZyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHB2cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkxvbmdDb21wYXJhdG9yAAAAAAAAAAECAAB4cQB-ABZ1cQB-ABoAAAADdnIAD2phdmEubGFuZy5GbG9hdNrtyaLbPPDsAgABRgAFdmFsdWV4cQB-ACR2cgAQamF2YS5sYW5nLkRvdWJsZYCzwkopa_sEAgABRAAFdmFsdWV4cQB-ACR2cgATamF2YS5sYW5nLkNoYXJhY3RlcjSLR9lrGiZ4AgABQwAFdmFsdWV4cHNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhxAH4AHXNyABdqYXZhLnV0aWwucmVnZXguUGF0dGVybkZn1WtuSQINAgACSQAFZmxhZ3NMAAdwYXR0ZXJucQB-AAN4cAAAAAB0AAZbMC05XStzcQB-ADEAAAAAdAAwKFtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XSspKFwuKC4rKSk_c3EAfgAxAAAAAHQANihbXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSp8WzAtOV0rKShcLiguKykpP3xcKnxcX3QAJVtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XStxAH4AM3EAfgA1cQB-ADd0AB5bXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSpxAH4ADHNyADJzY2FsYS5jb2xsZWN0aW9uLmltbXV0YWJsZS5MaXN0JFNlcmlhbGl6YXRpb25Qcm94eQAAAAAAAAABAwAAeHB0AARuYW1ldAACaWRzcgAsc2NhbGEuY29sbGVjdGlvbi5pbW11dGFibGUuTGlzdFNlcmlhbGl6ZUVuZCSKXGNb91MLbQIAAHhweHNxAH4AOnEAfgAScQB-ACJxAH4AP3h1cQB-AA0AAAAA>'), amount: INT]
[info]   at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
[info]   at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
[info]   at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
[info]   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
[info]   at scala.collection.Iterator.foreach(Iterator.scala:943)
[info]   at scala.collection.Iterator.foreach$(Iterator.scala:943)
[info]   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
[info]   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
[info]   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
[info]   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
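The mismatch is between the ROW type the planner derives for the query result and the legacy ANY type wrapping ProductItem's serialized CaseClassTypeInfo on the sink side. As a quick sanity check (a sketch on my part, assuming the same case classes as in the code below), you can print the type information the Scala API derives for Order, which is what toAppendStream[Order] builds the sink schema from:

import org.apache.flink.streaming.api.scala._

// Prints the derived CaseClassTypeInfo; the nested ProductItem field shows up
// as a case-class type rather than a ROW, which is the source of the mismatch.
println(createTypeInformation[Order])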
Code:

import com.remind.graph.people.PeopleJobScala
import org.scalatest.funsuite._
import org.scalatest.BeforeAndAfter
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.util.TestStreamEnvironment
import org.apache.flink.table.runtime.util._
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.api.common.state.ListState
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.types.Row

import java.io.Serializable
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.concurrent.atomic.AtomicInteger
import java.{util => ju}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try

case class Order(user: Long, product: ProductItem, amount: Int) {
  def this() {
    this(0, null, 0)
  }

  override def toString(): String = {
    return "Order{" +
      "user=" + user +
      ", product='" + product + '\'' +
      ", amount=" + amount +
      '}'
  }
}

case class ProductItem(name: String, id: Long) {
  def this() {
    this(null, 0)
  }

  override def toString(): String = {
    return "Product{" +
      "name='" + name + '\'' +
      ", id=" + id +
      '}'
  }
}

class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
  var env: StreamExecutionEnvironment = _
  var tEnv: StreamTableEnvironment = _

  before {
    this.env = StreamExecutionEnvironment.getExecutionEnvironment
    this.env.setParallelism(2)
    this.env.getConfig.enableObjectReuse()
    val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
    this.tEnv = StreamTableEnvironment.create(env, setting)
  }

  after {
    StreamTestSink.clear()
    // TestValuesTableFactory.clearAllData()
  }

  def dateFrom(stringDate: String): java.sql.Date = {
    val date = new SimpleDateFormat("dd/MM/yyyy").parse(stringDate)
    return new java.sql.Date(date.getTime())
  }

  def printTable(table: Table) = {
    println(table)
    table.printSchema()
    println(table.getSchema().getFieldNames().mkString(", "))
  }

  def printDataStream(dataStream: DataStream[_]) = {
    println(dataStream)
    println(dataStream.dataType)
  }

  test("dummy") {
    val orderA: DataStream[Order] = this.env.fromCollection(
      Seq(
        new Order(1L, new ProductItem("beer", 10L), 3),
        new Order(1L, new ProductItem("diaper", 11L), 4),
        new Order(3L, new ProductItem("rubber", 12L), 2)
      )
    )

    val orderB: DataStream[Order] = this.env.fromCollection(
      Seq(
        new Order(2L, new ProductItem("pen", 13L), 3),
        new Order(2L, new ProductItem("rubber", 12L), 3),
        new Order(4L, new ProductItem("beer", 10L), 1)
      )
    )

    println(orderB)
    println(orderB.dataType)

    // convert DataStream to Table
    val tableA = this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
    println(tableA)
    tableA.printSchema()
    println(tableA.getSchema().getFieldNames().mkString(", "))

    // register DataStream as Table
    this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product, 'amount)

    // union the two tables
    val result = this.tEnv.sqlQuery(s"""
      |SELECT * FROM $tableA WHERE amount > 2
      |UNION ALL
      |SELECT * FROM OrderB WHERE amount < 2
      """.stripMargin)

    val sink = new StringSink[Order]()
    result.toAppendStream[Order].addSink(sink)
    this.env.execute()

    val expected = List(
      "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
      "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
      "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
    )
    val results = sink.getResults.sorted
    println("results")
    println(results)
    assert(expected.sorted === results)
  }
}

/**
 * Taken from: https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
 * There's a whole bunch of other test sinks to choose from there.
 */
object StreamTestSink {
  val idCounter: AtomicInteger = new AtomicInteger(0)

  val globalResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
  val globalRetractResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
  val globalUpsertResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]

  def getNewSinkId: Int = {
    val idx = idCounter.getAndIncrement()
    this.synchronized {
      globalResults.put(idx, mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]])
      globalRetractResults.put(idx, mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]])
      globalUpsertResults.put(idx, mutable.HashMap.empty[Int, mutable.Map[String, String]])
    }
    idx
  }

  def clear(): Unit = {
    globalResults.clear()
    globalRetractResults.clear()
    globalUpsertResults.clear()
  }
}

abstract class AbstractExactlyOnceSink[T]
    extends RichSinkFunction[T]
    with CheckpointedFunction {

  protected var resultsState: ListState[String] = _
  protected var localResults: mutable.ArrayBuffer[String] = _
  protected val idx: Int = StreamTestSink.getNewSinkId

  protected var globalResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
  protected var globalRetractResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
  protected var globalUpsertResults: mutable.Map[Int, mutable.Map[String, String]] = _

  def isInitialized: Boolean = globalResults != null

  override def initializeState(context: FunctionInitializationContext): Unit = {
    resultsState = context.getOperatorStateStore
      .getListState(new ListStateDescriptor[String]("sink-results", Types.STRING))
    localResults = mutable.ArrayBuffer.empty[String]
    if (context.isRestored) {
      for (value <- resultsState.get().asScala) {
        localResults += value
      }
    }
    val taskId = getRuntimeContext.getIndexOfThisSubtask
    StreamTestSink.synchronized(
      StreamTestSink.globalResults(idx) += (taskId -> localResults)
    )
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    resultsState.clear()
    for (value <- localResults) {
      resultsState.add(value)
    }
  }

  protected def clearAndStashGlobalResults(): Unit = {
    if (globalResults == null) {
      StreamTestSink.synchronized {
        globalResults = StreamTestSink.globalResults.remove(idx).get
        globalRetractResults = StreamTestSink.globalRetractResults.remove(idx).get
        globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
      }
    }
  }

  protected def getResults: List[String] = {
    clearAndStashGlobalResults()
    val result = mutable.ArrayBuffer.empty[String]
    this.globalResults.foreach {
      case (_, list) => result ++= list
    }
    result.toList
  }
}

final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
  override def invoke(value: T) {
    localResults += value.toString
  }

  override def getResults: List[String] = super.getResults
}
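One possible workaround, sketched below as an untested assumption (nothing in this thread confirms it): instead of letting toAppendStream re-derive Order's case-class type for the sink, convert the result Table to a DataStream[Row] and rebuild the case classes by hand, so the planner only ever has to deal with the ROW type from the query schema. This would slot into the test above in place of the result.toAppendStream[Order].addSink(sink) line:

// Workaround sketch (assumption, untested): go through Row and map back to the
// case classes manually instead of letting the sink derive a legacy ANY type.
val rowStream: DataStream[Row] = result.toAppendStream[Row]

val orderStream: DataStream[Order] = rowStream.map { row =>
  // Field positions follow the query schema above: user, product (ROW<name, id>), amount.
  val product = row.getField(1).asInstanceOf[Row]
  Order(
    row.getField(0).asInstanceOf[Long],
    ProductItem(
      product.getField(0).asInstanceOf[String],
      product.getField(1).asInstanceOf[Long]
    ),
    row.getField(2).asInstanceOf[Int]
  )
}

orderStream.addSink(sink)

The casts mirror the query schema in the error above (BIGINT to Long, VARCHAR to String, INT to Int). If the planner reports the product column as a legacy type rather than ROW, the map function would need to cast that field to ProductItem directly instead.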
On Mon, Nov 2, 2020 at 5:23 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> @Timo: Is this something that would work when using the new type stack?
> From the message I'm assuming it's using the older type stack.
>
> @Rex: Which Flink version are you using, and could you maybe post the
> code snippet that you use to do the conversions?
>
> Best,
> Aljoscha
>
> On 02.11.20 06:50, Rex Fenley wrote:
> > Maybe this is related to this issue?
> > https://issues.apache.org/jira/browse/FLINK-17683
> >
> > On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <r...@remind101.com> wrote:
> >
> >> Correction: I'm using Scala case classes, not strictly Java POJOs, just
> >> to be clear.
> >>
> >> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley <r...@remind101.com> wrote:
> >>
> >>> Hello,
> >>>
> >>> I keep running into trouble moving between DataStream and SQL with
> >>> POJOs, because my nested POJOs turn into LEGACY('STRUCTURED_TYPE', ...).
> >>> Is there any way to convert them back to POJOs in Flink when converting
> >>> a SQL Table back to a DataStream?
> >>>
> >>> Thanks!

--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> |
FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>