Repository: flink Updated Branches: refs/heads/master 6ed5815e8 -> 47944b1bb
http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala index 04b63a1..9210c00 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala @@ -17,23 +17,21 @@ */ package org.apache.flink.table.runtime.aggregate -import java.sql.Timestamp import java.util import java.util.{List => JList} -import org.apache.calcite.runtime.SqlFunctions +import org.apache.flink.api.common.state._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.ListTypeInfo import org.apache.flink.configuration.Configuration -import org.apache.flink.types.Row import org.apache.flink.streaming.api.functions.ProcessFunction -import org.apache.flink.util.Collector -import org.apache.flink.api.common.state._ -import org.apache.flink.api.java.typeutils.ListTypeInfo import org.apache.flink.streaming.api.operators.TimestampedCollector import org.apache.flink.table.api.StreamQueryConfig import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction} import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo} -import org.slf4j.LoggerFactory +import org.apache.flink.types.Row +import org.apache.flink.util.Collector +import org.slf4j.{Logger, LoggerFactory} /** @@ -52,6 +50,8 @@ abstract class RowTimeUnboundedOver( extends ProcessFunctionWithCleanupState[CRow, CRow](queryConfig) with Compiler[GeneratedAggregations] { + val LOG: Logger = LoggerFactory.getLogger(this.getClass) + protected var output: CRow = _ // state to hold the accumulators of the aggregations private var accumulatorState: ValueState[Row] = _ @@ -60,7 +60,6 @@ abstract class RowTimeUnboundedOver( // list to sort timestamps to access rows in timestamp order private var sortedTimestamps: util.LinkedList[Long] = _ - val LOG = LoggerFactory.getLogger(this.getClass) protected var function: GeneratedAggregations = _ override def open(config: Configuration) { @@ -111,7 +110,7 @@ abstract class RowTimeUnboundedOver( // register state-cleanup timer registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()) - val timestamp = SqlFunctions.toLong(input.getField(rowTimeIdx).asInstanceOf[Timestamp]) + val timestamp = input.getField(rowTimeIdx).asInstanceOf[Long] val curWatermark = ctx.timerService().currentWatermark() // discard late record http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala index 4ec5239..16e4a0b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/TimeWindowPropertyCollector.scala @@ -59,7 +59,7 @@ abstract class TimeWindowPropertyCollector[T]( if (windowRowtimeOffset.isDefined) { output.setField( lastFieldPos + windowRowtimeOffset.get, - SqlFunctions.internalToTimestamp(windowEnd - 1)) + windowEnd - 1) } wrappedCollector.collect(record) http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapFunction.scala new file mode 100644 index 0000000..6b4f87e --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapFunction.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.conversion + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +import _root_.java.lang.{Boolean => JBool} + +/** + * Convert [[CRow]] to a [[JTuple2]] containing a [[Row]]. + */ +class CRowToJavaTupleMapFunction extends MapFunction[CRow, JTuple2[JBool, Row]] { + + val out: JTuple2[JBool, Row] = new JTuple2(true.asInstanceOf[JBool], null.asInstanceOf[Row]) + + override def map(cRow: CRow): JTuple2[JBool, Row] = { + out.f0 = cRow.change + out.f1 = cRow.row + out + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala new file mode 100644 index 0000000..95f304d --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToJavaTupleMapRunner.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.conversion + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.slf4j.{Logger, LoggerFactory} + +/** + * Convert [[CRow]] to a [[JTuple2]]. + */ +class CRowToJavaTupleMapRunner( + name: String, + code: String, + @transient var returnType: TypeInformation[JTuple2[JBool, Any]]) + extends RichMapFunction[CRow, Any] + with ResultTypeQueryable[JTuple2[JBool, Any]] + with Compiler[MapFunction[Row, Any]] { + + val LOG: Logger = LoggerFactory.getLogger(this.getClass) + + private var function: MapFunction[Row, Any] = _ + private var tupleWrapper: JTuple2[JBool, Any] = _ + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating MapFunction.") + function = clazz.newInstance() + tupleWrapper = new JTuple2[JBool, Any]() + } + + override def map(in: CRow): JTuple2[JBool, Any] = { + tupleWrapper.f0 = in.change + tupleWrapper.f1 = function.map(in.row) + tupleWrapper + } + + override def getProducedType: TypeInformation[JTuple2[JBool, Any]] = returnType +} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToRowMapFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToRowMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToRowMapFunction.scala new file mode 100644 index 0000000..050f15f --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToRowMapFunction.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.conversion + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * Maps a CRow to a Row. + */ +class CRowToRowMapFunction extends MapFunction[CRow, Row] { + + override def map(value: CRow): Row = value.row + +} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapFunction.scala new file mode 100644 index 0000000..6461cc4 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapFunction.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.conversion + +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row + +/** + * Convert [[CRow]] to a [[Tuple2]]. + */ +class CRowToScalaTupleMapFunction extends MapFunction[CRow, (Boolean, Row)] { + + override def map(cRow: CRow): (Boolean, Row) = { + (cRow.change, cRow.row) + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapRunner.scala new file mode 100644 index 0000000..c7d71a9 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/conversion/CRowToScalaTupleMapRunner.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.conversion + +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.table.codegen.Compiler +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.types.Row +import org.slf4j.{Logger, LoggerFactory} + +/** + * Convert [[CRow]] to a [[Tuple2]]. + */ +class CRowToScalaTupleMapRunner( + name: String, + code: String, + @transient var returnType: TypeInformation[(Boolean, Any)]) + extends RichMapFunction[CRow, (Boolean, Any)] + with ResultTypeQueryable[(Boolean, Any)] + with Compiler[MapFunction[Row, Any]] { + + val LOG: Logger = LoggerFactory.getLogger(this.getClass) + + private var function: MapFunction[Row, Any] = _ + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating MapFunction.") + function = clazz.newInstance() + } + + override def map(in: CRow): (Boolean, Any) = + (in.change, function.map(in.row)) + + override def getProducedType: TypeInformation[(Boolean, Any)] = returnType +} http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala index e0e054b..824f3fb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala @@ -20,13 +20,14 @@ package org.apache.flink.table.typeutils import java.sql.Timestamp +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo -import org.apache.flink.api.common.typeutils.TypeComparator -import org.apache.flink.api.common.typeutils.base.{SqlTimestampComparator, SqlTimestampSerializer} +import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} +import org.apache.flink.api.common.typeutils.base.{LongSerializer, SqlTimestampComparator, SqlTimestampSerializer} /** * Type information for indicating event or processing time. However, it behaves like a - * regular SQL timestamp. + * regular SQL timestamp but is serialized as Long. */ class TimeIndicatorTypeInfo(val isEventTime: Boolean) extends SqlTimeTypeInfo[Timestamp]( @@ -34,6 +35,12 @@ class TimeIndicatorTypeInfo(val isEventTime: Boolean) SqlTimestampSerializer.INSTANCE, classOf[SqlTimestampComparator].asInstanceOf[Class[TypeComparator[Timestamp]]]) { + // this replaces the effective serializer by a LongSerializer + // it is a hacky but efficient solution to keep the object creation overhead low but still + // be compatible with the corresponding SqlTimestampTypeInfo + override def createSerializer(executionConfig: ExecutionConfig): TypeSerializer[Timestamp] = + LongSerializer.INSTANCE.asInstanceOf[TypeSerializer[Timestamp]] + override def toString: String = s"TimeIndicatorTypeInfo(${if (isEventTime) "rowtime" else "proctime" })" } http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala index ba044be..ba36e18 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala @@ -15,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.table.runtime.harness import java.lang.{Long => JLong} import java.util.concurrent.ConcurrentLinkedQueue -import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTS} import org.apache.flink.api.common.time.Time import org.apache.flink.api.common.typeinfo.BasicTypeInfo import org.apache.flink.streaming.api.operators.KeyedProcessOperator @@ -34,7 +34,7 @@ import org.junit.Test class OverWindowHarnessTest extends HarnessTestBase{ - protected var queryConfig = + protected var queryConfig: StreamQueryConfig = new StreamQueryConfig().withIdleStateRetentionTime(Time.seconds(2), Time.seconds(3)) @Test @@ -60,75 +60,75 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.setProcessingTime(1) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(1), "aaa", 1L: JLong), true))) + CRow(Row.of(1L: JLong, "aaa", 1L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(1), "bbb", 10L: JLong), true))) + CRow(Row.of(1L: JLong, "bbb", 10L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(1), "aaa", 2L: JLong), true))) + CRow(Row.of(1L: JLong, "aaa", 2L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(1), "aaa", 3L: JLong), true))) + CRow(Row.of(1L: JLong, "aaa", 3L: JLong), change = true))) // register cleanup timer with 4100 testHarness.setProcessingTime(1100) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(1), "bbb", 20L: JLong), true))) + CRow(Row.of(1L: JLong, "bbb", 20L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(1), "aaa", 4L: JLong), true))) + CRow(Row.of(1L: JLong, "aaa", 4L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(1), "aaa", 5L: JLong), true))) + CRow(Row.of(1L: JLong, "aaa", 5L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(1), "aaa", 6L: JLong), true))) + CRow(Row.of(1L: JLong, "aaa", 6L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(1), "bbb", 30L: JLong), true))) + CRow(Row.of(1L: JLong, "bbb", 30L: JLong), change = true))) // register cleanup timer with 6001 testHarness.setProcessingTime(3001) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(2), "aaa", 7L: JLong), true))) + CRow(Row.of(2L: JLong, "aaa", 7L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(2), "aaa", 8L: JLong), true))) + CRow(Row.of(2L: JLong, "aaa", 8L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(2), "aaa", 9L: JLong), true))) + CRow(Row.of(2L: JLong, "aaa", 9L: JLong), change = true))) // trigger cleanup timer and register cleanup timer with 9002 testHarness.setProcessingTime(6002) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(2), "aaa", 10L: JLong), true))) + CRow(Row.of(2L: JLong, "aaa", 10L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(2), "bbb", 40L: JLong), true))) + CRow(Row.of(2L: JLong, "bbb", 40L: JLong), change = true))) val result = testHarness.getOutput val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(1), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) + CRow(Row.of(1L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(1), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) + CRow(Row.of(1L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(1), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true))) + CRow(Row.of(1L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(1), "aaa", 3L: JLong, 2L: JLong, 3L: JLong), true))) + CRow(Row.of(1L: JLong, "aaa", 3L: JLong, 2L: JLong, 3L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(1), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true))) + CRow(Row.of(1L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(1), "aaa", 4L: JLong, 3L: JLong, 4L: JLong), true))) + CRow(Row.of(1L: JLong, "aaa", 4L: JLong, 3L: JLong, 4L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(1), "aaa", 5L: JLong, 4L: JLong, 5L: JLong), true))) + CRow(Row.of(1L: JLong, "aaa", 5L: JLong, 4L: JLong, 5L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(1), "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true))) + CRow(Row.of(1L: JLong, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(1), "bbb", 30L: JLong, 20L: JLong, 30L: JLong), true))) + CRow(Row.of(1L: JLong, "bbb", 30L: JLong, 20L: JLong, 30L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(2), "aaa", 7L: JLong, 6L: JLong, 7L: JLong), true))) + CRow(Row.of(2L: JLong, "aaa", 7L: JLong, 6L: JLong, 7L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(2), "aaa", 8L: JLong, 7L: JLong, 8L: JLong), true))) + CRow(Row.of(2L: JLong, "aaa", 8L: JLong, 7L: JLong, 8L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(2), "aaa", 9L: JLong, 8L: JLong, 9L: JLong), true))) + CRow(Row.of(2L: JLong, "aaa", 9L: JLong, 8L: JLong, 9L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(2), "aaa", 10L: JLong, 10L: JLong, 10L: JLong), true))) + CRow(Row.of(2L: JLong, "aaa", 10L: JLong, 10L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(2), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true))) + CRow(Row.of(2L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true))) verify(expectedOutput, result, new RowResultSortComparator()) @@ -160,51 +160,51 @@ class OverWindowHarnessTest extends HarnessTestBase{ // register cleanup timer with 3003 testHarness.setProcessingTime(3) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 1L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 1L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 10L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 10L: JLong), change = true))) testHarness.setProcessingTime(4) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 2L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 2L: JLong), change = true))) // trigger cleanup timer and register cleanup timer with 6003 testHarness.setProcessingTime(3003) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 3L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 3L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 20L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 20L: JLong), change = true))) testHarness.setProcessingTime(5) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 4L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 4L: JLong), change = true))) // register cleanup timer with 9002 testHarness.setProcessingTime(6002) testHarness.setProcessingTime(7002) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 5L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 5L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 6L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 6L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 30L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 30L: JLong), change = true))) // register cleanup timer with 14002 testHarness.setProcessingTime(11002) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 7L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 7L: JLong), change = true))) testHarness.setProcessingTime(11004) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 8L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 8L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 9L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 9L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 10L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 10L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 40L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 40L: JLong), change = true))) testHarness.setProcessingTime(11006) @@ -214,33 +214,33 @@ class OverWindowHarnessTest extends HarnessTestBase{ // all elements at the same proc timestamp have the same value per key expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 3L: JLong, 3L: JLong, 4L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 3L: JLong, 3L: JLong, 4L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 20L: JLong, 20L: JLong, 20L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 20L: JLong, 20L: JLong, 20L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 4L: JLong, 4L: JLong, 4L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 4L: JLong, 4L: JLong, 4L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 5L: JLong, 5L: JLong, 6L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 5L: JLong, 5L: JLong, 6L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 6L: JLong, 5L: JLong, 6L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 6L: JLong, 5L: JLong, 6L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 30L: JLong, 30L: JLong, 30L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 30L: JLong, 30L: JLong, 30L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 7L: JLong, 7L: JLong, 7L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 7L: JLong, 7L: JLong, 7L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 8L: JLong, 7L: JLong, 10L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 8L: JLong, 7L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 9L: JLong, 7L: JLong, 10L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 9L: JLong, 7L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 10L: JLong, 7L: JLong, 10L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 10L: JLong, 7L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true))) verify(expectedOutput, result, new RowResultSortComparator()) @@ -268,69 +268,69 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.setProcessingTime(1003) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 1L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 1L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 10L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 10L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 2L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 2L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 3L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 3L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 20L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 20L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 4L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 4L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 5L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 5L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 6L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 6L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 30L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 30L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 7L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 7L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 8L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 8L: JLong), change = true))) // trigger cleanup timer and register cleanup timer with 8003 testHarness.setProcessingTime(5003) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 9L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 9L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 10L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 10L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 40L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 40L: JLong), change = true))) val result = testHarness.getOutput val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 9L: JLong, 9L: JLong, 9L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 9L: JLong, 9L: JLong, 9L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "aaa", 10L: JLong, 9L: JLong, 10L: JLong), true))) + CRow(Row.of(0L: JLong, "aaa", 10L: JLong, 9L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(0), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true))) + CRow(Row.of(0L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true))) verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() @@ -361,51 +361,51 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processWatermark(1) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(2), "aaa", 1L: JLong), true))) + CRow(Row.of(2L: JLong, "aaa", 1L: JLong), change = true))) testHarness.processWatermark(2) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(3), "bbb", 10L: JLong), true))) + CRow(Row.of(3L: JLong, "bbb", 10L: JLong), change = true))) testHarness.processWatermark(4000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true))) testHarness.processWatermark(4001) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4002), "aaa", 3L: JLong), true))) + CRow(Row.of(4002L: JLong, "aaa", 3L: JLong), change = true))) testHarness.processWatermark(4002) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4003), "aaa", 4L: JLong), true))) + CRow(Row.of(4003L: JLong, "aaa", 4L: JLong), change = true))) testHarness.processWatermark(4800) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4801), "bbb", 25L: JLong), true))) + CRow(Row.of(4801L: JLong, "bbb", 25L: JLong), change = true))) testHarness.processWatermark(6500) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true))) + CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true))) testHarness.processWatermark(7000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true))) + CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true))) testHarness.processWatermark(8000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true))) + CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true))) testHarness.processWatermark(12000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true))) + CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true))) testHarness.processWatermark(19000) @@ -415,10 +415,10 @@ class OverWindowHarnessTest extends HarnessTestBase{ // check that state is removed after max retention time testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 3000 + CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 3000 testHarness.setProcessingTime(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 4500 + CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 4500 testHarness.processWatermark(20010) // compute output assert(testHarness.numKeyedStateEntries() > 0) // check that we have state @@ -430,7 +430,7 @@ class OverWindowHarnessTest extends HarnessTestBase{ // check that state is only removed if all data was processed testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(20011), "ccc", 3L: JLong), true))) // clean-up 6500 + CRow(Row.of(20011L: JLong, "ccc", 3L: JLong), change = true))) // clean-up 6500 assert(testHarness.numKeyedStateEntries() > 0) // check that we have state testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500 @@ -450,40 +450,40 @@ class OverWindowHarnessTest extends HarnessTestBase{ // all elements at the same row-time have the same value per key expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(2), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) + CRow(Row.of(2L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(3), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) + CRow(Row.of(3L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4002), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true))) + CRow(Row.of(4002L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4003), "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true))) + CRow(Row.of(4003L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4801), "bbb", 25L: JLong, 25L: JLong, 25L: JLong), true))) + CRow(Row.of(4801L: JLong, "bbb", 25L: JLong, 25L: JLong, 25L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 2L: JLong, 6L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 2L: JLong, 6L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 2L: JLong, 6L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 2L: JLong, 6L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 2L: JLong, 7L: JLong), true))) + CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 2L: JLong, 7L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 2L: JLong, 8L: JLong), true))) + CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 2L: JLong, 8L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 25L: JLong, 30L: JLong), true))) + CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 25L: JLong, 30L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 8L: JLong, 10L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 8L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 40L: JLong, 40L: JLong), true))) + CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 40L: JLong, 40L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true))) + CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true))) + CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(20011), "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true))) + CRow(Row.of(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), change = true))) verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() @@ -511,47 +511,47 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processWatermark(800) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(801), "aaa", 1L: JLong), true))) + CRow(Row.of(801L: JLong, "aaa", 1L: JLong), change = true))) testHarness.processWatermark(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(2501), "bbb", 10L: JLong), true))) + CRow(Row.of(2501L: JLong, "bbb", 10L: JLong), change = true))) testHarness.processWatermark(4000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 3L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 3L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4001), "bbb", 20L: JLong), true))) + CRow(Row.of(4001L: JLong, "bbb", 20L: JLong), change = true))) testHarness.processWatermark(4800) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4801), "aaa", 4L: JLong), true))) + CRow(Row.of(4801L: JLong, "aaa", 4L: JLong), change = true))) testHarness.processWatermark(6500) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true))) + CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true))) testHarness.processWatermark(7000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true))) + CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true))) testHarness.processWatermark(8000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true))) + CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true))) testHarness.processWatermark(12000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true))) + CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true))) testHarness.processWatermark(19000) @@ -561,10 +561,10 @@ class OverWindowHarnessTest extends HarnessTestBase{ // check that state is removed after max retention time testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 3000 + CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 3000 testHarness.setProcessingTime(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 4500 + CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 4500 testHarness.processWatermark(20010) // compute output assert(testHarness.numKeyedStateEntries() > 0) // check that we have state @@ -575,7 +575,7 @@ class OverWindowHarnessTest extends HarnessTestBase{ // check that state is only removed if all data was processed testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(20011), "ccc", 3L: JLong), true))) // clean-up 6500 + CRow(Row.of(20011L: JLong, "ccc", 3L: JLong), change = true))) // clean-up 6500 assert(testHarness.numKeyedStateEntries() > 0) // check that we have state testHarness.setProcessingTime(6500) // clean-up attempt but rescheduled to 8500 @@ -595,40 +595,40 @@ class OverWindowHarnessTest extends HarnessTestBase{ val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(801), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) + CRow(Row.of(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(2501), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) + CRow(Row.of(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4001), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true))) + CRow(Row.of(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4801), "aaa", 4L: JLong, 2L: JLong, 4L: JLong), true))) + CRow(Row.of(4801L: JLong, "aaa", 4L: JLong, 2L: JLong, 4L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 3L: JLong, 5L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 3L: JLong, 5L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 4L: JLong, 6L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 4L: JLong, 6L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true))) + CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 5L: JLong, 7L: JLong), true))) + CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 5L: JLong, 7L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 6L: JLong, 8L: JLong), true))) + CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 6L: JLong, 8L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 7L: JLong, 9L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 7L: JLong, 9L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 8L: JLong, 10L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 8L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 20L: JLong, 40L: JLong), true))) + CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 20L: JLong, 40L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true))) + CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true))) + CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(20011), "ccc", 3L: JLong, 3L: JLong, 3L: JLong), true))) + CRow(Row.of(20011L: JLong, "ccc", 3L: JLong, 3L: JLong, 3L: JLong), change = true))) verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() @@ -659,47 +659,47 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.setProcessingTime(1000) testHarness.processWatermark(800) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(801), "aaa", 1L: JLong), true))) + CRow(Row.of(801L: JLong, "aaa", 1L: JLong), change = true))) testHarness.processWatermark(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(2501), "bbb", 10L: JLong), true))) + CRow(Row.of(2501L: JLong, "bbb", 10L: JLong), change = true))) testHarness.processWatermark(4000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 3L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 3L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4001), "bbb", 20L: JLong), true))) + CRow(Row.of(4001L: JLong, "bbb", 20L: JLong), change = true))) testHarness.processWatermark(4800) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4801), "aaa", 4L: JLong), true))) + CRow(Row.of(4801L: JLong, "aaa", 4L: JLong), change = true))) testHarness.processWatermark(6500) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true))) + CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true))) testHarness.processWatermark(7000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true))) + CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true))) testHarness.processWatermark(8000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true))) + CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true))) testHarness.processWatermark(12000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true))) + CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true))) testHarness.processWatermark(19000) @@ -712,10 +712,10 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processWatermark(20000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 5000 + CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 5000 testHarness.setProcessingTime(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 5000 + CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 5000 assert(testHarness.numKeyedStateEntries() > 0) testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000 @@ -733,38 +733,38 @@ class OverWindowHarnessTest extends HarnessTestBase{ // all elements at the same row-time have the same value per key expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(801), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) + CRow(Row.of(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(2501), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) + CRow(Row.of(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 3L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 3L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4001), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true))) + CRow(Row.of(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4801), "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true))) + CRow(Row.of(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 1L: JLong, 6L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 6L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true))) + CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true))) + CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true))) + CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 1L: JLong, 10L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true))) + CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true))) + CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true))) + CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true))) verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() @@ -792,47 +792,47 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.setProcessingTime(1000) testHarness.processWatermark(800) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(801), "aaa", 1L: JLong), true))) + CRow(Row.of(801L: JLong, "aaa", 1L: JLong), change = true))) testHarness.processWatermark(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(2501), "bbb", 10L: JLong), true))) + CRow(Row.of(2501L: JLong, "bbb", 10L: JLong), change = true))) testHarness.processWatermark(4000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 2L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 2L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 3L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 3L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4001), "bbb", 20L: JLong), true))) + CRow(Row.of(4001L: JLong, "bbb", 20L: JLong), change = true))) testHarness.processWatermark(4800) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(4801), "aaa", 4L: JLong), true))) + CRow(Row.of(4801L: JLong, "aaa", 4L: JLong), change = true))) testHarness.processWatermark(6500) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 5L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 5L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 6L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 6L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(6501), "bbb", 30L: JLong), true))) + CRow(Row.of(6501L: JLong, "bbb", 30L: JLong), change = true))) testHarness.processWatermark(7000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(7001), "aaa", 7L: JLong), true))) + CRow(Row.of(7001L: JLong, "aaa", 7L: JLong), change = true))) testHarness.processWatermark(8000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(8001), "aaa", 8L: JLong), true))) + CRow(Row.of(8001L: JLong, "aaa", 8L: JLong), change = true))) testHarness.processWatermark(12000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 9L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 9L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 10L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 10L: JLong), change = true))) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(12001), "bbb", 40L: JLong), true))) + CRow(Row.of(12001L: JLong, "bbb", 40L: JLong), change = true))) testHarness.processWatermark(19000) @@ -845,10 +845,10 @@ class OverWindowHarnessTest extends HarnessTestBase{ testHarness.processWatermark(20000) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(20001), "ccc", 1L: JLong), true))) // clean-up 5000 + CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 5000 testHarness.setProcessingTime(2500) testHarness.processElement(new StreamRecord( - CRow(Row.of(toTS(20002), "ccc", 2L: JLong), true))) // clean-up 5000 + CRow(Row.of(20002L: JLong, "ccc", 2L: JLong), change = true))) // clean-up 5000 assert(testHarness.numKeyedStateEntries() > 0) testHarness.setProcessingTime(5000) // does not clean up, because data left. New timer 7000 @@ -865,38 +865,38 @@ class OverWindowHarnessTest extends HarnessTestBase{ val expectedOutput = new ConcurrentLinkedQueue[Object]() expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(801), "aaa", 1L: JLong, 1L: JLong, 1L: JLong), true))) + CRow(Row.of(801L: JLong, "aaa", 1L: JLong, 1L: JLong, 1L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(2501), "bbb", 10L: JLong, 10L: JLong, 10L: JLong), true))) + CRow(Row.of(2501L: JLong, "bbb", 10L: JLong, 10L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 2L: JLong, 1L: JLong, 2L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 2L: JLong, 1L: JLong, 2L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4001), "aaa", 3L: JLong, 1L: JLong, 3L: JLong), true))) + CRow(Row.of(4001L: JLong, "aaa", 3L: JLong, 1L: JLong, 3L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4001), "bbb", 20L: JLong, 10L: JLong, 20L: JLong), true))) + CRow(Row.of(4001L: JLong, "bbb", 20L: JLong, 10L: JLong, 20L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(4801), "aaa", 4L: JLong, 1L: JLong, 4L: JLong), true))) + CRow(Row.of(4801L: JLong, "aaa", 4L: JLong, 1L: JLong, 4L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 5L: JLong, 1L: JLong, 5L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 5L: JLong, 1L: JLong, 5L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(6501), "aaa", 6L: JLong, 1L: JLong, 6L: JLong), true))) + CRow(Row.of(6501L: JLong, "aaa", 6L: JLong, 1L: JLong, 6L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(6501), "bbb", 30L: JLong, 10L: JLong, 30L: JLong), true))) + CRow(Row.of(6501L: JLong, "bbb", 30L: JLong, 10L: JLong, 30L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(7001), "aaa", 7L: JLong, 1L: JLong, 7L: JLong), true))) + CRow(Row.of(7001L: JLong, "aaa", 7L: JLong, 1L: JLong, 7L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(8001), "aaa", 8L: JLong, 1L: JLong, 8L: JLong), true))) + CRow(Row.of(8001L: JLong, "aaa", 8L: JLong, 1L: JLong, 8L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 9L: JLong, 1L: JLong, 9L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 9L: JLong, 1L: JLong, 9L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(12001), "aaa", 10L: JLong, 1L: JLong, 10L: JLong), true))) + CRow(Row.of(12001L: JLong, "aaa", 10L: JLong, 1L: JLong, 10L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(12001), "bbb", 40L: JLong, 10L: JLong, 40L: JLong), true))) + CRow(Row.of(12001L: JLong, "bbb", 40L: JLong, 10L: JLong, 40L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(20001), "ccc", 1L: JLong, 1L: JLong, 1L: JLong), true))) + CRow(Row.of(20001L: JLong, "ccc", 1L: JLong, 1L: JLong, 1L: JLong), change = true))) expectedOutput.add(new StreamRecord( - CRow(Row.of(toTS(20002), "ccc", 2L: JLong, 1L: JLong, 2L: JLong), true))) + CRow(Row.of(20002L: JLong, "ccc", 2L: JLong, 1L: JLong, 2L: JLong), change = true))) verify(expectedOutput, result, new RowResultSortComparator()) testHarness.close() http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala index 18ba6bb..9490039 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/SortProcessFunctionHarnessTest.scala @@ -21,7 +21,6 @@ package org.apache.flink.table.runtime.harness import java.lang.{Integer => JInt, Long => JLong} import java.util.concurrent.ConcurrentLinkedQueue -import org.apache.calcite.runtime.SqlFunctions.{internalToTimestamp => toTS} import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.common.typeutils.{TypeComparator, TypeSerializer} @@ -189,35 +188,35 @@ class SortProcessFunctionHarnessTest { // timestamp is ignored in processing time testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", toTS(1001)), true))) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1001L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", toTS(2002)), true))) + Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 2002L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", toTS(2002)), true))) + Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 2002L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2002)), true))) + Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 2002L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", toTS(2002)), true))) + Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 2002L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2004)), true))) + Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 2004L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2006)), true))) + Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 2006L: JLong), true))) // move watermark forward testHarness.processWatermark(2007) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", toTS(2008)), true))) + Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 2008L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", toTS(2002)), true))) // too late + Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 2002L: JLong), true))) // too late testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2019)), true))) // too early + Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 2019L: JLong), true))) // too early testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", toTS(2008)), true))) + Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 2008L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2010)), true))) + Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 2010L: JLong), true))) testHarness.processElement(new StreamRecord(new CRow( - Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", toTS(2008)), true))) + Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 2008L: JLong), true))) // move watermark forward testHarness.processWatermark(2012) @@ -231,29 +230,29 @@ class SortProcessFunctionHarnessTest { // (10,0) (11,1) (12,2) (12,1) (12,0) expectedOutput.add(new Watermark(3)) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", toTS(1001)), true))) + Row.of(1: JInt, 11L: JLong, 1: JInt, "aaa", 1001L: JLong), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2002)), true))) + Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 2002L: JLong), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", toTS(2002)), true))) + Row.of(1: JInt, 12L: JLong, 1: JInt, "aaa", 2002L: JLong), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", toTS(2002)), true))) + Row.of(1: JInt, 13L: JLong, 2: JInt, "aaa", 2002L: JLong), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", toTS(2002)), true))) + Row.of(1: JInt, 14L: JLong, 0: JInt, "aaa", 2002L: JLong), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", toTS(2004)), true))) + Row.of(1: JInt, 12L: JLong, 3: JInt, "aaa", 2004L: JLong), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2006)), true))) + Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 2006L: JLong), true))) expectedOutput.add(new Watermark(2007)) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", toTS(2008)), true))) + Row.of(1: JInt, 19L: JLong, 0: JInt, "aaa", 2008L: JLong), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", toTS(2008)), true))) + Row.of(1: JInt, 20L: JLong, 2: JInt, "aaa", 2008L: JLong), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", toTS(2008)), true))) + Row.of(1: JInt, 20L: JLong, 1: JInt, "aaa", 2008L: JLong), true))) expectedOutput.add(new StreamRecord(new CRow( - Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", toTS(2010)), true))) + Row.of(1: JInt, 10L: JLong, 0: JInt, "aaa", 2010L: JLong), true))) expectedOutput.add(new Watermark(2012)) http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala index 4c478de..24d8695 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/TimeAttributesITCase.scala @@ -63,7 +63,8 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) - val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) + val table = stream.toTable( + tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime) val t = table.select('rowtime.cast(Types.STRING)) @@ -123,6 +124,13 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime) val func = new TableFunc + // we test if this can be executed with any exceptions + table.join(func('proctime, 'proctime, 'string) as 's).toAppendStream[Row] + + // we test if this can be executed with any exceptions + table.join(func('rowtime, 'rowtime, 'string) as 's).toAppendStream[Row] + + // we can only test rowtime, not proctime val t = table.join(func('rowtime, 'proctime, 'string) as 's).select('rowtime, 's) val results = t.toAppendStream[Row] http://git-wip-us.apache.org/repos/asf/flink/blob/47944b1b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala index 82ed81c..830359f 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/TableSinkITCase.scala @@ -421,8 +421,8 @@ class TableSinkITCase extends StreamingMultipleProgramsTestBase { ctx: ProcessFunction[(Boolean, Row), Row]#Context, out: Collector[Row]): Unit = { - val rowTS: Long = row._2.getField(2).asInstanceOf[Long] - if (ctx.timestamp() == rowTS) { + val rowTs = row._2.getField(2).asInstanceOf[Long] + if (ctx.timestamp() == rowTs) { out.collect(row._2) } }