So instead I tried changing SplatFruitsFunc to a ScalarFunction and leftOuterJoinLateral to a map, and I'm receiving:

> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Only a scalar function can be used in the map operator.

which seems odd because the documentation says:
> Performs a map operation with a user-defined scalar function or built-in scalar function. The output will be flattened if the output type is a composite type.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations

Shouldn't this work as an alternative?

On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley <[email protected]> wrote:

> Hello,
>
> I have a TableFunction and wherever it is applied with a
> leftOuterJoinLateral, my table loses any inference of there being a primary
> key. I see this because all subsequent joins end up with "NoUniqueKey" when
> I know a primary key of id should exist.
>
> I'm wondering if this is expected behavior and if it's possible to tell a
> table directly what the primary key should be?
>
>
> To demonstrate my example:
> My table function checks if an element of a certain type is in a string
> array, and depending on whether or not it is there, it appends a column
> with value true or false. For example, if array "fruits" which could
> possibly contain orange, banana, apple, and watermelon on a row contains
> only `["orange", "apple"]` then it will append `has_orange: true,
> has_banana: false, has_apple: true, has_watermelon: false` as columns to
> the row. This example is essentially the same as my code, outside of having
> a much larger set of keys and not dealing with fruits.
>
> Example code:
>
> // table will always have pk id
> def splatFruits(table: Table, columnPrefix: String): Table = {
>   return table
>     .leftOuterJoinLateral(
>       new SplatFruitsFunc()(
>         $"fruits"
>       ) as (s"${columnPrefix}_has_orange", s"${columnPrefix}_has_banana",
>         s"${columnPrefix}_has_apple", s"${columnPrefix}_has_watermelon")
>     )
>     .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
> }
>
> @FunctionHint(
>   output = new DataTypeHint(
>     "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN, has_watermelon BOOLEAN)"
>   )
> )
> class SplatFruitsFunc
>     extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {
>
>   def eval(fruits: Array[String]): Unit = {
>     val hasOrange: java.lang.Boolean = fruits.contains("orange")
>     val hasBanana: java.lang.Boolean = fruits.contains("banana")
>     val hasApple: java.lang.Boolean = fruits.contains("apple")
>     val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
>     collect(hasOrange, hasBanana, hasApple, hasWatermelon)
>   }
> }
>
> Thanks!
>
> --
>
> Rex Fenley | Software Engineer - Mobile and Backend
>
> Remind.com <https://www.remind.com/> | BLOG <http://blog.remind.com/> |
> FOLLOW US <https://twitter.com/remindhq> | LIKE US
> <https://www.facebook.com/remindhq>
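
For anyone following along, the per-row logic described in the quoted message can be sketched in plain Scala with no Flink dependency. This is only an illustration under stated assumptions: `FruitFlags` and `splat` are hypothetical names that do not appear in the code above, and treating a null array as empty is an assumption about the input, not something the original function does.

```scala
// Hypothetical helper, not part of the original code: one flag per known fruit.
final case class FruitFlags(
    hasOrange: Boolean,
    hasBanana: Boolean,
    hasApple: Boolean,
    hasWatermelon: Boolean
)

object FruitFlags {
  // Assumption: a null array is treated the same as an empty one.
  def splat(fruits: Array[String]): FruitFlags = {
    val present: Set[String] =
      Option(fruits).map(_.toSet).getOrElse(Set.empty[String])
    FruitFlags(
      hasOrange = present.contains("orange"),
      hasBanana = present.contains("banana"),
      hasApple = present.contains("apple"),
      hasWatermelon = present.contains("watermelon")
    )
  }
}
```

Calling `FruitFlags.splat(Array("orange", "apple"))` yields one flag per fruit, matching the `has_orange` / `has_banana` / `has_apple` / `has_watermelon` columns in the example. It says nothing about how Flink derives unique keys for the resulting table.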
