Github user wangxiaojing commented on a diff in the pull request: https://github.com/apache/spark/pull/2953#discussion_r19527181 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala --- @@ -845,6 +858,198 @@ private[hive] object HiveQl { throw new NotImplementedError(s"No parse rules for:\n ${dumpTree(a).toString} ") } + // store the window def of current sql + //use thread id as key to avoid mistake when muti sqls parse at the same time + protected val windowDefMap = new ConcurrentHashMap[Long,Map[String, Seq[ASTNode]]]() + + // store the window spec of current sql + //use thread id as key to avoid mistake when muti sqls parse at the same time + protected val windowPartitionsMap = new ConcurrentHashMap[Long, ArrayBuffer[Node]]() + + protected def initWindow() = { + windowDefMap.put(Thread.currentThread().getId, Map[String, Seq[ASTNode]]()) + windowPartitionsMap.put(Thread.currentThread().getId, new ArrayBuffer[Node]()) + } + protected def checkWindowDef(windowClause: Option[Node]) = { + + var winDefs = windowDefMap.get(Thread.currentThread().getId) + + windowClause match { + case Some(window) => window.getChildren.foreach { + case Token("TOK_WINDOWDEF", Token(alias, Nil) :: Token("TOK_WINDOWSPEC", ws) :: Nil) => { + winDefs += alias -> ws + } + } + case None => //do nothing + } + + windowDefMap.put(Thread.currentThread().getId, winDefs) + } + + protected def translateWindowSpec(windowSpec: Seq[ASTNode]): Seq[ASTNode]= { + + windowSpec match { + case Token(alias, Nil) :: Nil => translateWindowSpec(getWindowSpec(alias)) + case Token(alias, Nil) :: range => { + val (partitionClause :: rowsRange :: valueRange :: Nil) = getClauses( + Seq( + "TOK_PARTITIONINGSPEC", + "TOK_WINDOWRANGE", + "TOK_WINDOWVALUES"), + translateWindowSpec(getWindowSpec(alias))) + partitionClause match { + case Some(partition) => partition.asInstanceOf[ASTNode] :: range + case None => range + } + } + case e => e + } + } + + protected def getWindowSpec(alias: String): 
Seq[ASTNode]= { + windowDefMap.get(Thread.currentThread().getId).getOrElse( + alias, sys.error("no window def for " + alias)) + } + + protected def addWindowPartitions(partition: Node) = { + + var winPartitions = windowPartitionsMap.get(Thread.currentThread().getId) + winPartitions += partition + windowPartitionsMap.put(Thread.currentThread().getId, winPartitions) + } + + protected def getWindowPartitions(): Seq[Node]= { + windowPartitionsMap.get(Thread.currentThread().getId).toSeq + } + + protected def checkWindowPartitions(): Option[Seq[ASTNode]] = { + + val partitionUnits = new ArrayBuffer[Seq[ASTNode]]() + + getWindowPartitions.map { + case Token("TOK_PARTITIONINGSPEC", partition) => Some(partition) + case _ => None + }.foreach { + case Some(partition) => { + if (partitionUnits.isEmpty) partitionUnits += partition + else { + //only add different window partitions --- End diff -- Space after //
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org