Hi,

You have to add the implicit value in the main() method, before the call to
.map(rowFn), and not inside the MapFunction.
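
For reference, a minimal sketch of what that looks like (adapted from Joshua's
program further down in this thread; the MakeRow and integerTuple definitions
are unchanged, and this is an untested sketch, not a verified fix):

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

def main(args: Array[String]): Unit = {
  val env = ExecutionEnvironment.createLocalEnvironment()
  val rowFn = new MakeRow

  // The implicit TypeInformation[Row] must be in scope *before* the
  // .map(rowFn) calls below, so the Scala DataSet API picks up the
  // RowTypeInfo (with named fields) instead of falling back to a GenericType.
  implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
    Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
    Array("id", "value")
  )

  val ints = 0 until 1000
  val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
  val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)

  // Both map() calls resolve the implicit above, so the resulting
  // DataSet[Row]s have named fields and "id" can be used as a join key.
  val evenRows = env.fromCollection(evenIntegers).map(rowFn)
  val oddRows = env.fromCollection(oddIntegers).map(rowFn)

  evenRows.join(oddRows).where("id").equalTo("id").print()
}
```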

Best, Fabian


2017-07-10 18:54 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:

> Hello Fabian,
>
> Thank you for your response. I tried your recommendation but I’m getting
> the same issue. Here’s the altered MakeRow MapFunction I tried:
>
>   class MakeRow extends MapFunction[(Integer, Integer), Row] {
>     implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
>       Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>       Array("id", "value")
>     )
>     override def map(tuple: (Integer, Integer)): Row = tuple match {
>       case (value, id) => Row.of(id, value)
>     }
>   }
>
>
> In stepping through the code execution, it looks like the problem is that
> Row.isKeyType() returns false
> <https://github.com/apache/flink/blob/release-1.3.1-rc2/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java#L98-L100>.
> Any recommendations?
>
> Thanks,
>
> Joshua
>
>
> On Jul 10, 2017, at 11:42 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Joshua,
>
> thanks for reporting this issue. Your code is fine, but IMO there is a bug
> in the Scala DataSet API.
> It simply does not respect the type information provided by the
> ResultTypeQueryable[Row] interface and defaults to a GenericType.
>
> I think this should be fixed. I'll open a JIRA issue for it.
>
> You can explicitly declare types with implicits if you put the following
> lines above the lines in which you apply rowFn to the DataSet.
>
> implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
>   Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>   Array("id", "value")
> )
>
> When you do this, you can also remove the ResultTypeQueryable
> interface from the MapFunction.
>
> Cheers, Fabian
>
>
>
> 2017-07-10 18:10 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:
>
>> Thank you for your response Nico. Below is a simple case where I’m trying
>> to join on Row fields:
>>
>> package com.github.hadronzoo.rowerror
>>
>> import org.apache.flink.api.common.functions.MapFunction
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
>> import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
>> import org.apache.flink.api.scala._
>> import org.apache.flink.types.Row
>>
>> object Main {
>>
>>   class MakeRow extends MapFunction[(Integer, Integer), Row] with ResultTypeQueryable[Row] {
>>     override def map(tuple: (Integer, Integer)): Row = tuple match {
>>       case (value, id) => Row.of(id, value)
>>     }
>>
>>     override def getProducedType: TypeInformation[Row] =
>>       new RowTypeInfo(
>>         Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>>         Array("id", "value")
>>       )
>>   }
>>
>>   def integerTuple(intTuple: (Int, Int)): (Integer, Integer) =
>>     intTuple match { case (a, b) => (a, b) }
>>
>>   def main(args: Array[String]): Unit = {
>>     val env = ExecutionEnvironment.createLocalEnvironment()
>>     val rowFn = new MakeRow
>>
>>     val ints = 0 until 1000
>>     val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
>>     val oddIntegers = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)
>>
>>     val evenRows = env.fromCollection(evenIntegers).map(rowFn)
>>     val oddRows = env.fromCollection(oddIntegers).map(rowFn)
>>
>>     evenRows.join(oddRows).where("id").equalTo("id").print()
>>   }
>> }
>>
>>
>> Executing the above yields the following error:
>>
>> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException:
>> This type (GenericType<org.apache.flink.types.Row>) cannot be used as key.
>> at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
>> at org.apache.flink.api.scala.UnfinishedKeyPairOperation.where(UnfinishedKeyPairOperation.scala:72)
>> at com.github.hadronzoo.rowerror.Main$.main(Main.scala:36)
>> at com.github.hadronzoo.rowerror.Main.main(Main.scala)
>>
>>
>> For my application I only have TypeInformation at runtime (before the
>> execution graph is built). Is it possible to use Row fields in join
>> operations or is there an error with my implementation?
>>
>> Thanks for your help,
>>
>> Joshua
>>
>> On Jul 10, 2017, at 9:09 AM, Nico Kruber <n...@data-artisans.com> wrote:
>>
>> Can you show a minimal example of the query you are trying to run?
>> Maybe Timo or Fabian (cc'd) can help.
>>
>>
>> Nico
>>
>> On Friday, 7 July 2017 23:09:09 CEST Joshua Griffith wrote:
>>
>> Hello,
>>
>> When using nested field expressions like “Account.Id" with nested rows, I
>> get the following error: “This type
>> (GenericType<org.apache.flink.types.Row>) cannot be used as key.” Is there
>> a way to make nested field expressions work with nested rows?
>>
>>
>> Thanks,
>>
>> Joshua
>>
>>
>>
>>
>
>
