I instead tried changing SplatFruitsFunc to a ScalarFunction and
leftOuterJoinLateral to a map, and I'm receiving:
> org.apache.flink.client.program.ProgramInvocationException: The main
method caused an error: Only a scalar function can be used in the map
operator.
which seems odd because the documentation says:

> Performs a map operation with a user-defined scalar function or built-in
scalar function. The output will be flattened if the output type is a
composite type.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations

Shouldn't this work as an alternative?
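
For reference, the per-row logic that either function variant has to
implement (described in the quoted message below) can be sketched in plain
Scala without any Flink types. This is only an illustration: the object name
`SplatFruitsSketch`, the method `splat`, and the null handling are
hypothetical and are not part of the actual job code.

```scala
// Sketch only: plain Scala, no Flink dependency. All names are hypothetical.
object SplatFruitsSketch {
  // Fixed key order; each key becomes one has_<key> boolean column.
  val keys: Seq[String] = Seq("orange", "banana", "apple", "watermelon")

  // For every known key, report whether it appears in the row's array.
  // Assumption: a null array is treated as empty.
  def splat(fruits: Array[String]): Seq[(String, Boolean)] = {
    val present = Option(fruits).map(_.toSet).getOrElse(Set.empty[String])
    keys.map(k => s"has_$k" -> present.contains(k))
  }
}
```

With the input `Array("orange", "apple")`, `splat` returns has_orange and
has_apple as true and the other two as false, matching the example in the
quoted message.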

On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley <[email protected]> wrote:

> Hello,
>
> I have a TableFunction and wherever it is applied with a
> leftOuterJoinLateral, my table loses any inference of there being a primary
> key. I see this because all subsequent joins end up with "NoUniqueKey" when
> I know a primary key of id should exist.
>
> I'm wondering if this is expected behavior and if it's possible to tell a
> table directly what the primary key should be?
>
>
> To demonstrate my example:
> My table function checks whether each value from a fixed set is present in
> a string array and, for each value, appends a boolean column. For example,
> suppose the array column "fruits" may contain orange, banana, apple, and
> watermelon. If a row contains only `["orange", "apple"]`, the function
> appends `has_orange: true, has_banana: false, has_apple: true,
> has_watermelon: false` as columns to the row. This example is essentially
> the same as my code, except that my code has a much larger set of keys and
> does not deal with fruits.
>
> Example code:
>
> // table will always have pk id
> def splatFruits(table: Table, columnPrefix: String): Table = {
>   table
>     .leftOuterJoinLateral(
>       new SplatFruitsFunc()($"fruits") as (
>         s"${columnPrefix}_has_orange",
>         s"${columnPrefix}_has_banana",
>         s"${columnPrefix}_has_apple",
>         s"${columnPrefix}_has_watermelon"
>       )
>     )
>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
> }
>
> @FunctionHint(
>   output = new DataTypeHint(
>     "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN, has_watermelon BOOLEAN)"
>   )
> )
> class SplatFruitsFunc
>     extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {
>
>   def eval(fruits: Array[String]): Unit = {
>     val hasOrange: java.lang.Boolean = fruits.contains("orange")
>     val hasBanana: java.lang.Boolean = fruits.contains("banana")
>     val hasApple: java.lang.Boolean = fruits.contains("apple")
>     val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
>     collect(hasOrange, hasBanana, hasApple, hasWatermelon)
>   }
> }
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
> Remind.com <https://www.remind.com/> |  BLOG <http://blog.remind.com/>  |
>  FOLLOW US <https://twitter.com/remindhq>  |  LIKE US
> <https://www.facebook.com/remindhq>
>


