Thanks, but how would I do this in the Table API? With map? I tried a map
function, and it still wasn't able to infer the primary key unless I
artificially applied an aggregate function over the pk instead.
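
For reference, the scalar-function suggestion quoted below might translate to
the Scala Table API roughly as follows. This is only a sketch, assuming the
Flink 1.11-era Scala Table API and a `fruits` column of type ARRAY<STRING>;
`HasFruit` and the `SplatFruits` object are hypothetical names, not something
from the thread or from Flink. It uses addColumns so that id and the other
existing columns stay in the projection, which may let the planner keep the
unique key, though that is not verified here.

```scala
import org.apache.flink.table.api._
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical helper (not from the thread): one scalar function per fruit,
// returning a single BOOLEAN instead of a composite row.
class HasFruit(fruit: String) extends ScalarFunction {
  def eval(fruits: Array[String]): Boolean =
    fruits != null && fruits.contains(fruit)
}

object SplatFruits {
  // Assumes `table` has a primary key `id` and an ARRAY<STRING> column `fruits`.
  def splatFruits(table: Table, columnPrefix: String): Table = {
    val hasOrange = new HasFruit("orange")
    val hasBanana = new HasFruit("banana")
    val hasApple = new HasFruit("apple")
    val hasWatermelon = new HasFruit("watermelon")

    table
      // addColumns appends to the existing columns, so `id` is passed through
      // unchanged rather than being dropped as it is with map.
      .addColumns(
        hasOrange($"fruits").as(s"${columnPrefix}_has_orange"),
        hasBanana($"fruits").as(s"${columnPrefix}_has_banana"),
        hasApple($"fruits").as(s"${columnPrefix}_has_apple"),
        hasWatermelon($"fruits").as(s"${columnPrefix}_has_watermelon")
      )
      .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
  }
}
```

Because each function yields one scalar column, no lateral join or join back
to the original table is involved; whether later joins then stop reporting
"NoUniqueKey" would still need to be checked in the plan.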

On Wed, Dec 9, 2020 at 7:55 PM Jark Wu <[email protected]> wrote:

> Could you use 4 scalar functions instead of a UDTF and a map function? For
> example:
>
> select *, hasOrange(fruits), hasBanana(fruits), hasApple(fruits),
> hasWatermelon(fruits)
> from T;
>
> I think this can preserve the primary key.
>
> Best,
> Jark
>
> On Thu, 3 Dec 2020 at 15:28, Rex Fenley <[email protected]> wrote:
>
>> It appears that even when I pass id through the map function and join
>> back with the original table, the planner does not treat the id passed
>> through map as a unique key. Is there any way to solve this while still
>> preserving the primary key?
>>
>> On Wed, Dec 2, 2020 at 5:27 PM Rex Fenley <[email protected]> wrote:
>>
>>> Even odder, if I pull the constructor of the function into its own
>>> variable it "works" (though it appears that map only passes through the
>>> fields mapped over which means I'll need an additional join, though now I
>>> think I'm on the right path).
>>>
>>> I.e.
>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>>   val func = new SplatFruitsFunc()
>>>   return table
>>>     .map(func($"fruits"))
>>>     .as(
>>>       s"${columnPrefix}_has_orange",
>>>       s"${columnPrefix}_has_banana",
>>>       s"${columnPrefix}_has_apple",
>>>       s"${columnPrefix}_has_watermelon"
>>>    )
>>>    .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>> }
>>>
>>> ends up giving me the following error instead
>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: Cannot resolve field [fruits], input field
>>> list:[prefix_has_orange, prefix_has_banana, prefix_has_apple,
>>> prefix_has_watermelon].
>>>
>>> which implies I'll need to join back to the original table like I was
>>> doing with the leftOuterJoinLateral originally I suppose.
>>>
>>>
>>> On Wed, Dec 2, 2020 at 5:15 PM Rex Fenley <[email protected]> wrote:
>>>
>>>> Looks like `as` needed to move outside of where it was before to fix
>>>> that error. Though now I'm receiving
>>>> >org.apache.flink.client.program.ProgramInvocationException: The main
>>>> method caused an error: Aliasing more fields than we actually have.
>>>>
>>>> Example code now:
>>>>
>>>> // table will always have pk id
>>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>>> return table
>>>>  .map(
>>>>    new SplatFruitsFunc()(
>>>>      $"fruits"
>>>>    )
>>>>  )
>>>>  .as(
>>>>   s"${columnPrefix}_has_orange",
>>>>   s"${columnPrefix}_has_banana",
>>>>   s"${columnPrefix}_has_apple",
>>>>   s"${columnPrefix}_has_watermelon"
>>>>  )
>>>>  .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>>> }
>>>>
>>>> class SplatFruitsFunc extends ScalarFunction {
>>>>   def eval(fruits: Array[String]): Row = {
>>>>     val hasOrange: java.lang.Boolean = fruits.contains("Orange")
>>>>     val hasBanana: java.lang.Boolean = fruits.contains("Banana")
>>>>     val hasApple: java.lang.Boolean = fruits.contains("Apple")
>>>>     val hasWatermelon: java.lang.Boolean = fruits.contains("Watermelon")
>>>>     Row.of(hasOrange, hasBanana, hasApple, hasWatermelon)
>>>>   }
>>>>
>>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>>>>     Types.ROW(Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN, Types.BOOLEAN)
>>>> }
>>>>
>>>> which afaict correctly follows the documentation.
>>>>
>>>> Anything here stand out?
>>>>
>>>> On Wed, Dec 2, 2020 at 4:55 PM Rex Fenley <[email protected]> wrote:
>>>>
>>>>> So I instead tried changing SplatFruitsFunc to a ScalarFunction
>>>>> and leftOuterJoinLateral to a map, and I'm receiving:
>>>>> > org.apache.flink.client.program.ProgramInvocationException: The main
>>>>> method caused an error: Only a scalar function can be used in the map
>>>>> operator.
>>>>> which seems odd because documentation says
>>>>>
>>>>> > Performs a map operation with a user-defined scalar function or
>>>>> built-in scalar function. The output will be flattened if the output type
>>>>> is a composite type.
>>>>>
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/tableApi.html#row-based-operations
>>>>>
>>>>> Shouldn't this work as an alternative?
>>>>>
>>>>> On Wed, Dec 2, 2020 at 3:58 PM Rex Fenley <[email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I have a TableFunction and wherever it is applied with a
>>>>>> leftOuterJoinLateral, my table loses any inference of there being a
>>>>>> primary key. I see this because all subsequent joins end up with
>>>>>> "NoUniqueKey" when I know a primary key of id should exist.
>>>>>>
>>>>>> I'm wondering if this is expected behavior and if it's possible to
>>>>>> tell a table directly what the primary key should be?
>>>>>>
>>>>>>
>>>>>> To demonstrate my example:
>>>>>> My table function checks if an element of a certain type is in a
>>>>>> string array, and depending on whether or not it is there, it appends a
>>>>>> column with value true or false. For example, if the array "fruits",
>>>>>> which could contain orange, banana, apple, and watermelon, contains
>>>>>> only `["orange", "apple"]` on a given row, then it will append
>>>>>> `has_orange: true, has_banana: false, has_apple: true,
>>>>>> has_watermelon: false` as columns to the row. This example is
>>>>>> essentially the same as my code, aside from my code having a much larger
>>>>>> set of keys and not dealing with fruits.
>>>>>>
>>>>>> Example code:
>>>>>>
>>>>>> // table will always have pk id
>>>>>> def splatFruits(table: Table, columnPrefix: String): Table = {
>>>>>> return table
>>>>>>  .leftOuterJoinLateral(
>>>>>>    new SplatFruitsFunc()(
>>>>>>      $"fruits"
>>>>>>    ) as (
>>>>>>      s"${columnPrefix}_has_orange",
>>>>>>      s"${columnPrefix}_has_banana",
>>>>>>      s"${columnPrefix}_has_apple",
>>>>>>      s"${columnPrefix}_has_watermelon"
>>>>>>    )
>>>>>>  )
>>>>>>  .renameColumns($"fruits".as(s"${columnPrefix}_fruits"))
>>>>>> }
>>>>>>
>>>>>> @FunctionHint(
>>>>>>   output = new DataTypeHint(
>>>>>>     "(has_orange BOOLEAN, has_banana BOOLEAN, has_apple BOOLEAN, has_watermelon BOOLEAN)"
>>>>>>   )
>>>>>> )
>>>>>> class SplatFruitsFunc
>>>>>>     extends TableFunction[(Boolean, Boolean, Boolean, Boolean)] {
>>>>>>
>>>>>>   def eval(fruits: Array[String]): Unit = {
>>>>>>     val hasOrange: java.lang.Boolean = fruits.contains("orange")
>>>>>>     val hasBanana: java.lang.Boolean = fruits.contains("banana")
>>>>>>     val hasApple: java.lang.Boolean = fruits.contains("apple")
>>>>>>     val hasWatermelon: java.lang.Boolean = fruits.contains("watermelon")
>>>>>>     collect(hasOrange, hasBanana, hasApple, hasWatermelon)
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>>>>>
>>>>>>
>>>>>> Remind.com <https://www.remind.com/> |  BLOG
>>>>>> <http://blog.remind.com/>  |  FOLLOW US
>>>>>> <https://twitter.com/remindhq>  |  LIKE US
>>>>>> <https://www.facebook.com/remindhq>
>>>>>>