[ 
https://issues.apache.org/jira/browse/FLINK-10116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-10116:
-------------------------------------

    Assignee: Fabian Hueske

> createComparator fails on case class with Unit type fields prior to the 
> join-key
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-10116
>                 URL: https://issues.apache.org/jira/browse/FLINK-10116
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API
>    Affects Versions: 1.3.3, 1.6.0
>            Reporter: Will
>            Assignee: Fabian Hueske
>            Priority: Major
>         Attachments: JobFail.scala, JobPass.scala
>
>
> h1. Overview
> When joining between case classes, if the attribute representing the join-key 
> comes after Unit definition of fields (that are not being used) the join will 
> fail with the error
> {quote}{{Exception in thread "main" java.lang.IllegalArgumentException: Could 
> not add a comparator for the logicalkey field index 0.}}
>  \{{ at 
> org.apache.flink.api.common.typeutils.CompositeType.createComparator(CompositeType.java:162)}}
>  \{{ at 
> org.apache.flink.optimizer.postpass.JavaApiPostPass.createComparator(JavaApiPostPass.java:293)}}
>  \{{ at 
> org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:193)}}
> {quote}
> Using TypeInformation keys does not exhibit the same issue. Initial debugging 
> suggests that when calculating the index of the key for strings, Flink 
> doesn't count Unit elements, however they are included during iteration in 
> CompositeType.createComparator which leads to the search failing on the key 
> appearing to be a Unit type.
> h1. Code Examples to Reproduce
> [^JobFail.scala]
> [^JobPass.scala]
>  
> h1. ^Inline Code^
> h2. ^Fail^
> {code:java}
> package org.demo
> /**
>   * Licensed to the Apache Software Foundation (ASF) under one
>   * or more contributor license agreements.  See the NOTICE file
>   * distributed with this work for additional information
>   * regarding copyright ownership.  The ASF licenses this file
>   * to you under the Apache License, Version 2.0 (the
>   * "License"); you may not use this file except in compliance
>   * with the License.  You may obtain a copy of the License at
>   *
>   *     http://www.apache.org/licenses/LICENSE-2.0
>   *
>   * Unless required by applicable law or agreed to in writing, software
>   * distributed under the License is distributed on an "AS IS" BASIS,
>   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>   * See the License for the specific language governing permissions and
>   * limitations under the License.
>   */
> import org.apache.flink.api.common.functions.RichJoinFunction
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.scala._
> object JobFail {
>   case class LeftJoin (
>                         FieldA: String = "",
>                         FieldB: String = "",
>                         FieldC: String = "",
>                         JoinIndex: String = ""
>                       )
>   case class RightJoin (
>                          FieldD: Unit = Unit,
>                          FieldE: Unit = Unit,
>                          FieldF: Unit = Unit,
>                          JoinIndex: String = ""
>                        )
>   case class Merged (
>                       var FieldA: String = "",
>                       var FieldB: String = "",
>                       var FieldC: String = "",
>                       var FieldD: String = "",
>                       var FieldE: String = "",
>                       var FieldF: String = "",
>                       var JoinIndex: String = ""
>                     )
>   class JoinHelper() extends RichJoinFunction[LeftJoin, RightJoin, Merged]{
>     override def join(first: LeftJoin, second: RightJoin): Merged = {
>       val out = new Merged()
>       out.FieldA = first.FieldA
>       out.FieldB = first.FieldB
>       out.FieldC = first.FieldC
>       if (second != null){
>         /*out.FieldD = second.FieldD
>         out.FieldE = second.FieldE
>         out.FieldF = second.FieldF*/
>       }
>       out
>     }
>   }
>   def main(args: Array[String]) {
>     // set up the execution environment
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val leftOne = new LeftJoin("FieldA1", "FieldB1", "FieldC1", "index")
>     val rightOne = new RightJoin("FieldD1", "FieldE1", "FieldF1", "index")
>     val left = env.fromElements(leftOne)
>     val right = env.fromElements(rightOne)
>     // TODO: String key fails
>     val joined = 
> left.leftOuterJoin(right).where("JoinIndex").equalTo("JoinIndex").apply(new 
> JoinHelper())
>     joined.print()
>   }
> }
> {code}
> h2. ^Pass^
> {code:java}
> package org.demo
> /**
>   * Licensed to the Apache Software Foundation (ASF) under one
>   * or more contributor license agreements.  See the NOTICE file
>   * distributed with this work for additional information
>   * regarding copyright ownership.  The ASF licenses this file
>   * to you under the Apache License, Version 2.0 (the
>   * "License"); you may not use this file except in compliance
>   * with the License.  You may obtain a copy of the License at
>   *
>   *     http://www.apache.org/licenses/LICENSE-2.0
>   *
>   * Unless required by applicable law or agreed to in writing, software
>   * distributed under the License is distributed on an "AS IS" BASIS,
>   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>   * See the License for the specific language governing permissions and
>   * limitations under the License.
>   */
> import org.apache.flink.api.common.functions.RichJoinFunction
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.scala._
> object JobPass {
>   case class LeftJoin (
>                         FieldA: String = "",
>                         FieldB: String = "",
>                         FieldC: String = "",
>                         JoinIndex: String = ""
>                       )
>   case class RightJoin (
>                          FieldD: Unit = Unit,
>                          FieldE: Unit = Unit,
>                          FieldF: Unit = Unit,
>                          JoinIndex: String = ""
>                        )
>   case class Merged (
>                       var FieldA: String = "",
>                       var FieldB: String = "",
>                       var FieldC: String = "",
>                       var FieldD: String = "",
>                       var FieldE: String = "",
>                       var FieldF: String = "",
>                       var JoinIndex: String = ""
>                     )
>   class JoinHelper() extends RichJoinFunction[LeftJoin, RightJoin, Merged]{
>     override def join(first: LeftJoin, second: RightJoin): Merged = {
>       val out = new Merged()
>       out.FieldA = first.FieldA
>       out.FieldB = first.FieldB
>       out.FieldC = first.FieldC
>       if (second != null){
>         /*out.FieldD = second.FieldD
>         out.FieldE = second.FieldE
>         out.FieldF = second.FieldF*/
>       }
>       out
>     }
>   }
>   def main(args: Array[String]) {
>     // set up the execution environment
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val leftOne = new LeftJoin("FieldA1", "FieldB1", "FieldC1", "index")
>     val rightOne = new RightJoin("FieldD1", "FieldE1", "FieldF1", "index")
>     val left = env.fromElements(leftOne)
>     val right = env.fromElements(rightOne)
>     // TODO: TypeInformation key passes
>     val joined = 
> left.leftOuterJoin(right).where(_.JoinIndex).equalTo(_.JoinIndex).apply(new 
> JoinHelper())
>     joined.print()
>   }
> }
> {code}
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to