Merge branch 'master' into S2GRAPH-131
Project: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/commit/a08df114 Tree: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/tree/a08df114 Diff: http://git-wip-us.apache.org/repos/asf/incubator-s2graph/diff/a08df114 Branch: refs/heads/master Commit: a08df114e81fcf262b34a3c2125481e6d8587493 Parents: 42e4fa1 f74c224 Author: DO YUNG YOON <steams...@apache.org> Authored: Wed Nov 30 21:10:48 2016 +0900 Committer: DO YUNG YOON <steams...@apache.org> Committed: Wed Nov 30 21:10:48 2016 +0900 ---------------------------------------------------------------------- CHANGES | 4 ++++ s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-s2graph/blob/a08df114/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala ---------------------------------------------------------------------- diff --cc s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala index cd1e7f0,0000000..5a8408d mode 100644,000000..100644 --- a/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala +++ b/s2core/src/main/scala/org/apache/s2graph/core/S2Edge.scala @@@ -1,969 -1,0 +1,969 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.s2graph.core + +import java.util +import java.util.function.{Consumer, BiConsumer} + +import org.apache.s2graph.core.S2Edge.{Props, State} +import org.apache.s2graph.core.JSONParser._ +import org.apache.s2graph.core.mysqls.{Label, LabelIndex, LabelMeta} +import org.apache.s2graph.core.types._ +import org.apache.s2graph.core.utils.logger +import org.apache.tinkerpop.gremlin.structure +import org.apache.tinkerpop.gremlin.structure.{Edge, Graph, Vertex, Direction, Property} +import play.api.libs.json.{JsNumber, JsObject, Json} + +import scala.collection.JavaConverters._ +import scala.collection.mutable.{Map => MutableMap} +import scala.util.hashing.MurmurHash3 + +case class SnapshotEdge(graph: S2Graph, + srcVertex: S2Vertex, + tgtVertex: S2Vertex, + label: Label, + dir: Int, + op: Byte, + version: Long, + private val propsWithTs: Props, + pendingEdgeOpt: Option[S2Edge], + statusCode: Byte = 0, + lockTs: Option[Long], + tsInnerValOpt: Option[InnerValLike] = None) { + lazy val direction = GraphUtil.fromDirection(dir) + lazy val operation = GraphUtil.fromOp(op) + lazy val edge = toEdge + lazy val labelWithDir = LabelWithDirection(label.id.get, dir) +// if (!propsWithTs.contains(LabelMeta.timestamp.name)) throw new Exception("Timestamp is required.") + +// val label = Label.findById(labelWithDir.labelId) + lazy val schemaVer = label.schemaVersion + lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString().toLong + + def propsToKeyValuesWithTs = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq) + + def allPropsDeleted = S2Edge.allPropsDeleted(propsWithTs) + + def toEdge: S2Edge = { + S2Edge(graph, srcVertex, tgtVertex, label, dir, op, + version, propsWithTs, pendingEdgeOpt = pendingEdgeOpt, + statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) + } + + def propsWithName = (for { + (_, v) <- propsWithTs.asScala + meta = v.labelMeta + jsValue <- innerValToJsValue(v.innerVal, meta.dataType) + } yield meta.name -> jsValue) ++ Map("version" -> JsNumber(version)) + + // only for debug + def toLogString() = { + List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, propsWithName).mkString("\t") + } + + def property[V](key: String, value: V, ts: Long): S2Property[V] = { + val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge.")) + val newProps = new S2Property(edge, labelMeta, key, value, ts) + propsWithTs.put(key, newProps) + newProps + } + override def hashCode(): Int = { + MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId) + } + + override def equals(other: Any): Boolean = other match { + case e: SnapshotEdge => + srcVertex.innerId == e.srcVertex.innerId && + tgtVertex.innerId == e.tgtVertex.innerId && + labelWithDir == e.labelWithDir && op == e.op && version == e.version && + pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode + case _ => false + } + + override def toString(): String = { + Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> direction, + "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString, + "statusCode" -> statusCode, "lockTs" -> lockTs).toString + } +} + +case class IndexEdge(graph: S2Graph, + srcVertex: S2Vertex, + tgtVertex: S2Vertex, + label: Label, + dir: Int, + op: Byte, + version: Long, + labelIndexSeq: Byte, + private val propsWithTs: Props, + tsInnerValOpt: Option[InnerValLike] = None) { +// if (!props.contains(LabelMeta.timeStampSeq)) throw new Exception("Timestamp is required.") + // assert(props.contains(LabelMeta.timeStampSeq)) + lazy val direction = GraphUtil.fromDirection(dir) + lazy val operation = GraphUtil.fromOp(op) + lazy val edge = toEdge + lazy val labelWithDir = LabelWithDirection(label.id.get, dir) + + lazy val isInEdge = labelWithDir.dir == GraphUtil.directions("in") + lazy val isOutEdge = !isInEdge + + lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.toString.toLong + lazy val degreeEdge = propsWithTs.containsKey(LabelMeta.degree.name) + + lazy val schemaVer = label.schemaVersion + lazy val labelIndex = LabelIndex.findByLabelIdAndSeq(labelWithDir.labelId, labelIndexSeq).get + lazy val defaultIndexMetas = labelIndex.sortKeyTypes.map { meta => + val innerVal = toInnerVal(meta.defaultValue, meta.dataType, schemaVer) + meta.seq -> innerVal + }.toMap + + lazy val labelIndexMetaSeqs = labelIndex.sortKeyTypes + + /** TODO: make sure call of this class fill props as this assumes */ + lazy val orders = for (meta <- labelIndexMetaSeqs) yield { + propsWithTs.get(meta.name) match { + case null => + + /** + * TODO: agly hack + * now we double store target vertex.innerId/srcVertex.innerId for easy development. later fix this to only store id once + */ + val v = meta match { + case LabelMeta.timestamp=> InnerVal.withLong(version, schemaVer) + case LabelMeta.to => toEdge.tgtVertex.innerId + case LabelMeta.from => toEdge.srcVertex.innerId + // for now, it does not make sense to build index on srcVertex.innerId since all edges have same data. + // throw new RuntimeException("_from on indexProps is not supported") + case _ => toInnerVal(meta.defaultValue, meta.dataType, schemaVer) + } + + meta -> v + case v => meta -> v.innerVal + } + } + + lazy val ordersKeyMap = orders.map { case (meta, _) => meta.name }.toSet + lazy val metas = for ((meta, v) <- propsWithTs.asScala if !ordersKeyMap.contains(meta)) yield v.labelMeta -> v.innerVal + +// lazy val propsWithTs = props.map { case (k, v) => k -> InnerValLikeWithTs(v, version) } + + //TODO: + // lazy val kvs = Graph.client.indexedEdgeSerializer(this).toKeyValues.toList + + lazy val hasAllPropsForIndex = orders.length == labelIndexMetaSeqs.length + + def propsWithName = for { + (_, v) <- propsWithTs.asScala + meta = v.labelMeta + jsValue <- innerValToJsValue(v.innerVal, meta.dataType) + } yield meta.name -> jsValue + + + def toEdge: S2Edge = S2Edge(graph, srcVertex, tgtVertex, label, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) + + // only for debug + def toLogString() = { + List(version, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, label.label, Json.toJson(propsWithName)).mkString("\t") + } + + def property(key: String): Option[InnerValLikeWithTs] = { + label.metaPropsInvMap.get(key).map(labelMeta => property(labelMeta)) + } + + def property(labelMeta: LabelMeta): InnerValLikeWithTs = { +// propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(label.metaPropsDefaultMapInner(labelMeta)) + if (propsWithTs.containsKey(labelMeta.name)) { + propsWithTs.get(labelMeta.name).innerValWithTs + } else { + label.metaPropsDefaultMapInner(labelMeta) + } + } + + def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = { + if (others.isEmpty) propsWithTs + else { + val iter = others.entrySet().iterator() + while (iter.hasNext) { + val e = iter.next() + propsWithTs.put(e.getKey, e.getValue) + } + propsWithTs + } + } + + def property[V](key: String, value: V, ts: Long): S2Property[V] = { + val labelMeta = label.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on IndexEdge.")) + val newProps = new S2Property(edge, labelMeta, key, value, ts) + propsWithTs.put(key, newProps) + newProps + } + override def hashCode(): Int = { + MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId + "," + labelIndexSeq) + } + + override def equals(other: Any): Boolean = other match { + case e: IndexEdge => + srcVertex.innerId == e.srcVertex.innerId && + tgtVertex.innerId == e.tgtVertex.innerId && + labelWithDir == e.labelWithDir && op == e.op && version == e.version && + labelIndexSeq == e.labelIndexSeq + case _ => false + } + + override def toString(): String = { + Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> label.label, "direction" -> dir, + "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString + ).toString + } +} + +case class S2Edge(innerGraph: S2Graph, + srcVertex: S2Vertex, + var tgtVertex: S2Vertex, + innerLabel: Label, + dir: Int, + var op: Byte = GraphUtil.defaultOpByte, + var version: Long = System.currentTimeMillis(), + propsWithTs: Props = S2Edge.EmptyProps, + parentEdges: Seq[EdgeWithScore] = Nil, + originalEdgeOpt: Option[S2Edge] = None, + pendingEdgeOpt: Option[S2Edge] = None, + statusCode: Byte = 0, + lockTs: Option[Long] = None, + var tsInnerValOpt: Option[InnerValLike] = None) extends GraphElement with Edge { + + lazy val labelWithDir = LabelWithDirection(innerLabel.id.get, dir) + lazy val schemaVer = innerLabel.schemaVersion + lazy val ts = propsWithTs.get(LabelMeta.timestamp.name).innerVal.value match { + case b: BigDecimal => b.longValue() + case l: Long => l + case i: Int => i.toLong + case _ => throw new RuntimeException("ts should be in [BigDecimal/Long/Int].") + } + + lazy val operation = GraphUtil.fromOp(op) + lazy val tsInnerVal = tsInnerValOpt.get.value + lazy val srcId = srcVertex.innerIdVal + lazy val tgtId = tgtVertex.innerIdVal + lazy val labelName = innerLabel.label + lazy val direction = GraphUtil.fromDirection(dir) + + def toIndexEdge(labelIndexSeq: Byte): IndexEdge = IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelIndexSeq, propsWithTs) + + def serializePropsWithTs(): Array[Byte] = HBaseSerializable.propsToKeyValuesWithTs(propsWithTs.asScala.map(kv => kv._2.labelMeta.seq -> kv._2.innerValWithTs).toSeq) + + def updatePropsWithTs(others: Props = S2Edge.EmptyProps): Props = { + val emptyProp = S2Edge.EmptyProps + + propsWithTs.forEach(new BiConsumer[String, S2Property[_]] { + override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) + }) + + others.forEach(new BiConsumer[String, S2Property[_]] { + override def accept(key: String, value: S2Property[_]): Unit = emptyProp.put(key, value) + }) + + emptyProp + } + + def propertyValue(key: String): Option[InnerValLikeWithTs] = { + key match { + case "from" | "_from" => Option(InnerValLikeWithTs(srcVertex.innerId, ts)) + case "to" | "_to" => Option(InnerValLikeWithTs(tgtVertex.innerId, ts)) + case "label" => Option(InnerValLikeWithTs(InnerVal.withStr(innerLabel.label, schemaVer), ts)) + case "direction" => Option(InnerValLikeWithTs(InnerVal.withStr(direction, schemaVer), ts)) + case _ => + innerLabel.metaPropsInvMap.get(key).map(labelMeta => propertyValueInner(labelMeta)) + } + } + + def propertyValueInner(labelMeta: LabelMeta): InnerValLikeWithTs= { + // propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse() + if (propsWithTs.containsKey(labelMeta.name)) { + propsWithTs.get(labelMeta.name).innerValWithTs + } else { + innerLabel.metaPropsDefaultMapInner(labelMeta) + } + } + + def propertyValues(keys: Seq[String] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { + val labelMetas = for { + key <- keys + labelMeta <- innerLabel.metaPropsInvMap.get(key) + } yield labelMeta + + propertyValuesInner(labelMetas) + } + + def propertyValuesInner(labelMetas: Seq[LabelMeta] = Nil): Map[LabelMeta, InnerValLikeWithTs] = { + if (labelMetas.isEmpty) { + innerLabel.metaPropsDefaultMapInner.map { case (labelMeta, defaultVal) => + labelMeta -> propertyValueInner(labelMeta) + } + } else { + // This is important since timestamp is required for all edges. + (LabelMeta.timestamp +: labelMetas).map { labelMeta => + labelMeta -> propertyValueInner(labelMeta) + }.toMap + } + } + +// if (!props.contains(LabelMeta.timestamp)) throw new Exception("Timestamp is required.") + // assert(propsWithTs.contains(LabelMeta.timeStampSeq)) + + lazy val properties = toProps() + + def props = propsWithTs.asScala.mapValues(_.innerVal) + + + private def toProps(): Map[String, Any] = { + for { + (labelMeta, defaultVal) <- innerLabel.metaPropsDefaultMapInner + } yield { + // labelMeta.name -> propsWithTs.get(labelMeta.name).map(_.innerValWithTs).getOrElse(defaultVal).innerVal.value + val value = + if (propsWithTs.containsKey(labelMeta.name)) { + propsWithTs.get(labelMeta.name).value + } else { + defaultVal.innerVal.value + } + labelMeta.name -> value + } + } + + def relatedEdges = { + if (labelWithDir.isDirected) { + val skipReverse = innerLabel.extraOptions.get("skipReverse").map(_.as[Boolean]).getOrElse(false) + if (skipReverse) List(this) else List(this, duplicateEdge) + } else { +// val outDir = labelWithDir.copy(dir = GraphUtil.directions("out")) +// val base = copy(labelWithDir = outDir) + val base = copy(dir = GraphUtil.directions("out")) + List(base, base.reverseSrcTgtEdge) + } + } + + // def relatedEdges = List(this) + + def srcForVertex = { + val belongLabelIds = Seq(labelWithDir.labelId) + if (labelWithDir.dir == GraphUtil.directions("in")) { + innerGraph.newVertex(VertexId(innerLabel.tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) + } else { + innerGraph.newVertex(VertexId(innerLabel.srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) + } + } + + def tgtForVertex = { + val belongLabelIds = Seq(labelWithDir.labelId) + if (labelWithDir.dir == GraphUtil.directions("in")) { + innerGraph.newVertex(VertexId(innerLabel.srcColumn, srcVertex.innerId), srcVertex.ts, srcVertex.props, belongLabelIds = belongLabelIds) + } else { + innerGraph.newVertex(VertexId(innerLabel.tgtColumn, tgtVertex.innerId), tgtVertex.ts, tgtVertex.props, belongLabelIds = belongLabelIds) + } + } + + def duplicateEdge = reverseSrcTgtEdge.reverseDirEdge + +// def reverseDirEdge = copy(labelWithDir = labelWithDir.dirToggled) + def reverseDirEdge = copy(dir = GraphUtil.toggleDir(dir)) + + def reverseSrcTgtEdge = copy(srcVertex = tgtVertex, tgtVertex = srcVertex) + + def labelOrders = LabelIndex.findByLabelIdAll(labelWithDir.labelId) + + override def serviceName = innerLabel.serviceName + + override def queueKey = Seq(ts.toString, tgtVertex.serviceName).mkString("|") + + override def queuePartitionKey = Seq(srcVertex.innerId, tgtVertex.innerId).mkString("|") + + override def isAsync = innerLabel.isAsync + + def isDegree = propsWithTs.containsKey(LabelMeta.degree.name) + +// def propsPlusTs = propsWithTs.get(LabelMeta.timeStampSeq) match { +// case Some(_) => props +// case None => props ++ Map(LabelMeta.timeStampSeq -> InnerVal.withLong(ts, schemaVer)) +// } + + def propsPlusTsValid = propsWithTs.asScala.filter(kv => LabelMeta.isValidSeq(kv._2.labelMeta.seq)).asJava + + def edgesWithIndex = for (labelOrder <- labelOrders) yield { + IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsWithTs, tsInnerValOpt = tsInnerValOpt) + } + + def edgesWithIndexValid = for (labelOrder <- labelOrders) yield { + IndexEdge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, labelOrder.seq, propsPlusTsValid, tsInnerValOpt = tsInnerValOpt) + } + + /** force direction as out on invertedEdge */ + def toSnapshotEdge: SnapshotEdge = { + val (smaller, larger) = (srcForVertex, tgtForVertex) + +// val newLabelWithDir = LabelWithDirection(labelWithDir.labelId, GraphUtil.directions("out")) + + property(LabelMeta.timestamp.name, ts, ts) + val ret = SnapshotEdge(innerGraph, smaller, larger, innerLabel, + GraphUtil.directions("out"), op, version, propsWithTs, + pendingEdgeOpt = pendingEdgeOpt, statusCode = statusCode, lockTs = lockTs, tsInnerValOpt = tsInnerValOpt) + ret + } + + def defaultPropsWithName = Json.obj("from" -> srcVertex.innerId.toString(), "to" -> tgtVertex.innerId.toString(), + "label" -> innerLabel.label, "service" -> innerLabel.serviceName) + + def propsWithName = + for { + (_, v) <- propsWithTs.asScala + meta = v.labelMeta + jsValue <- innerValToJsValue(v.innerVal, meta.dataType) + } yield meta.name -> jsValue + + + def updateTgtVertex(id: InnerValLike) = { + val newId = TargetVertexId(tgtVertex.id.column, id) + val newTgtVertex = innerGraph.newVertex(newId, tgtVertex.ts, tgtVertex.props) + S2Edge(innerGraph, srcVertex, newTgtVertex, innerLabel, dir, op, version, propsWithTs, tsInnerValOpt = tsInnerValOpt) + } + + def rank(r: RankParam): Double = + if (r.keySeqAndWeights.size <= 0) 1.0f + else { + var sum: Double = 0 + + for ((labelMeta, w) <- r.keySeqAndWeights) { + if (propsWithTs.containsKey(labelMeta.name)) { + val innerValWithTs = propsWithTs.get(labelMeta.name) + val cost = try innerValWithTs.innerVal.toString.toDouble catch { + case e: Exception => + logger.error("toInnerval failed in rank", e) + 1.0 + } + sum += w * cost + } + } + sum + } + + def toLogString: String = { + val allPropsWithName = defaultPropsWithName ++ Json.toJson(propsWithName).asOpt[JsObject].getOrElse(Json.obj()) + List(ts, GraphUtil.fromOp(op), "e", srcVertex.innerId, tgtVertex.innerId, innerLabel.label, allPropsWithName).mkString("\t") + } + + override def hashCode(): Int = { + MurmurHash3.stringHash(srcVertex.innerId + "," + labelWithDir + "," + tgtVertex.innerId) + } + + override def equals(other: Any): Boolean = other match { + case e: S2Edge => + srcVertex.innerId == e.srcVertex.innerId && + tgtVertex.innerId == e.tgtVertex.innerId && + labelWithDir == e.labelWithDir && op == e.op && version == e.version && + pendingEdgeOpt == e.pendingEdgeOpt && lockTs == lockTs && statusCode == statusCode && + parentEdges == e.parentEdges && originalEdgeOpt == originalEdgeOpt + case _ => false + } + + override def toString(): String = { + Map("srcVertex" -> srcVertex.toString, "tgtVertex" -> tgtVertex.toString, "label" -> labelName, "direction" -> direction, + "operation" -> operation, "version" -> version, "props" -> propsWithTs.asScala.map(kv => kv._1 -> kv._2.value).toString, + "parentEdges" -> parentEdges, "originalEdge" -> originalEdgeOpt, "statusCode" -> statusCode, "lockTs" -> lockTs + ).toString + } + + def checkProperty(key: String): Boolean = propsWithTs.containsKey(key) + + def copyEdge(srcVertex: S2Vertex = srcVertex, + tgtVertex: S2Vertex = tgtVertex, + innerLabel: Label = innerLabel, + dir: Int = dir, + op: Byte = op, + version: Long = version, + propsWithTs: State = S2Edge.propsToState(this.propsWithTs), + parentEdges: Seq[EdgeWithScore] = parentEdges, + originalEdgeOpt: Option[S2Edge] = originalEdgeOpt, + pendingEdgeOpt: Option[S2Edge] = pendingEdgeOpt, + statusCode: Byte = statusCode, + lockTs: Option[Long] = lockTs, + tsInnerValOpt: Option[InnerValLike] = tsInnerValOpt, + ts: Long = ts): S2Edge = { + val edge = new S2Edge(innerGraph, srcVertex, tgtVertex, innerLabel, dir, op, version, S2Edge.EmptyProps, + parentEdges, originalEdgeOpt, pendingEdgeOpt, statusCode, lockTs, tsInnerValOpt) + S2Edge.fillPropsWithTs(edge, propsWithTs) + edge.property(LabelMeta.timestamp.name, ts, ts) + edge + } + + def copyEdgeWithState(state: State, ts: Long): S2Edge = { + val newEdge = copy(propsWithTs = S2Edge.EmptyProps) + S2Edge.fillPropsWithTs(newEdge, state) + newEdge.property(LabelMeta.timestamp.name, ts, ts) + newEdge + } + + def copyEdgeWithState(state: State): S2Edge = { + val newEdge = copy(propsWithTs = S2Edge.EmptyProps) + S2Edge.fillPropsWithTs(newEdge, state) + newEdge + } + + override def vertices(direction: Direction): util.Iterator[structure.Vertex] = { + val arr = new util.ArrayList[Vertex]() + direction match { + case Direction.OUT => arr.add(srcVertex) + case Direction.IN => arr.add(tgtVertex) + case _ => + arr.add(srcVertex) + arr.add(tgtVertex) + } + arr.iterator() + } + + override def properties[V](keys: String*): util.Iterator[Property[V]] = { + val ls = new util.ArrayList[Property[V]]() + keys.foreach { key => ls.add(property(key)) } + ls.iterator() + } + + override def property[V](key: String): Property[V] = { + val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge.")) + if (propsWithTs.containsKey(key)) propsWithTs.get(key).asInstanceOf[Property[V]] + else { + val default = innerLabel.metaPropsDefaultMapInner(labelMeta) + property(key, default.innerVal.value, default.ts).asInstanceOf[Property[V]] + } + } + + override def property[V](key: String, value: V): Property[V] = { + property(key, value, System.currentTimeMillis()) + } + + def property[V](key: String, value: V, ts: Long): Property[V] = { + val labelMeta = innerLabel.metaPropsInvMap.getOrElse(key, throw new RuntimeException(s"$key is not configured on Edge.")) + val newProp = new S2Property[V](this, labelMeta, key, value, ts) + propsWithTs.put(key, newProp) + newProp + } + + override def remove(): Unit = {} + + override def graph(): Graph = innerGraph + + override def id(): AnyRef = (srcVertex.innerId, labelWithDir, tgtVertex.innerId) + + override def label(): String = innerLabel.label +} + + +case class EdgeMutate(edgesToDelete: List[IndexEdge] = List.empty[IndexEdge], + edgesToInsert: List[IndexEdge] = List.empty[IndexEdge], + newSnapshotEdge: Option[SnapshotEdge] = None) { + + def toLogString: String = { + val l = (0 until 50).map(_ => "-").mkString("") + val deletes = s"deletes: ${edgesToDelete.map(e => e.toLogString).mkString("\n")}" + val inserts = s"inserts: ${edgesToInsert.map(e => e.toLogString).mkString("\n")}" + val updates = s"snapshot: ${newSnapshotEdge.map(e => e.toLogString).mkString("\n")}" + + List("\n", l, deletes, inserts, updates, l, "\n").mkString("\n") + } +} + +object S2Edge { + val incrementVersion = 1L + val minTsVal = 0L + + /** now version information is required also **/ + type Props = java.util.Map[String, S2Property[_]] + type State = Map[LabelMeta, InnerValLikeWithTs] + type PropsPairWithTs = (State, State, Long, String) + type MergeState = PropsPairWithTs => (State, Boolean) + type UpdateFunc = (Option[S2Edge], S2Edge, MergeState) + + def EmptyProps = new java.util.HashMap[String, S2Property[_]] + def EmptyState = Map.empty[LabelMeta, InnerValLikeWithTs] + def sameProps(base: Props, other: Props): Boolean = { + if (base.size != other.size) false + else { + var ret = true + val iter = base.entrySet().iterator() + while (iter.hasNext) { + val e = iter.next() + if (!other.containsKey(e.getKey)) ret = false + else if (e.getValue != other.get(e.getKey)) ret = false + else { + + } + } + val otherIter = other.entrySet().iterator() + while (otherIter.hasNext) { + val e = otherIter.next() + if (!base.containsKey(e.getKey)) ret = false + else if (e.getValue != base.get(e.getKey)) ret = false + else { + + } + } + ret + } +// base.sameElements(other) + } + def fillPropsWithTs(snapshotEdge: SnapshotEdge, state: State): Unit = { + state.foreach { case (k, v) => snapshotEdge.property(k.name, v.innerVal.value, v.ts) } + } + def fillPropsWithTs(indexEdge: IndexEdge, state: State): Unit = { + state.foreach { case (k, v) => indexEdge.property(k.name, v.innerVal.value, v.ts) } + } + def fillPropsWithTs(edge: S2Edge, state: State): Unit = { + state.foreach { case (k, v) => edge.property(k.name, v.innerVal.value, v.ts) } + } + + def propsToState(props: Props): State = { + props.asScala.map { case (k, v) => + v.labelMeta -> v.innerValWithTs + }.toMap + } + + def stateToProps(edge: S2Edge, state: State): Props = { + state.foreach { case (k, v) => + edge.property(k.name, v.innerVal.value, v.ts) + } + edge.propsWithTs + } + + def allPropsDeleted(props: Map[LabelMeta, InnerValLikeWithTs]): Boolean = + if (!props.contains(LabelMeta.lastDeletedAt)) false + else { + val lastDeletedAt = props.get(LabelMeta.lastDeletedAt).get.ts + val propsWithoutLastDeletedAt = props - LabelMeta.lastDeletedAt + + propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt } + } + + def allPropsDeleted(props: Props): Boolean = + if (!props.containsKey(LabelMeta.lastDeletedAt.name)) false + else { + val lastDeletedAt = props.get(LabelMeta.lastDeletedAt.name).ts + props.remove(LabelMeta.lastDeletedAt.name) +// val propsWithoutLastDeletedAt = props +// +// propsWithoutLastDeletedAt.forall { case (_, v) => v.ts <= lastDeletedAt } + var ret = true + val iter = props.entrySet().iterator() - while (iter.hasNext) { ++ while (iter.hasNext && ret) { + val e = iter.next() + if (e.getValue.ts > lastDeletedAt) ret = false + } + ret + } + + def buildDeleteBulk(invertedEdge: Option[S2Edge], requestEdge: S2Edge): (S2Edge, EdgeMutate) = { + // assert(invertedEdge.isEmpty) + // assert(requestEdge.op == GraphUtil.operations("delete")) + + val edgesToDelete = requestEdge.relatedEdges.flatMap { relEdge => relEdge.edgesWithIndexValid } + val edgeInverted = Option(requestEdge.toSnapshotEdge) + + (requestEdge, EdgeMutate(edgesToDelete, edgesToInsert = Nil, newSnapshotEdge = edgeInverted)) + } + + def buildOperation(invertedEdge: Option[S2Edge], requestEdges: Seq[S2Edge]): (S2Edge, EdgeMutate) = { + // logger.debug(s"oldEdge: ${invertedEdge.map(_.toStringRaw)}") + // logger.debug(s"requestEdge: ${requestEdge.toStringRaw}") + val oldPropsWithTs = + if (invertedEdge.isEmpty) Map.empty[LabelMeta, InnerValLikeWithTs] + else propsToState(invertedEdge.get.propsWithTs) + + val funcs = requestEdges.map { edge => + if (edge.op == GraphUtil.operations("insert")) { + edge.innerLabel.consistencyLevel match { + case "strong" => S2Edge.mergeUpsert _ + case _ => S2Edge.mergeInsertBulk _ + } + } else if (edge.op == GraphUtil.operations("insertBulk")) { + S2Edge.mergeInsertBulk _ + } else if (edge.op == GraphUtil.operations("delete")) { + edge.innerLabel.consistencyLevel match { + case "strong" => S2Edge.mergeDelete _ + case _ => throw new RuntimeException("not supported") + } + } + else if (edge.op == GraphUtil.operations("update")) S2Edge.mergeUpdate _ + else if (edge.op == GraphUtil.operations("increment")) S2Edge.mergeIncrement _ + else throw new RuntimeException(s"not supported operation on edge: $edge") + } + + val oldTs = invertedEdge.map(_.ts).getOrElse(minTsVal) + val requestWithFuncs = requestEdges.zip(funcs).filter(oldTs != _._1.ts).sortBy(_._1.ts) + + if (requestWithFuncs.isEmpty) { + (requestEdges.head, EdgeMutate()) + } else { + val requestEdge = requestWithFuncs.last._1 + var prevPropsWithTs = oldPropsWithTs + + for { + (requestEdge, func) <- requestWithFuncs + } { + val (_newPropsWithTs, _) = func(prevPropsWithTs, propsToState(requestEdge.propsWithTs), requestEdge.ts, requestEdge.schemaVer) + prevPropsWithTs = _newPropsWithTs + // logger.debug(s"${requestEdge.toLogString}\n$oldPropsWithTs\n$prevPropsWithTs\n") + } + val requestTs = requestEdge.ts + /** version should be monotoniously increasing so our RPC mutation should be applied safely */ + val newVersion = invertedEdge.map(e => e.version + incrementVersion).getOrElse(requestTs) + val maxTs = prevPropsWithTs.map(_._2.ts).max + val newTs = if (maxTs > requestTs) maxTs else requestTs + val propsWithTs = prevPropsWithTs ++ + Map(LabelMeta.timestamp -> InnerValLikeWithTs(InnerVal.withLong(newTs, requestEdge.innerLabel.schemaVersion), newTs)) + + val edgeMutate = buildMutation(invertedEdge, requestEdge, newVersion, oldPropsWithTs, propsWithTs) + + // logger.debug(s"${edgeMutate.toLogString}\n${propsWithTs}") + // logger.error(s"$propsWithTs") + val newEdge = requestEdge.copy(propsWithTs = EmptyProps) + fillPropsWithTs(newEdge, propsWithTs) + (newEdge, edgeMutate) + } + } + + def filterOutWithLabelOption(ls: Seq[IndexEdge]): Seq[IndexEdge] = ls.filter { ie => + ie.labelIndex.dir match { + case None => + // both direction use same indices that is defined when label creation. + true + case Some(dir) => + if (dir != ie.dir) { + // current labelIndex's direction is different with indexEdge's direction so don't touch + false + } else { + ie.labelIndex.writeOption.map { option => + val hashValueOpt = ie.orders.find { case (k, v) => k == LabelMeta.fromHash }.map{ case (k, v) => v.value.toString.toLong } + option.sample(ie, hashValueOpt) + }.getOrElse(true) + } + } + } + + def buildMutation(snapshotEdgeOpt: Option[S2Edge], + requestEdge: S2Edge, + newVersion: Long, + oldPropsWithTs: Map[LabelMeta, InnerValLikeWithTs], + newPropsWithTs: Map[LabelMeta, InnerValLikeWithTs]): EdgeMutate = { + + if (oldPropsWithTs == newPropsWithTs) { + // all requests should be dropped. so empty mutation. + EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = None) + } else { + val withOutDeletedAt = newPropsWithTs.filter(kv => kv._1 != LabelMeta.lastDeletedAtSeq) + val newOp = snapshotEdgeOpt match { + case None => requestEdge.op + case Some(old) => + val oldMaxTs = old.propsWithTs.asScala.map(_._2.ts).max + if (oldMaxTs > requestEdge.ts) old.op + else requestEdge.op + } + + val newSnapshotEdge = requestEdge.copy(op = newOp, version = newVersion).copyEdgeWithState(newPropsWithTs) + + val newSnapshotEdgeOpt = Option(newSnapshotEdge.toSnapshotEdge) + // delete request must always update snapshot. + if (withOutDeletedAt == oldPropsWithTs && newPropsWithTs.contains(LabelMeta.lastDeletedAt)) { + // no mutation on indexEdges. only snapshotEdge should be updated to record lastDeletedAt. + EdgeMutate(edgesToDelete = Nil, edgesToInsert = Nil, newSnapshotEdge = newSnapshotEdgeOpt) + } else { + val edgesToDelete = snapshotEdgeOpt match { + case Some(snapshotEdge) if snapshotEdge.op != GraphUtil.operations("delete") => + snapshotEdge.copy(op = GraphUtil.defaultOpByte) + .relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) } + case _ => Nil + } + + val edgesToInsert = + if (newPropsWithTs.isEmpty || allPropsDeleted(newPropsWithTs)) Nil + else { + val newEdge = requestEdge.copy( + version = newVersion, + propsWithTs = S2Edge.EmptyProps, + op = GraphUtil.defaultOpByte + ) + newPropsWithTs.foreach { case (k, v) => newEdge.property(k.name, v.innerVal.value, v.ts) } + + newEdge.relatedEdges.flatMap { relEdge => filterOutWithLabelOption(relEdge.edgesWithIndexValid) } + } + + + EdgeMutate(edgesToDelete = edgesToDelete, + edgesToInsert = edgesToInsert, + newSnapshotEdge = newSnapshotEdgeOpt) + } + } + } + + def mergeUpsert(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { + var shouldReplace = false + val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs + val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal) + val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { + propsWithTs.get(k) match { + case Some(newValWithTs) => + assert(oldValWithTs.ts >= lastDeletedAt) + val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs + else { + shouldReplace = true + newValWithTs + } + Some(k -> v) + + case None => + assert(oldValWithTs.ts >= lastDeletedAt) + if (oldValWithTs.ts >= requestTs || k.seq < 0) Some(k -> oldValWithTs) + else { + shouldReplace = true + None + } + } + } + val existInNew = + for { + (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt + } yield { + shouldReplace = true + Some(k -> newValWithTs) + } + + ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace) + } + + def mergeUpdate(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { + var shouldReplace = false + val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs + val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal) + val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { + propsWithTs.get(k) match { + case Some(newValWithTs) => + assert(oldValWithTs.ts >= lastDeletedAt) + val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs + else { + shouldReplace = true + newValWithTs + } + Some(k -> v) + case None => + // important: update need to merge previous valid values. + assert(oldValWithTs.ts >= lastDeletedAt) + Some(k -> oldValWithTs) + } + } + val existInNew = for { + (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt + } yield { + shouldReplace = true + Some(k -> newValWithTs) + } + + ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace) + } + + def mergeIncrement(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { + var shouldReplace = false + val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs + val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt).map(v => v.ts).getOrElse(minTsVal) + val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { + propsWithTs.get(k) match { + case Some(newValWithTs) => + if (k == LabelMeta.timestamp) { + val v = if (oldValWithTs.ts >= newValWithTs.ts) oldValWithTs + else { + shouldReplace = true + newValWithTs + } + Some(k -> v) + } else { + if (oldValWithTs.ts >= newValWithTs.ts) { + Some(k -> oldValWithTs) + } else { + assert(oldValWithTs.ts < newValWithTs.ts && oldValWithTs.ts >= lastDeletedAt) + shouldReplace = true + // incr(t0), incr(t2), d(t1) => deleted + Some(k -> InnerValLikeWithTs(oldValWithTs.innerVal + newValWithTs.innerVal, oldValWithTs.ts)) + } + } + + case None => + assert(oldValWithTs.ts >= lastDeletedAt) + Some(k -> oldValWithTs) + // if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) else None + } + } + val existInNew = for { + (k, newValWithTs) <- propsWithTs if !oldPropsWithTs.contains(k) && newValWithTs.ts > lastDeletedAt + } yield { + shouldReplace = true + Some(k -> newValWithTs) + } + + ((existInOld.flatten ++ existInNew.flatten).toMap, shouldReplace) + } + + def mergeDelete(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { + var shouldReplace = false + val (oldPropsWithTs, propsWithTs, requestTs, version) = propsPairWithTs + val lastDeletedAt = oldPropsWithTs.get(LabelMeta.lastDeletedAt) match { + case Some(prevDeletedAt) => + if (prevDeletedAt.ts >= requestTs) prevDeletedAt.ts + else { + shouldReplace = true + requestTs + } + case None => { + shouldReplace = true + requestTs + } + } + val existInOld = for ((k, oldValWithTs) <- oldPropsWithTs) yield { + if (k == LabelMeta.timestamp) { + if (oldValWithTs.ts >= requestTs) Some(k -> oldValWithTs) + else { + shouldReplace = true + Some(k -> InnerValLikeWithTs.withLong(requestTs, requestTs, version)) + } + } else { + if (oldValWithTs.ts >= lastDeletedAt) Some(k -> oldValWithTs) + else { + shouldReplace = true + None + } + } + } + val mustExistInNew = Map(LabelMeta.lastDeletedAt -> InnerValLikeWithTs.withLong(lastDeletedAt, lastDeletedAt, version)) + ((existInOld.flatten ++ mustExistInNew).toMap, shouldReplace) + } + + def mergeInsertBulk(propsPairWithTs: PropsPairWithTs): (State, Boolean) = { + val (_, propsWithTs, _, _) = propsPairWithTs + (propsWithTs, true) + } + +// def fromString(s: String): Option[Edge] = Graph.toEdge(s) + + +}