Hi,

You have to add the implicit value in the main() method before you call .map(rowFn), not in the MapFunction.
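For reference, a minimal sketch of what that placement looks like. This is not Fabian's exact code: it reuses the RowTypeInfo from the thread below but substitutes a small inline dataset, and it assumes the Flink 1.3.x Scala DataSet API is on the classpath:

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala._
import org.apache.flink.types.Row

object ImplicitRowTypeExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.createLocalEnvironment()

    // Declare the implicit BEFORE any .map(...) that produces a Row, so the
    // Scala API picks it up instead of falling back to a GenericType<Row>.
    implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
      Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
      Array("id", "value")
    )

    val rows = env
      .fromElements((1, 10), (2, 20))
      .map { case (id, value) => Row.of(Int.box(id), Int.box(value)) }

    // "id" now resolves against the RowTypeInfo, so it is usable as a join key.
    rows.join(rows).where("id").equalTo("id").print()
  }
}
```

With the implicit in scope at the call site of .map, the resulting DataSet[Row] carries the RowTypeInfo, and Row.isKeyType() no longer blocks the key expression.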
Best, Fabian

2017-07-10 18:54 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:
> Hello Fabian,
>
> Thank you for your response. I tried your recommendation but I’m getting
> the same issue. Here’s the altered MakeRow MapFunction I tried:
>
>     class MakeRow extends MapFunction[(Integer, Integer), Row] {
>       implicit val rowType: TypeInformation[Row] = new RowTypeInfo(
>         Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>         Array("id", "value")
>       )
>       override def map(tuple: (Integer, Integer)): Row = tuple match {
>         case (value, id) => Row.of(id, value)
>       }
>     }
>
> In stepping through the code execution, it looks like the problem is that
> Row.isKeyType() returns false
> <https://github.com/apache/flink/blob/release-1.3.1-rc2/flink-core/src/main/java/org/apache/flink/api/common/operators/Keys.java#L98-L100>.
> Any recommendations?
>
> Thanks,
>
> Joshua
>
> On Jul 10, 2017, at 11:42 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Joshua,
>
> thanks for reporting this issue. Your code is fine, but IMO there is a bug
> in the Scala DataSet API: it simply does not respect the type information
> provided by the ResultTypeQueryable[Row] interface and defaults to a
> GenericType.
>
> I think this should be fixed. I'll open a JIRA issue for it.
>
> You can explicitly declare types with implicits if you put the following
> lines above the lines in which you apply the rowFn to the DataSet:
>
>     implicit val rowTpe: TypeInformation[Row] = new RowTypeInfo(
>       Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>       Array("id", "value")
>     )
>
> When you do this, you can also remove the ResultTypeQueryable interface
> from the MapFunction.
>
> Cheers, Fabian
>
> 2017-07-10 18:10 GMT+02:00 Joshua Griffith <jgriff...@campuslabs.com>:
>
>> Thank you for your response Nico.
>> Below is a simple case where I’m trying to join on Row fields:
>>
>>     package com.github.hadronzoo.rowerror
>>
>>     import org.apache.flink.api.common.functions.MapFunction
>>     import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
>>     import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo}
>>     import org.apache.flink.api.scala._
>>     import org.apache.flink.types.Row
>>
>>     object Main {
>>
>>       class MakeRow extends MapFunction[(Integer, Integer), Row] with ResultTypeQueryable[Row] {
>>         override def map(tuple: (Integer, Integer)): Row = tuple match {
>>           case (value, id) => Row.of(id, value)
>>         }
>>
>>         override def getProducedType: TypeInformation[Row] = new RowTypeInfo(
>>           Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO),
>>           Array("id", "value")
>>         )
>>       }
>>
>>       def integerTuple(intTuple: (Int, Int)): (Integer, Integer) =
>>         intTuple match { case (a, b) => (a, b) }
>>
>>       def main(args: Array[String]): Unit = {
>>         val env = ExecutionEnvironment.createLocalEnvironment()
>>         val rowFn = new MakeRow
>>
>>         val ints = 0 until 1000
>>         val evenIntegers = (ints filter (_ % 2 == 0)).zipWithIndex.map(integerTuple)
>>         val oddIntegers  = (ints filter (_ % 2 == 1)).zipWithIndex.map(integerTuple)
>>
>>         val evenRows = env.fromCollection(evenIntegers).map(rowFn)
>>         val oddRows  = env.fromCollection(oddIntegers).map(rowFn)
>>
>>         evenRows.join(oddRows).where("id").equalTo("id").print()
>>       }
>>     }
>>
>> Executing the above yields the following error:
>>
>>     Exception in thread "main" org.apache.flink.api.common.InvalidProgramException:
>>     This type (GenericType<org.apache.flink.types.Row>) cannot be used as key.
>>         at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
>>         at org.apache.flink.api.scala.UnfinishedKeyPairOperation.where(unfinishedKeyPairOperation.scala:72)
>>         at com.github.hadronzoo.rowerror.Main$.main(Main.scala:36)
>>         at com.github.hadronzoo.rowerror.Main.main(Main.scala)
>>
>> For my application I only have TypeInformation at runtime (before the
>> execution graph is built). Is it possible to use Row fields in join
>> operations, or is there an error with my implementation?
>>
>> Thanks for your help,
>>
>> Joshua
>>
>> On Jul 10, 2017, at 9:09 AM, Nico Kruber <n...@data-artisans.com> wrote:
>>
>> Can you show a minimal example of the query you are trying to run?
>> Maybe Timo or Fabian (cc'd) can help.
>>
>> Nico
>>
>> On Friday, 7 July 2017 23:09:09 CEST Joshua Griffith wrote:
>>
>> Hello,
>>
>> When using nested field expressions like “Account.Id” with nested rows, I
>> get the following error: “This type
>> (GenericType<org.apache.flink.types.Row>) cannot be used as key.” Is there
>> a way to make nested field expressions work with nested rows?
>>
>> Thanks,
>>
>> Joshua
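[Editor's note] For readers landing on this thread with the original nested-row question: the same approach should extend to nested rows, because a key expression like "Account.Id" can only be resolved if the nested field itself is described by a RowTypeInfo rather than a GenericType. The sketch below is an assumption based on the fix discussed above, not code from this thread; the field names ("Account", "Id", "name") are illustrative, and Flink 1.3.x is assumed:

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

object NestedRowTypeSketch {
  // Type of the nested "Account" row. It must be a RowTypeInfo (not a
  // GenericType<Row>) for the nested key expression "Account.Id" to be valid.
  val accountType: TypeInformation[Row] = new RowTypeInfo(
    Array[TypeInformation[_]](BasicTypeInfo.INT_TYPE_INFO),
    Array("Id")
  )

  // The outer row embeds the nested row type under the field name "Account".
  implicit val outerType: TypeInformation[Row] = new RowTypeInfo(
    Array[TypeInformation[_]](accountType, BasicTypeInfo.STRING_TYPE_INFO),
    Array("Account", "name")
  )

  // With this implicit in scope when the DataSet[Row] is created,
  // .where("Account.Id") / .groupBy("Account.Id") should resolve the
  // nested field instead of failing with "cannot be used as key".
}
```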