My jobs normally use the Blink planner, but I noticed that may not be the case
with this test.
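
In case it matters, here is a minimal sketch of how the planner could be pinned
explicitly in the test's "before" block, reusing the imports from the code quoted
below (this assumes the Flink 1.11 EnvironmentSettings builder API and is only a
sketch, not something I've verified changes the outcome):

    // Explicitly request the Blink planner rather than relying on the default.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inStreamingMode()
      .build()
    val tEnv = StreamTableEnvironment.create(env, settings)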

On Mon, Nov 2, 2020 at 12:38 PM Rex Fenley <r...@remind101.com> wrote:

> Flink 1.11.2 with Scala 2.12
>
> Error:
> [info] JobScalaTest:
> [info] - dummy *** FAILED ***
> [info]   org.apache.flink.table.api.ValidationException: Field types of
> query result and registered TableSink  do not match.
> [info] Query schema: [user: BIGINT, product: ROW<`name`
> VARCHAR(2147483647), `id` BIGINT>, amount: INT]
> [info] Sink schema: [user: BIGINT, product: LEGACY('STRUCTURED_TYPE',
> 'ANY<ProductItem,
> rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJAgAAeHIANm9yZy5hcGFjaGUuZmxpbmsuYXBpLnNjYWxhLnR5cGV1dGlscy5DYXNlQ2xhc3NUeXBlSW5mb46d1-iqONqQAgAMTAARUEFUVEVSTl9JTlRfRklFTER0ABlMamF2YS91dGlsL3JlZ2V4L1BhdHRlcm47TAAVUEFUVEVSTl9ORVNURURfRklFTERTcQB-AAJMAB5QQVRURVJOX05FU1RFRF9GSUVMRFNfV0lMRENBUkRxAH4AAkwAC1JFR0VYX0ZJRUxEdAASTGphdmEvbGFuZy9TdHJpbmc7TAAPUkVHRVhfSU5UX0ZJRUxEcQB-AANMABNSRUdFWF9ORVNURURfRklFTERTcQB-AANMABxSRUdFWF9ORVNURURfRklFTERTX1dJTERDQVJEcQB-AANMAA9SRUdFWF9TVFJfRklFTERxAH4AA0wABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAApmaWVsZE5hbWVzdAAWTHNjYWxhL2NvbGxlY3Rpb24vU2VxO0wACmZpZWxkVHlwZXNxAH4ABVsAEnR5cGVQYXJhbVR5cGVJbmZvc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGVpbmZvL1R5cGVJbmZvcm1hdGlvbjt4cgA1b3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMuVHVwbGVUeXBlSW5mb0Jhc2UAAAAAAAAAAQIAAkkAC3RvdGFsRmllbGRzWwAFdHlwZXNxAH4ABnhyADNvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLkNvbXBvc2l0ZVR5cGUAAAAAAAAAAQIAAUwACXR5cGVDbGFzc3EAfgAEeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIAC1Byb2R1Y3RJdGVtBwFtETzcflcCAAJKAAJpZEwABG5hbWVxAH4AA3hwAAAAAnVyADdbTG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb247uKBspBqaFLYCAAB4cAAAAAJzcgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6cQB-AARMAA9jb21wYXJhdG9yQ2xhc3NxAH4ABFsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hxAH4ACXZyABBqYXZhLmxhbmcuU3RyaW5noPCkOHo7s0ICAAB4cHZyADtvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuU3RyaW5nQ29tcGFyYXRvcgAAAAAAAAABAgAAeHIAPm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CYXNpY1R5cGVDb21wYXJhdG9yAAAAAAAAAAECAAJaABNhc2NlbmRpbmdDb21wYXJpc29uWwALY29tcGFyYXRvcnN0ADdbTG9yZy9hcGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZUNvbXBhcmF0b3I7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZUNvbXBhcmF0b3IAAAAAAAAAAQIAAHhwdXIAEltMamF2YS5sYW5nLkNsYXNzO6sW167LzVqZAgAAeHAAAAAAc3IAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5TdHJpbmdTZXJpYWxpemVyAAAAAAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJpYWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkludGVnZXJUeXBlSW5mb5AFxEVpQEqVAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5OdW1lcmljVHlwZUluZm-tmMZzMAKMFgIAAHhxAH4AD3ZyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHB2cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkxvbmdDb21wYXJhdG9yAAAAAAAAAAECAAB4cQB-ABZ1cQB-ABoAAAADdnIAD2phdmEubGFuZy5GbG9hdNrtyaLbPPDsAgABRgAFdmFsdWV4cQB-ACR2cgAQamF2YS5sYW5nLkRvdWJsZYCzwkopa_sEAgABRAAFdmFsdWV4cQB-ACR2cgATamF2YS5sYW5nLkNoYXJhY3RlcjSLR9lrGiZ4AgABQwAFdmFsdWV4cHNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhxAH4AHXNyABdqYXZhLnV0aWwucmVnZXguUGF0dGVybkZn1WtuSQINAgACSQAFZmxhZ3NMAAdwYXR0ZXJucQB-AAN4cAAAAAB0AAZbMC05XStzcQB-ADEAAAAAdAAwKFtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XSspKFwuKC4rKSk_c3EAfgAxAAAAAHQANihbXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSp8WzAtOV0rKShcLiguKykpP3xcKnxcX3QAJVtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XStxAH4AM3EAfgA1cQB-ADd0AB5bXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSpxAH4ADHNyADJzY2FsYS5jb2xsZWN0aW9uLmltbXV0YWJsZS5MaXN0JFNlcmlhbGl6YXRpb25Qcm94eQAAAAAAAAABAwAAeHB0AARuYW1ldAACaWRzcgAsc2NhbGEuY29sbGVjdGlvbi5pbW11dGFibGUuTGlzd
FNlcmlhbGl6ZUVuZCSKXGNb91MLbQIAAHhweHNxAH4AOnEAfgAScQB-ACJxAH4AP3h1cQB-AA0AAAAA>'),
> amount: INT]
> [info]   at
> org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
> [info]   at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
> [info]   at
> org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
> [info]   at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
> [info]   at scala.collection.Iterator.foreach(Iterator.scala:943)
> [info]   at scala.collection.Iterator.foreach$(Iterator.scala:943)
> [info]   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> [info]   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> [info]   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> [info]   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
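>
> For what it's worth, a minimal workaround sketch (assuming the field order and
> the nested ROW shape shown in the query schema above, and assuming
> toAppendStream[Row] is accepted where toAppendStream[Order] is not) would be to
> go through Row and rebuild the case class by hand, using the result table and
> the case classes from the code below:
>
> // Workaround sketch only: convert via Row instead of Order, then map back.
> // Assumes the nested product field arrives as a Row(name, id).
> val rows: DataStream[Row] = result.toAppendStream[Row]
> val orders: DataStream[Order] = rows.map { row =>
>   val p = row.getField(1).asInstanceOf[Row]
>   Order(
>     row.getField(0).asInstanceOf[Long],
>     ProductItem(p.getField(0).asInstanceOf[String], p.getField(1).asInstanceOf[Long]),
>     row.getField(2).asInstanceOf[Int]
>   )
> }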
>
> Code:
> import com.remind.graph.people.PeopleJobScala
>
> import org.scalatest.funsuite._
> import org.scalatest.BeforeAndAfter
>
> import org.apache.flink.streaming.api.scala.{
>   DataStream,
>   StreamExecutionEnvironment
> }
> import org.apache.flink.streaming.util.TestStreamEnvironment
> import org.apache.flink.table.runtime.util._
> import org.apache.flink.test.util.AbstractTestBase
> import org.apache.flink.table.api._
> import org.apache.flink.table.api.bridge.scala._
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
> import org.apache.flink.api.common.state.ListState
> import org.apache.flink.runtime.state.FunctionInitializationContext
> import org.apache.flink.api.common.state.ListStateDescriptor
> import org.apache.flink.runtime.state.FunctionSnapshotContext
> import org.apache.flink.types.Row
>
> import java.io.Serializable;
> import java.sql.Timestamp;
> import java.text.SimpleDateFormat
> import java.util.concurrent.atomic.AtomicInteger
> import java.{util => ju}
>
> import scala.collection.JavaConverters._
> import scala.collection.mutable
> import scala.util.Try
>
> case class Order(user: Long, product: ProductItem, amount: Int) {
>   def this() {
>     this(0, null, 0)
>   }
>
>   override def toString(): String = {
>     return "Order{" +
>       "user=" + user +
>       ", product='" + product + '\'' +
>       ", amount=" + amount +
>       '}';
>   }
> }
>
> case class ProductItem(name: String, id: Long) {
>   def this() {
>     this(null, 0)
>   }
>
>   override def toString(): String = {
>     return "Product{" +
>       "name='" + name + '\'' +
>       ", id=" + id +
>       '}';
>   }
> }
>
> class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
>   var env: StreamExecutionEnvironment = _
>   var tEnv: StreamTableEnvironment = _
>
>   before {
>     this.env = StreamExecutionEnvironment.getExecutionEnvironment
>     this.env.setParallelism(2)
>     this.env.getConfig.enableObjectReuse()
>     val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
>     this.tEnv = StreamTableEnvironment.create(env, setting)
>   }
>
>   after {
>     StreamTestSink.clear()
>     // TestValuesTableFactory.clearAllData()
>   }
>
>   def dateFrom(stringDate: String): java.sql.Date = {
>     val date = new SimpleDateFormat("dd/MM/yyyy")
>       .parse(stringDate)
>     return new java.sql.Date(date.getTime())
>   }
>
>   def printTable(table: Table) = {
>     println(table)
>     table.printSchema()
>     println(table.getSchema().getFieldNames().mkString(", "))
>   }
>
>   def printDataStream(dataStream: DataStream[_]) = {
>     println(dataStream)
>     println(dataStream.dataType)
>   }
>
>   test("dummy") {
>     val orderA: DataStream[Order] = this.env.fromCollection(
>       Seq(
>         new Order(1L, new ProductItem("beer", 10L), 3),
>         new Order(1L, new ProductItem("diaper", 11L), 4),
>         new Order(3L, new ProductItem("rubber", 12L), 2)
>       )
>     )
>
>     val orderB: DataStream[Order] = this.env.fromCollection(
>       Seq(
>         new Order(2L, new ProductItem("pen", 13L), 3),
>         new Order(2L, new ProductItem("rubber", 12L), 3),
>         new Order(4L, new ProductItem("beer", 10L), 1)
>       )
>     )
>
>     println(orderB)
>     println(orderB.dataType)
>
>     // convert DataStream to Table
>     val tableA =
>       this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
>     println(tableA)
>     tableA.printSchema()
>     println(tableA.getSchema().getFieldNames().mkString(", "))
>     // register DataStream as Table
>     this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product, 'amount)
>
>     // union the two tables
>     val result = this.tEnv.sqlQuery(s"""
>       |SELECT * FROM $tableA WHERE amount > 2
>       |UNION ALL
>       |SELECT * FROM OrderB WHERE amount < 2
>       """.stripMargin)
>
>     val sink = new StringSink[Order]()
>     result.toAppendStream[Order].addSink(sink)
>
>     this.env.execute()
>
>     val expected = List(
>       "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
>       "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
>       "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
>     )
>     val results = sink.getResults.sorted
>     println("results")
>     println(results)
>     assert(expected.sorted === results)
>   }
> }
>
> /**
>  * Taken from:
>  * https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
>  * There's a whole bunch of other test sinks to choose from there.
>  */
> object StreamTestSink {
>
>   val idCounter: AtomicInteger = new AtomicInteger(0)
>
>   val globalResults =
>     mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
>   val globalRetractResults =
>     mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
>   val globalUpsertResults =
>     mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]
>
>   def getNewSinkId: Int = {
>     val idx = idCounter.getAndIncrement()
>     this.synchronized {
>       globalResults.put(
>         idx,
>         mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
>       )
>       globalRetractResults.put(
>         idx,
>         mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
>       )
>       globalUpsertResults.put(
>         idx,
>         mutable.HashMap.empty[Int, mutable.Map[String, String]]
>       )
>     }
>     idx
>   }
>
>   def clear(): Unit = {
>     globalResults.clear()
>     globalRetractResults.clear()
>     globalUpsertResults.clear()
>   }
> }
>
> abstract class AbstractExactlyOnceSink[T]
>   extends RichSinkFunction[T]
>   with CheckpointedFunction {
>   protected var resultsState: ListState[String] = _
>   protected var localResults: mutable.ArrayBuffer[String] = _
>   protected val idx: Int = StreamTestSink.getNewSinkId
>
>   protected var globalResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
>   protected var globalRetractResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
>   protected var globalUpsertResults: mutable.Map[Int, mutable.Map[String, String]] = _
>
>   def isInitialized: Boolean = globalResults != null
>
>   override def initializeState(context: FunctionInitializationContext): Unit = {
>     resultsState = context.getOperatorStateStore
>       .getListState(
>         new ListStateDescriptor[String]("sink-results", Types.STRING)
>       )
>
>     localResults = mutable.ArrayBuffer.empty[String]
>
>     if (context.isRestored) {
>       for (value <- resultsState.get().asScala) {
>         localResults += value
>       }
>     }
>
>     val taskId = getRuntimeContext.getIndexOfThisSubtask
>     StreamTestSink.synchronized(
>       StreamTestSink.globalResults(idx) += (taskId -> localResults)
>     )
>   }
>
>   override def snapshotState(context: FunctionSnapshotContext): Unit = {
>     resultsState.clear()
>     for (value <- localResults) {
>       resultsState.add(value)
>     }
>   }
>
>   protected def clearAndStashGlobalResults(): Unit = {
>     if (globalResults == null) {
>       StreamTestSink.synchronized {
>         globalResults = StreamTestSink.globalResults.remove(idx).get
>         globalRetractResults = StreamTestSink.globalRetractResults.remove(idx).get
>         globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
>       }
>     }
>   }
>
>   protected def getResults: List[String] = {
>     clearAndStashGlobalResults()
>     val result = mutable.ArrayBuffer.empty[String]
>     this.globalResults.foreach {
>       case (_, list) => result ++= list
>     }
>     result.toList
>   }
> }
>
> final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
>   override def invoke(value: T) {
>     localResults += value.toString
>   }
>
>   override def getResults: List[String] = super.getResults
> }
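>
> Another direction, again only a sketch (assuming nested field access on the
> ROW-typed product column works here, which I haven't verified): flatten the
> nested field in SQL and convert to a flat case class, so no nested structured
> type reaches the sink validation at all. FlatOrder below is a hypothetical
> type that would sit next to Order and ProductItem:
>
> // Hypothetical flat target type; avoids the nested ProductItem in the conversion.
> case class FlatOrder(user: Long, productName: String, productId: Long, amount: Int)
>
> // Inside the test, project the nested fields out before converting.
> val flat = this.tEnv.sqlQuery(
>   """
>     |SELECT `user`, product.name AS productName, product.id AS productId, amount
>     |FROM OrderB
>   """.stripMargin)
> val flatStream: DataStream[FlatOrder] = flat.toAppendStream[FlatOrder]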
>
>
>
> On Mon, Nov 2, 2020 at 5:23 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> @Timo: Is this something that would work when using the new type stack? From
>> the message I'm assuming it's using the older type stack.
>>
>> @Rex: Which Flink version are you using, and could you maybe post the
>> code snippet that you use to do the conversions?
>>
>> Best,
>> Aljoscha
>>
>> On 02.11.20 06:50, Rex Fenley wrote:
>> > Maybe this is related to this issue?
>> > https://issues.apache.org/jira/browse/FLINK-17683
>> >
>> > On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <r...@remind101.com> wrote:
>> >
>> >> Correction: I'm using Scala case classes, not strictly Java POJOs, just
>> >> to be clear.
>> >>
>> >> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley <r...@remind101.com> wrote:
>> >>
>> >>> Hello,
>> >>>
>> >>> I keep running into trouble moving between DataStream and SQL with
>> >>> POJOs because my nested POJOs turn into LEGACY('STRUCTURED_TYPE', ...).
>> >>> Is there any way to convert them back to POJOs in Flink when converting
>> >>> a SQL Table back to a DataStream?
>> >>>
>> >>> Thanks!
>> >>>
>> >>
>> >>
>> >
>> >
>>
>>
>


