dawidwys commented on a change in pull request #15742: URL: https://github.com/apache/flink/pull/15742#discussion_r620157830
########## File path: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ########## @@ -447,17 +447,17 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { } def sideOutputLateData(lateDataOutputTag: OutputTag[T]): PatternStream[T] = { - jPatternStream.sideOutputLateData(lateDataOutputTag) + jPatternStream = jPatternStream.sideOutputLateData(lateDataOutputTag) Review comment: I'd rather suggest creating a new scala `PatternStream`. Having a mutable `var` does not look good. ########## File path: flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/CEPScalaApiPatternStreamTest.scala ########## @@ -0,0 +1,67 @@ +package org.apache.flink.cep.scala + +import java.lang.reflect.Field + +import org.apache.flink.cep +import org.apache.flink.cep.pattern.Pattern +import org.apache.flink.cep.pattern.conditions.SimpleCondition +import org.apache.flink.streaming.api.datastream.DataStreamSource +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.junit.Assert.assertEquals +import org.junit.Test + +class CEPScalaApiPatternStreamTest { + /** + * These tests simply check that use the Scala API to update the TimeCharacteristic of the PatternStream . + */ + + @Test + def updateCepTimeCharacteristicByScalaApi(): Unit = { + + val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment + val input: DataStreamSource[Event] = env.fromElements(Event(1, "barfoo", 1.0), Event(8, "end", 1.0)) + val pattern: Pattern[Event, Event] = Pattern.begin("start").where(new SimpleCondition[Event]() { + override def filter(value: Event): Boolean = value.name == "start" + }) + + val jestream: cep.PatternStream[Event] = org.apache.flink.cep.CEP.pattern(input, pattern) + + //get org.apache.flink.cep.scala.PatternStream + val sePstream = new PatternStream[Event](jestream) + + //get TimeBehaviour + val time1: AnyRef = getTimeBehaviourFromScalaPatternStream(sePstream) + + assertEquals(time1.toString, "EventTime") + + //change TimeCharacteristic use scala api + val sPstream: PatternStream[Event] = sePstream.inProcessingTime() + + //get TimeBehaviour + val time2: AnyRef = getTimeBehaviourFromScalaPatternStream(sPstream) + + assertEquals(time2.toString, "ProcessingTime") + + + } + + def getTimeBehaviourFromScalaPatternStream(seStream: org.apache.flink.cep.scala.PatternStream[Event]) = { + val field: Field = seStream.getClass.getDeclaredField("jPatternStream") + field.setAccessible(true) + val JPattern: AnyRef = field.get(seStream) + val stream: cep.PatternStream[Event] = JPattern.asInstanceOf[cep.PatternStream[Event]] + getTimeBehaviourFromJavaPatternStream(stream) + } + + def getTimeBehaviourFromJavaPatternStream(jeStream: org.apache.flink.cep.PatternStream[Event])={ + val builder: Field = jeStream.getClass.getDeclaredField("builder") + builder.setAccessible(true) + val o: AnyRef = builder.get(jeStream) + val timeBehaviour: Field = o.getClass.getDeclaredField("timeBehaviour") + timeBehaviour.setAccessible(true) Review comment: That's a terrible way to test any change. Reflective access should be really last resort and it's also an indication of a bigger problem. It is totally susceptible to any changes to the underlying classes. It breaks all encapsulation efforts. I am sorry if I am harsh, but please never test any code in that way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org