Flink 1.11.2 with Scala 2.12

Error:
[info] JobScalaTest:
[info] - dummy *** FAILED ***
[info]   org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink do not match.
[info] Query schema: [user: BIGINT, product: ROW<`name` VARCHAR(2147483647), `id` BIGINT>, amount: INT]
[info] Sink schema: [user: BIGINT, product: LEGACY('STRUCTURED_TYPE', 'ANY<ProductItem,
rO0ABXNyAB1Kb2JTY2FsYVRlc3QkJGFub24kOSQkYW5vbiQxMODHL_EXRquJAgAAeHIANm9yZy5hcGFjaGUuZmxpbmsuYXBpLnNjYWxhLnR5cGV1dGlscy5DYXNlQ2xhc3NUeXBlSW5mb46d1-iqONqQAgAMTAARUEFUVEVSTl9JTlRfRklFTER0ABlMamF2YS91dGlsL3JlZ2V4L1BhdHRlcm47TAAVUEFUVEVSTl9ORVNURURfRklFTERTcQB-AAJMAB5QQVRURVJOX05FU1RFRF9GSUVMRFNfV0lMRENBUkRxAH4AAkwAC1JFR0VYX0ZJRUxEdAASTGphdmEvbGFuZy9TdHJpbmc7TAAPUkVHRVhfSU5UX0ZJRUxEcQB-AANMABNSRUdFWF9ORVNURURfRklFTERTcQB-AANMABxSRUdFWF9ORVNURURfRklFTERTX1dJTERDQVJEcQB-AANMAA9SRUdFWF9TVFJfRklFTERxAH4AA0wABWNsYXp6dAARTGphdmEvbGFuZy9DbGFzcztMAApmaWVsZE5hbWVzdAAWTHNjYWxhL2NvbGxlY3Rpb24vU2VxO0wACmZpZWxkVHlwZXNxAH4ABVsAEnR5cGVQYXJhbVR5cGVJbmZvc3QAN1tMb3JnL2FwYWNoZS9mbGluay9hcGkvY29tbW9uL3R5cGVpbmZvL1R5cGVJbmZvcm1hdGlvbjt4cgA1b3JnLmFwYWNoZS5mbGluay5hcGkuamF2YS50eXBldXRpbHMuVHVwbGVUeXBlSW5mb0Jhc2UAAAAAAAAAAQIAAkkAC3RvdGFsRmllbGRzWwAFdHlwZXNxAH4ABnhyADNvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLkNvbXBvc2l0ZVR5cGUAAAAAAAAAAQIAAUwACXR5cGVDbGFzc3EAfgAEeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb26UjchIurN66wIAAHhwdnIAC1Byb2R1Y3RJdGVtBwFtETzcflcCAAJKAAJpZEwABG5hbWVxAH4AA3hwAAAAAnVyADdbTG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5UeXBlSW5mb3JtYXRpb247uKBspBqaFLYCAAB4cAAAAAJzcgAyb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkJhc2ljVHlwZUluZm_6BPCCpWndBgIABEwABWNsYXp6cQB-AARMAA9jb21wYXJhdG9yQ2xhc3NxAH4ABFsAF3Bvc3NpYmxlQ2FzdFRhcmdldFR5cGVzdAASW0xqYXZhL2xhbmcvQ2xhc3M7TAAKc2VyaWFsaXplcnQANkxvcmcvYXBhY2hlL2ZsaW5rL2FwaS9jb21tb24vdHlwZXV0aWxzL1R5cGVTZXJpYWxpemVyO3hxAH4ACXZyABBqYXZhLmxhbmcuU3RyaW5noPCkOHo7s0ICAAB4cHZyADtvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuU3RyaW5nQ29tcGFyYXRvcgAAAAAAAAABAgAAeHIAPm9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5CYXNpY1R5cGVDb21wYXJhdG9yAAAAAAAAAAECAAJaABNhc2NlbmRpbmdDb21wYXJpc29uWwALY29tcGFyYXRvcnN0ADdbTG9yZy9hcGFjaGUvZmxpbmsvYXBpL2NvbW1vbi90eXBldXRpbHMvVHlwZUNvbXBhcmF0b3I7eHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuVHlwZUNvbXBhcmF0b3IAAAAAAAAAAQIAAHhwdXIAEltMamF2YS5sYW5nLkNsYXNzO6sW167LzVqZAgAAeHAAAAAAc3IAO29yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBldXRpbHMuYmFzZS5TdHJpbmdTZXJpYWxpemVyAAAAAAAAAAECAAB4cgBCb3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLlR5cGVTZXJpYWxpemVyU2luZ2xldG9ueamHqscud0UCAAB4cgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5UeXBlU2VyaWFsaXplcgAAAAAAAAABAgAAeHBzcgA0b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGVpbmZvLkludGVnZXJUeXBlSW5mb5AFxEVpQEqVAgAAeHIANG9yZy5hcGFjaGUuZmxpbmsuYXBpLmNvbW1vbi50eXBlaW5mby5OdW1lcmljVHlwZUluZm-tmMZzMAKMFgIAAHhxAH4AD3ZyAA5qYXZhLmxhbmcuTG9uZzuL5JDMjyPfAgABSgAFdmFsdWV4cgAQamF2YS5sYW5nLk51bWJlcoaslR0LlOCLAgAAeHB2cgA5b3JnLmFwYWNoZS5mbGluay5hcGkuY29tbW9uLnR5cGV1dGlscy5iYXNlLkxvbmdDb21wYXJhdG9yAAAAAAAAAAECAAB4cQB-ABZ1cQB-ABoAAAADdnIAD2phdmEubGFuZy5GbG9hdNrtyaLbPPDsAgABRgAFdmFsdWV4cQB-ACR2cgAQamF2YS5sYW5nLkRvdWJsZYCzwkopa_sEAgABRAAFdmFsdWV4cQB-ACR2cgATamF2YS5sYW5nLkNoYXJhY3RlcjSLR9lrGiZ4AgABQwAFdmFsdWV4cHNyADlvcmcuYXBhY2hlLmZsaW5rLmFwaS5jb21tb24udHlwZXV0aWxzLmJhc2UuTG9uZ1NlcmlhbGl6ZXIAAAAAAAAAAQIAAHhxAH4AHXNyABdqYXZhLnV0aWwucmVnZXguUGF0dGVybkZn1WtuSQINAgACSQAFZmxhZ3NMAAdwYXR0ZXJucQB-AAN4cAAAAAB0AAZbMC05XStzcQB-ADEAAAAAdAAwKFtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XSspKFwuKC4rKSk_c3EAfgAxAAAAAHQANihbXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSp8WzAtOV0rKShcLiguKykpP3xcKnxcX3QAJVtccHtMfV9cJF1bXHB7TH1ccHtEaWdpdH1fXCRdKnxbMC05XStxAH4AM3EAfgA1cQB-ADd0AB5bXHB7TH1fXCRdW1xwe0x9XHB7RGlnaXR9X1wkXSpxAH4ADHNyADJzY2FsYS5jb2xsZWN0aW9uLmltbXV0YWJsZS5MaXN0JFNlcmlhbGl6YXRpb25Qcm94eQAAAAAAAAABAwAAeHB0AARuYW1ldAACaWRzcgAsc2NhbGEuY29sbGVjdGlvbi5pbW11dGFibGUuTGlzdFN
lcmlhbGl6ZUVuZCSKXGNb91MLbQIAAHhweHNxAH4AOnEAfgAScQB-ACJxAH4AP3h1cQB-AA0AAAAA>'),
amount: INT]
[info]   at org.apache.flink.table.planner.sinks.TableSinkUtils$.validateSchemaAndApplyImplicitCast(TableSinkUtils.scala:103)
[info]   at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:260)
[info]   at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:163)
[info]   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
[info]   at scala.collection.Iterator.foreach(Iterator.scala:943)
[info]   at scala.collection.Iterator.foreach$(Iterator.scala:943)
[info]   at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
[info]   at scala.collection.IterableLike.foreach(IterableLike.scala:74)
[info]   at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
[info]   at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
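
For context, the two schemas in the exception disagree only on the nested field: the query side describes product with the Blink planner's new type system (a ROW type), while the registered sink still carries the old type stack's CaseClassTypeInfo wrapped as a LEGACY ANY type; the base64 blob is just that TypeInformation, Java-serialized. As a sketch (the output described in the comments is my expectation, not verified), this is one way to inspect what the old type stack derives for the case classes defined below:

// Sketch: inspect the TypeInformation the Scala DataStream API derives for
// the case classes below. createTypeInformation is provided by the
// org.apache.flink.api.scala package object.
import org.apache.flink.api.scala._

val orderTypeInfo = createTypeInformation[Order]
println(orderTypeInfo)
// Expected: a CaseClassTypeInfo for Order whose product field is itself a
// CaseClassTypeInfo for ProductItem, i.e. the TypeInformation that was
// serialized into the ANY<ProductItem, ...> blob above.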

Code:
import com.remind.graph.people.PeopleJobScala

import org.scalatest.funsuite._
import org.scalatest.BeforeAndAfter

import org.apache.flink.streaming.api.scala.{
  DataStream,
  StreamExecutionEnvironment
}
import org.apache.flink.streaming.util.TestStreamEnvironment
import org.apache.flink.table.runtime.util._
import org.apache.flink.test.util.AbstractTestBase
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.api.common.state.ListState
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.types.Row

import java.io.Serializable
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.concurrent.atomic.AtomicInteger
import java.{util => ju}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try

case class Order(user: Long, product: ProductItem, amount: Int) {
  def this() {
    this(0, null, 0)
  }

  override def toString(): String = {
    "Order{" +
      "user=" + user +
      ", product='" + product + '\'' +
      ", amount=" + amount +
      '}'
  }
}

case class ProductItem(name: String, id: Long) {
  def this() {
    this(null, 0)
  }

  override def toString(): String = {
    "Product{" +
      "name='" + name + '\'' +
      ", id=" + id +
      '}'
  }
}

class JobScalaTest extends AnyFunSuite with BeforeAndAfter {
  var env: StreamExecutionEnvironment = _
  var tEnv: StreamTableEnvironment = _

  before {
    this.env = StreamExecutionEnvironment.getExecutionEnvironment
    this.env.setParallelism(2)
    this.env.getConfig.enableObjectReuse()
    val setting = EnvironmentSettings.newInstance().inStreamingMode().build()
    this.tEnv = StreamTableEnvironment.create(env, setting)
  }

  after {
    StreamTestSink.clear()
    // TestValuesTableFactory.clearAllData()
  }

  def dateFrom(stringDate: String): java.sql.Date = {
    val date = new SimpleDateFormat("dd/MM/yyyy").parse(stringDate)
    new java.sql.Date(date.getTime())
  }

  def printTable(table: Table) = {
    println(table)
    table.printSchema()
    println(table.getSchema().getFieldNames().mkString(", "))
  }

  def printDataStream(dataStream: DataStream[_]) = {
    println(dataStream)
    println(dataStream.dataType)
  }

  test("dummy") {
    val orderA: DataStream[Order] = this.env.fromCollection(
      Seq(
        new Order(1L, new ProductItem("beer", 10L), 3),
        new Order(1L, new ProductItem("diaper", 11L), 4),
        new Order(3L, new ProductItem("rubber", 12L), 2)
      )
    )

    val orderB: DataStream[Order] = this.env.fromCollection(
      Seq(
        new Order(2L, new ProductItem("pen", 13L), 3),
        new Order(2L, new ProductItem("rubber", 12L), 3),
        new Order(4L, new ProductItem("beer", 10L), 1)
      )
    )

    println(orderB)
    println(orderB.dataType)

    // convert DataStream to Table
    val tableA = this.tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
    println(tableA)
    tableA.printSchema()
    println(tableA.getSchema().getFieldNames().mkString(", "))
    // register DataStream as Table
    this.tEnv.createTemporaryView("OrderB", orderB, 'user, 'product, 'amount)

    // union the two tables
    val result = this.tEnv.sqlQuery(s"""
      |SELECT * FROM $tableA WHERE amount > 2
      |UNION ALL
      |SELECT * FROM OrderB WHERE amount < 2
      """.stripMargin)

    val sink = new StringSink[Order]()
    result.toAppendStream[Order].addSink(sink)

    this.env.execute()

    val expected = List(
      "Order{user=1, product='Product{name='beer', id=10}', amount=3}",
      "Order{user=1, product='Product{name='diaper', id=11}', amount=4}",
      "Order{user=4, product='Product{name='beer', id=10}', amount=1}"
    )
    val results = sink.getResults.sorted
    println("results")
    println(results)
    assert(expected.sorted === results)
  }
}

/**
 * Taken from:
 * https://github.com/apache/flink/blob/release-1.11.2/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamTestSink.scala
 * There's a whole bunch of other test sinks to choose from there.
 */
object StreamTestSink {

  val idCounter: AtomicInteger = new AtomicInteger(0)

  val globalResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
  val globalRetractResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.ArrayBuffer[String]]]
  val globalUpsertResults =
    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]

  def getNewSinkId: Int = {
    val idx = idCounter.getAndIncrement()
    this.synchronized {
      globalResults.put(
        idx,
        mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
      )
      globalRetractResults.put(
        idx,
        mutable.HashMap.empty[Int, mutable.ArrayBuffer[String]]
      )
      globalUpsertResults.put(
        idx,
        mutable.HashMap.empty[Int, mutable.Map[String, String]]
      )
    }
    idx
  }

  def clear(): Unit = {
    globalResults.clear()
    globalRetractResults.clear()
    globalUpsertResults.clear()
  }
}

abstract class AbstractExactlyOnceSink[T]
    extends RichSinkFunction[T]
    with CheckpointedFunction {
  protected var resultsState: ListState[String] = _
  protected var localResults: mutable.ArrayBuffer[String] = _
  protected val idx: Int = StreamTestSink.getNewSinkId

  protected var globalResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
  protected var globalRetractResults: mutable.Map[Int, mutable.ArrayBuffer[String]] = _
  protected var globalUpsertResults: mutable.Map[Int, mutable.Map[String, String]] = _

  def isInitialized: Boolean = globalResults != null

  override def initializeState(context: FunctionInitializationContext): Unit = {
    resultsState = context.getOperatorStateStore
      .getListState(
        new ListStateDescriptor[String]("sink-results", Types.STRING)
      )

    localResults = mutable.ArrayBuffer.empty[String]

    if (context.isRestored) {
      for (value <- resultsState.get().asScala) {
        localResults += value
      }
    }

    val taskId = getRuntimeContext.getIndexOfThisSubtask
    StreamTestSink.synchronized(
      StreamTestSink.globalResults(idx) += (taskId -> localResults)
    )
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    resultsState.clear()
    for (value <- localResults) {
      resultsState.add(value)
    }
  }

  protected def clearAndStashGlobalResults(): Unit = {
    if (globalResults == null) {
      StreamTestSink.synchronized {
        globalResults = StreamTestSink.globalResults.remove(idx).get
        globalRetractResults = StreamTestSink.globalRetractResults.remove(idx).get
        globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
      }
    }
  }

  protected def getResults: List[String] = {
    clearAndStashGlobalResults()
    val result = mutable.ArrayBuffer.empty[String]
    this.globalResults.foreach {
      case (_, list) => result ++= list
    }
    result.toList
  }
}

final class StringSink[T] extends AbstractExactlyOnceSink[T]() {
  override def invoke(value: T): Unit = {
    localResults += value.toString
  }

  override def getResults: List[String] = super.getResults
}



On Mon, Nov 2, 2020 at 5:23 AM Aljoscha Krettek <aljos...@apache.org> wrote:

> @Timo: Is this something that would work when using the new type stack?
> From the message I'm assuming it's using the older type stack.
>
> @Rex: Which Flink version are you using, and could you maybe post the
> code snippet that you use to do conversions?
>
> Best,
> Aljoscha
>
> On 02.11.20 06:50, Rex Fenley wrote:
> > Maybe this is related to this issue?
> > https://issues.apache.org/jira/browse/FLINK-17683
> >
> > On Fri, Oct 30, 2020 at 8:43 PM Rex Fenley <r...@remind101.com> wrote:
> >
> >> Correction: I'm using Scala case classes, not strictly Java POJOs, just
> >> to be clear.
> >>
> >> On Fri, Oct 30, 2020 at 7:56 PM Rex Fenley <r...@remind101.com> wrote:
> >>
> >>> Hello,
> >>>
> >>> I keep running into trouble moving between DataStream and SQL with
> >>> POJOs because my nested POJOs turn into LEGACY('STRUCTURED_TYPE', ...).
> >>> Is there any way to convert them back to POJOs in Flink when converting
> >>> a SQL Table back to a DataStream?
> >>>
> >>> Thanks!

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


