Indeed that worked. Thanks!
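
For reference, below is a minimal sketch of what the fixed program presumably looks like after applying Fabian's suggestion further down: the implicit TypeInformation[Row] is declared in main() before .map(rowFn) is called, and ResultTypeQueryable is removed from the MapFunction. It is pieced together from the snippets in this thread (package declaration omitted), so treat it as illustrative rather than verbatim:

  import org.apache.flink.api.common.functions.MapFunction
  import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
  import org.apache.flink.api.java.typeutils.RowTypeInfo
  import org.apache.flink.api.scala._
  import org.apache.flink.types.Row

  object Main {

    // No ResultTypeQueryable[Row] needed anymore; the row type is supplied
    // via an implicit at the call site instead.
    class MakeRow extends MapFunction[(Integer, Integer), Row] {
      override def map(tuple: (Integer, Integer)): Row = tuple match {
        case (value, id) => Row.of(id, value)
      }
    }

    def integerTuple(intTuple: (Int, Int)): (Integer, Integer) =
      intTuple match { case (a, b) => (a, b) }

    def main(args: Array[String]): Unit = {
      val env = ExecutionEnvironment.createLocalEnvironment()
      val rowFn = new MakeRow

      // Declared before .map(rowFn) so the Scala DataSet API picks it up
      // instead of falling back to GenericType<Row>, which cannot be used as a key.
      implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
        Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
        Array("id", "value")
      )

      val ints = 0 until 1000
      val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
      val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)

      val evenRows = env.fromCollection(evenIntegers).map(rowFn)
      val oddRows = env.fromCollection(oddIntegers).map(rowFn)

      evenRows.join(oddRows).where("id").equalTo("id").print()
    }
  }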

> On Jul 10, 2017, at 11:57 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi,
> 
> You have to add the implicit value in the main() method before you call 
> .map(rowFn) and not in the MapFunction.
> 
> Best, Fabian
> 
> 
> 2017-07-10 18:54 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:
> Hello Fabian,
> 
> Thank you for your response. I tried your recommendation but I’m getting the 
> same issue. Here’s the altered MakeRow MapFunction I tried:
> 
>   class MakeRow extends MapFunction[(Integer, Integer), Row] {
>     implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
>       Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>       Array("id", "value")
>     )
>     override def map(tuple: (Integer, Integer)): Row = tuple match {
>       case (value, id) => Row.of(id, value)
>     }
>   }
> 
> In stepping through the code execution, it looks like the problem is that 
> Row.isKeyType() returns false (see 
> https://github.com/apache/flink/blob/release-1.3.1-rc2/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java#L98-L100).
> Any recommendations?
> 
> Thanks,
> 
> Joshua
> 
> 
>> On Jul 10, 2017, at 11:42 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>> 
>> Hi Joshua,
>> 
>> thanks for reporting this issue. Your code is fine, but IMO there is a bug in 
>> the Scala DataSet API: it simply does not respect the type information 
>> provided by the ResultTypeQueryable[Row] interface and defaults to a GenericType.
>> 
>> I think this should be fixed. I'll open a JIRA issue for that.
>> 
>> You can explicitly declare the type with an implicit if you put the following 
>> lines above the line in which you apply rowFn to the DataSet.
>> 
>> implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
>>   Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>>   Array("id", "value")
>> )
>> When you do this, you can also remove the ResultTypeQueryable interface 
>> from the MapFunction.
>> 
>> Cheers, Fabian
>> 
>> 
>> 
>> 2017-07-10 18:10 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:
>> Thank you for your response, Nico. Below is a simple case where I’m trying to 
>> join on Row fields:
>> 
>> package com.github.hadronzoo.rowerror
>> 
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
>> import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
>> import org.apache.flink.api.scala._
>> import org.apache.flink.types.Row
>> 
>> object Main {
>> 
>>   class MakeRow extends MapFunction[(Integer, Integer), Row] with ResultTypeQueryable[Row] {
>>     override def map(tuple: (Integer, Integer)): Row = tuple match {
>>       case (value, id) => Row.of(id, value)
>>     }
>> 
>>     override def getProducedType: TypeInformation[Row] =
>>       new RowTypeInfo(
>>         Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>>         Array("id", "value")
>>       )
>>   }
>> 
>>   def integerTuple(intTuple: (Int, Int)): (Integer, Integer) = intTuple match { case (a, b) => (a, b) }
>> 
>>   def main(args: Array[String]): Unit = {
>>     val env = ExecutionEnvironment.createLocalEnvironment()
>>     val rowFn = new MakeRow
>> 
>>     val ints = 0 until 1000
>>     val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
>>     val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)
>> 
>>     val evenRows = env.fromCollection(evenIntegers).map(rowFn)
>>     val oddRows = env.fromCollection(oddIntegers).map(rowFn)
>> 
>>     evenRows.join(oddRows).where("id").equalTo("id").print()
>>   }
>> }
>> 
>> Executing the above yields the following error:
>> 
>> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: 
>> This type (GenericType<org.apache.flink.types.Row>) cannot be used as key.
>>      at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
>>      at org.apache.flink.api.scala.UnfinishedKeyPairOperation.where(unfinishedKeyPairOperation.scala:72)
>>      at com.github.hadronzoo.rowerror.Main$.main(Main.scala:36)
>>      at com.github.hadronzoo.rowerror.Main.main(Main.scala)
>> 
>> For my application I only have TypeInformation at runtime (before the 
>> execution graph is built). Is it possible to use Row fields in join 
>> operations, or is there an error in my implementation?
>> 
>> Thanks for your help,
>> 
>> Joshua
>> 
>>> On Jul 10, 2017, at 9:09 AM, Nico Kruber <n...@data-artisans.com> wrote:
>>> 
>>> Can you show a minimal example of the query you are trying to run?
>>> Maybe Timo or Fabian (cc'd) can help.
>>> 
>>> 
>>> Nico
>>> 
>>> On Friday, 7 July 2017 23:09:09 CEST Joshua Griffith wrote:
>>>> Hello,
>>>> 
>>>> When using nested field expressions like “Account.Id” with nested rows, I
>>>> get the following error: “This type
>>>> (GenericType<org.apache.flink.types.Row>) cannot be used as key.” Is there
>>>> a way to make nested field expressions work with nested rows?
>>> 
>>>> Thanks,
>>>> 
>>>> Joshua
>>> 
>> 
>> 
> 
> 
