My jobs normally use the Blink planner, but with this test I noticed that may not be the case.
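In Flink 1.11 the Blink planner should already be the default when nothing is specified, but it can be pinned explicitly when building the table environment. A minimal sketch of the 1.11 EnvironmentSettings API (the quoted test below builds its settings without any planner toggle):

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.table.api.EnvironmentSettings
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Request the Blink planner explicitly rather than relying on the default.
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tEnv = StreamTableEnvironment.create(env, settings)

Note also that the stack trace below goes through org.apache.flink.table.planner.delegation.PlannerBase, which looks like the Blink planner's code path, so the test may be on Blink after all.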
On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley <r...@remind101.com> wrote:

> Flink 1.11.2 with Scala 2.12
>
> Error:
> [info] JobScalaTest:
> [info] - dummy *** FAILED ***
> [info]   org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink do not match.
> [info]   Query schema: [user: BIGINT, product: ROW<`name` VARCHAR(2147483647), `id` BIGINT>, amount: INT]
> [info]   Sink schema: [user: BIGINT, product: LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem, rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJAgAAeHIANm9yZy5hcGFjaGUuZmxpbmsuYXBpLnNjYWxhLnR5cGV1dGlscy5DYXNlQ2xhc3NUeXBlSW5mb46d1-iqONqQAgAMTAARUEFUVEVSTl9JTlRfRklFTER0ABlMamF2YS91dGlsL3JlZ2V4L1BhdHRlcm47TAAVUEFUVEVSTl9ORVNURURfRklFTERTcQB-AAJMAB5QQVRURVJOX05FU1RFRF9GSUVMRFNfV0lMRENBUkRxAH4AAkwAC1JFR0VYX0ZJRUxEdAASTGphdmEvbGFuZy9TdHJpbmc7TAAPUkVHRVhfSU5UX0ZJRUxEcQB-AANMABNSRUdFWF9ORVNURURfRklFTERTcQB-AANMABxSRUdFWF9ORVNURURfRklFTERTX1dJTERDQVJEcQB-AANMAA9SRUdFWF9TVFJfRklFTERxAH4AA0wABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAApmaWVsZE5hbWVzdAAWTHNjYWxhL2NvbGxlY3Rpb24vU2VxO0wACmZpZWxkVHlwZXNxAH4ABVsAEnR5cGVQYXJhbVR5cGVJbmZvc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGVpbmZvL1R5cGVJbmZvcm1hdGlvbjt4cgA1b3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMuVHVwbGVUeXBlSW5mb0Jhc2UAAAAAAAAAAQIAAkkAC3RvdGFsRmllbGRzWwAFdHlwZXNxAH4ABnhyADNvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLkNvbXBvc2l0ZVR5cGUAAAAAAAAAAQIAAUwACXR5cGVDbGFzc3EAfgAEeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIAC1Byb2R1Y3RJdGVtBwFtETzcflcCAAJKAAJpZEwABG5hbWVxAH4AA3hwAAAAAnVyADdbTG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb247uKBspBqaFLYCAAB4cAAAAAJzcgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6cQB-AARMAA9jb21wYXJhdG9yQ2xhc3NxAH4ABFsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hxAH4ACXZyABBqYXZhLmxhbmcuU3RyaW5noPCkOHo7s0ICAAB4cHZyADtvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuU3RyaW5nQ29tcGFyYXRvcgAAAAAAAAABAgAAeHIAPm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CYXNpY1R5cGVDb21wYXJhdG9yAAAAAAAAAAECAAJaABNhc2NlbmRpbmdDb21wYXJpc29uWwALY29tcGFyYXRvcnN0ADdbTG9yZy9hcGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZUNvbXBhcmF0b3I7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZUNvbXBhcmF0b3IAAAAAAAAAAQIAAHhwdXIAEltMamF2YS5sYW5nLkNsYXNzO6sW167LzVqZAgAAeHAAAAAAc3IAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5TdHJpbmdTZXJpYWxpemVyAAAAAAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJpYWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkludGVnZXJUeXBlSW5mb5AFxEVpQEqVAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5OdW1lcmljVHlwZUluZm-tmMZzMAKMFgIAAHhxAH4AD3ZyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHB2cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkxvbmdDb21wYXJhdG9yAAAAAAAAAAECAAB4cQB-ABZ1cQB-ABoAAAADdnIAD2phdmEubGFuZy5GbG9hdNrtyaLbPPDsAgABRgAFdmFsdWV4cQB-ACR2cgAQamF2YS5sYW5nLkRvdWJsZYCzwkopa_sEAgABRAAFdmFsdWV4cQB-ACR2cgATamF2YS5sYW5nLkNoYXJhY3RlcjSLR9lrGiZ4AgABQwAFdmFsdWV4cHNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhxAH4AHXNyABdqYXZhLnV0aWwucmVnZXguUGF0dGVybkZn1WtuSQINAgACSQAFZmxhZ3NMAAdwYXR0ZXJucQB-AAN4cAAAAAB0AAZbMC05XStzcQB-ADEAAAAAdAAwKFtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XSspKFwuKC4rKSk_c3EAfgAxAAAAAHQANihbXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSp8WzAtOV0rKShcLiguKykpP3xcKnxcX3QAJVtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XStxAH4AM3EAfgA1cQB-ADd0AB5bXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSpxAH4ADHNyADJzY2FsYS5jb2xsZWN0aW9uLmltbXV0YWJsZS5MaXN0JFNlcmlhbGl6YXRpb25Qcm94eQAAAAAAAAABAwAAeHB0AARuYW1ldAACaWRzcgAsc2NhbGEuY29sbGVjdGlvbi5pbW11dGFibGUuTGlzdFNlcmlhbGl6ZUVuZCSKXGNb91MLbQIAAHhweHNxAH4AOnEAfgAScQB-ACJxAH4AP3h1cQB-AA0AAAAA>'), amount: INT]
> [info]   at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
> [info]   at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
> [info]   at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
> [info]   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
> [info]   at scala.collection.Iterator.foreach(Iterator.scala:943)
> [info]   at scala.collection.Iterator.foreach$(Iterator.scala:943)
> [info]   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> [info]   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> [info]   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> [info]   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
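One way to sidestep this validation error is to bridge through generic Rows instead of the case class and rebuild the objects by hand, so no ANY<ProductItem> sink schema is ever registered. A minimal sketch against the Flink 1.11 Scala bridge API, reusing `result`, `Order`, and `ProductItem` from the quoted test below and assuming the user/product/amount column order from the query schema above; the nested ROW arrives as a nested Row:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.bridge.scala._
    import org.apache.flink.types.Row

    // Ask for Rows instead of Orders so the legacy ANY type never reaches a sink.
    val rows: DataStream[Row] = result.toAppendStream[Row]

    // Rebuild the case classes manually; the nested ROW comes back as a nested Row.
    val orders: DataStream[Order] = rows.map { row =>
      val product = row.getField(1).asInstanceOf[Row]
      Order(
        row.getField(0).asInstanceOf[Long],
        ProductItem(
          product.getField(0).asInstanceOf[String],
          product.getField(1).asInstanceOf[Long]
        ),
        row.getField(2).asInstanceOf[Int]
      )
    }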
> Code:
>
> import com.remind.graph.people.PeopleJobScala
>
> import org.scalatest.funsuite._
> import org.scalatest.BeforeAndAfter
>
> import org.apache.flink.streaming.api.scala.{
>   DataStream,
>   StreamExecutionEnvironment
> }
> import org.apache.flink.streaming.util.TestStreamEnvironment
> import org.apache.flink.table.runtime.util._
> import org.apache.flink.test.util.AbstractTestBase
> import org.apache.flink.table.api._
> import org.apache.flink.table.api.bridge.scala._
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
> import org.apache.flink.api.common.state.ListState
> import org.apache.flink.runtime.state.FunctionInitializationContext
> import org.apache.flink.api.common.state.ListStateDescriptor
> import org.apache.flink.runtime.state.FunctionSnapshotContext
> import org.apache.flink.types.Row
>
> import java.io.Serializable
> import java.sql.Timestamp
> import java.text.SimpleDateFormat
> import java.util.concurrent.atomic.AtomicInteger
> import java.{util => ju}
>
> import scala.collection.JavaConverters._
> import scala.collection.mutable
> import scala.util.Try
>
> case class Order(user: Long, product: ProductItem, amount: Int) {
>   def this() {
>     this(0, null, 0)
>   }
>
>   override def toString(): String = {
>     "Order{" +
>       "user=" + user +
>       ", product='" + product + '\'' +
>       ", amount=" + amount +
>       '}'
>   }
> }
>
> case class ProductItem(name: String, id: Long) {
>   def this() {
>     this(null, 0)
>   }
>
>   override def toString(): String = {
>     "Product{" +
>       "name='" + name + '\'' +
>       ", id=" + id +
>       '}'
>   }
> }
>
> class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
>   var env: StreamExecutionEnvironment = _
>   var tEnv: StreamTableEnvironment = _
>
>   before {
>     this.env = StreamExecutionEnvironment.getExecutionEnvironment
>     this.env.setParallelism(2)
>     this.env.getConfig.enableObjectReuse()
>     val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
>     this.tEnv = StreamTableEnvironment.create(env, setting)
>   }
>
>   after {
>     StreamTestSink.clear()
>     // TestValuesTableFactory.clearAllData()
>   }
>
>   def dateFrom(stringDate: String): java.sql.Date = {
>     val date = new SimpleDateFormat("dd/MM/yyyy").parse(stringDate)
>     new java.sql.Date(date.getTime())
>   }
>
>   def printTable(table: Table) = {
>     println(table)
>     table.printSchema()
>     println(table.getSchema().getFieldNames().mkString(", "))
>   }
>
>   def printDataStream(dataStream: DataStream[_]) = {
>     println(dataStream)
>     println(dataStream.dataType)
>   }
>
>   test("dummy") {
>     val orderA: DataStream[Order] = this.env.fromCollection(
>       Seq(
>         new Order(1L, new ProductItem("beer", 10L), 3),
>         new Order(1L, new ProductItem("diaper", 11L), 4),
>         new Order(3L, new ProductItem("rubber", 12L), 2)
>       )
>     )
>
>     val orderB: DataStream[Order] = this.env.fromCollection(
>       Seq(
>         new Order(2L, new ProductItem("pen", 13L), 3),
>         new Order(2L, new ProductItem("rubber", 12L), 3),
>         new Order(4L, new ProductItem("beer", 10L), 1)
>       )
>     )
>
>     println(orderB)
>     println(orderB.dataType)
>
>     // convert DataStream to Table
>     val tableA = this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
>     println(tableA)
>     tableA.printSchema()
>     println(tableA.getSchema().getFieldNames().mkString(", "))
>     // register DataStream as Table
>     this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product, 'amount)
>
>     // union the two tables
>     val result = this.tEnv.sqlQuery(s"""
>       |SELECT * FROM $tableA WHERE amount > 2
>       |UNION ALL
>       |SELECT * FROM OrderB WHERE amount < 2
>       """.stripMargin)
>
>     val sink = new StringSink[Order]()
>     result.toAppendStream[Order].addSink(sink)
>
>     this.env.execute()
>
>     val expected = List(
>       "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
>       "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
>       "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
>     )
>     val results = sink.getResults.sorted
>     println("results")
>     println(results)
>     assert(expected.sorted === results)
>   }
> }
>
> /**
>  * Taken from:
>  * https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
>  * There's a whole bunch of other test sinks to choose from there.
>  */
> object StreamTestSink {
>
>   val idCounter: AtomicInteger = new AtomicInteger(0)
>
>   val globalResults =
>     mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
>   val globalRetractResults =
>     mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
>   val globalUpsertResults =
>     mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]
>
>   def getNewSinkId: Int = {
>     val idx = idCounter.getAndIncrement()
>     this.synchronized {
>       globalResults.put(
>         idx,
>         mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
>       )
>       globalRetractResults.put(
>         idx,
>         mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
>       )
>       globalUpsertResults.put(
>         idx,
>         mutable.HashMap.empty[Int, mutable.Map[String, String]]
>       )
>     }
>     idx
>   }
>
>   def clear(): Unit = {
>     globalResults.clear()
>     globalRetractResults.clear()
>     globalUpsertResults.clear()
>   }
> }
>
> abstract class AbstractExactlyOnceSink[T]
>     extends RichSinkFunction[T]
>     with CheckpointedFunction {
>   protected var resultsState: ListState[String] = _
>   protected var localResults: mutable.ArrayBuffer[String] = _
>   protected val idx: Int = StreamTestSink.getNewSinkId
>
>   protected var globalResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
>   protected var globalRetractResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
>   protected var globalUpsertResults: mutable.Map[Int, mutable.Map[String, String]] = _
>
>   def isInitialized: Boolean = globalResults != null
>
>   override def initializeState(context: FunctionInitializationContext): Unit = {
>     resultsState = context.getOperatorStateStore
>       .getListState(
>         new ListStateDescriptor[String]("sink-results", Types.STRING)
>       )
>
>     localResults = mutable.ArrayBuffer.empty[String]
>
>     if (context.isRestored) {
>       for (value <- resultsState.get().asScala) {
>         localResults += value
>       }
>     }
>
>     val taskId = getRuntimeContext.getIndexOfThisSubtask
>     StreamTestSink.synchronized(
>       StreamTestSink.globalResults(idx) += (taskId -> localResults)
>     )
>   }
>
>   override def snapshotState(context: FunctionSnapshotContext): Unit = {
>     resultsState.clear()
>     for (value <- localResults) {
>       resultsState.add(value)
>     }
>   }
>
>   protected def clearAndStashGlobalResults(): Unit = {
>     if (globalResults == null) {
>       StreamTestSink.synchronized {
>         globalResults = StreamTestSink.globalResults.remove(idx).get
>         globalRetractResults = StreamTestSink.globalRetractResults.remove(idx).get
>         globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
>       }
>     }
>   }
>
>   protected def getResults: List[String] = {
>     clearAndStashGlobalResults()
>     val result = mutable.ArrayBuffer.empty[String]
>     this.globalResults.foreach {
>       case (_, list) => result ++= list
>     }
>     result.toList
>   }
> }
>
> final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
>   override def invoke(value: T): Unit = {
>     localResults += value.toString
>   }
>
>   override def getResults: List[String] = super.getResults
> }
>
> On Mon, Nov 2, 2020 at 5:23 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> @Timo: Is this something that would work when using the new type stack?
>> From the message I'm assuming it's using the older type stack.
>>
>> @Rex: Which Flink version are you using, and could you maybe post the
>> code snippet that you use to do conversions?
>>
>> Best,
>> Aljoscha
>>
>> On 02.11.20 06:50, Rex Fenley wrote:
>> > Maybe this is related to this issue?
>> > https://issues.apache.org/jira/browse/FLINK-17683
>> >
>> > On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <r...@remind101.com> wrote:
>> >
>> >> Correction: I'm using Scala case classes, not strictly Java POJOs,
>> >> just to be clear.
>> >>
>> >> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley <r...@remind101.com> wrote:
>> >>
>> >>> Hello,
>> >>>
>> >>> I keep running into trouble moving between DataStream and SQL with
>> >>> POJOs because my nested POJOs turn into LEGACY('STRUCTURED_TYPE', ...).
>> >>> Is there any way to convert them back to POJOs in Flink when
>> >>> converting a SQL Table back to a DataStream?
>> >>>
>> >>> Thanks!

--

Rex Fenley | Software Engineer - Mobile and Backend

Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> |
FOLLOW US <https://twitter.com/remindhq> | LIKE US <https://www.facebook.com/remindhq>
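On the original question at the bottom of the thread (getting case classes back out of a Table whose nested type became LEGACY('STRUCTURED_TYPE', ...)): another option is to project the nested field down to flat columns while still inside the Table API, so the conversion never has to carry the composite type across the bridge. A hedged, untested sketch reusing `result`, `Order`, and `ProductItem` from the test above; it relies on the Table API's documented COMPOSITE.get(name) value-access expression and assumes the flattened column order shown:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._

    // Flatten the nested ROW into plain columns while still in the Table API.
    val flat = result.select(
      'user,
      'product.get("name") as 'productName,
      'product.get("id") as 'productId,
      'amount
    )

    // Bridge as a flat tuple, then reassemble the case classes by hand.
    val orders: DataStream[Order] = flat
      .toAppendStream[(Long, String, Long, Int)]
      .map { case (user, name, id, amount) =>
        Order(user, ProductItem(name, id), amount)
      }

Since only primitive columns cross the Table/DataStream boundary here, no LEGACY sink schema should be involved; whether get() applies cleanly after the UNION ALL is something to verify against the actual schema.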