Hi anna, shyla,

Calling env.setStreamTimeCharacteristic(...) sets the time characteristic for all streams created from that environment. So if your application contains multiple environments, then yes, you can use both in one application.
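
As a side note, here is a minimal sketch of what the environment-level setting looks like in code. It assumes the Flink 1.x Scala streaming API (flink-streaming-scala) is on the classpath; the object name, job name, and sample tuples are made up for illustration. It also relies on the assumption that explicit window assigners select their own time domain, so a processing-time window can sit next to an event-time window inside an environment set to EventTime. Please verify that against the Flink version you run.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, TumblingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical example object, not part of the original job.
object MixedTimeWindowsSketch {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // One setting per environment: it applies to every stream created from env.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Made-up sample data: (userId, eventTimestampMillis), in ascending order.
    val events: DataStream[(String, Long)] = env
      .fromElements(("u1", 1000L), ("u2", 1500L), ("u1", 2000L))
      .assignAscendingTimestamps(_._2)

    // Event-time session window: fires only when the watermark passes the
    // end of the session, so it needs later data (or end of input).
    events
      .keyBy(_._1)
      .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
      .reduce((a, b) => (a._1, math.max(a._2, b._2)))
      .print()

    // Processing-time window on the same stream: driven by the wall clock
    // because the assigner is named explicitly.
    events
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .reduce((a, b) => (a._1, math.max(a._2, b._2)))
      .print()

    env.execute("mixed-time-windows-sketch")
  }
}
```

With such a short bounded input the job may finish before the 5-second processing-time window fires, so treat this as an illustration of the wiring rather than of the output.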

Best, Hequn

On Mon, Aug 6, 2018 at 9:37 AM, shyla deshpande <deshpandesh...@gmail.com> wrote:

> Hi Hequn,
>
> I now realize that in production, data will not be a problem since this
> will be a high-volume Kafka topic.
> So, I will go with EventTime.
>
> Still, I would like to know if I can use both
> TimeCharacteristic.ProcessingTime and TimeCharacteristic.EventTime in an
> application.
>
> *Thanks, the link you provided saved my time.*
>
> *-shyla*
>
> On Sun, Aug 5, 2018 at 9:28 AM, anna stax <annasta...@gmail.com> wrote:
>
>> Hi Hequn,
>>
>> Thanks for the link. Looks like I better use ProcessingTime instead of
>> EventTime, especially because of the 4th reason you listed:
>> "Data should cover a longer time span than the window size to advance the
>> event time."
>> I need the trigger when the data stops.
>>
>> I have one more question.
>>
>> Can I set the TimeCharacteristic at the stream level instead of at the
>> application level?
>> Can I use both TimeCharacteristic.ProcessingTime and
>> TimeCharacteristic.EventTime in an application?
>>
>> Thank you
>>
>> On Sat, Aug 4, 2018 at 10:05 PM, Hequn Cheng <chenghe...@gmail.com>
>> wrote:
>>
>>> Hi shyla,
>>>
>>> I answered a similar question on Stack Overflow [1]; you can take a look
>>> first.
>>>
>>> Best, Hequn
>>>
>>> [1] https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger
>>>
>>> On Sun, Aug 5, 2018 at 11:24 AM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I used PopularPlacesFromKafka from dataartisans.flinktraining.exercises as
>>>> the basis. I made very minor changes
>>>> and the session window is not triggered. If I use ProcessingTime instead
>>>> of EventTime it works. Here is my code.
>>>>
>>>> Appreciate any help. Thanks
>>>>
>>>> object KafkaEventTimeWindow {
>>>>
>>>>   private val LOCAL_ZOOKEEPER_HOST = "localhost:2181"
>>>>   private val LOCAL_KAFKA_BROKER = "localhost:9092"
>>>>   private val CON_GROUP = "KafkaEventTimeSessionWindow"
>>>>   private val MAX_EVENT_DELAY = 60 // events are out of order by max 60 seconds
>>>>
>>>>   def main(args: Array[String]) {
>>>>
>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>
>>>>     val kafkaProps = new Properties
>>>>     kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
>>>>     kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
>>>>     kafkaProps.setProperty("group.id", CON_GROUP)
>>>>     kafkaProps.setProperty("auto.offset.reset", "earliest")
>>>>
>>>>     val consumer = new FlinkKafkaConsumer011[PositionEventProto](
>>>>       "positionevent",
>>>>       new PositionEventProtoSchema,
>>>>       kafkaProps)
>>>>     consumer.assignTimestampsAndWatermarks(new PositionEventProtoTSAssigner)
>>>>
>>>>     val posstream = env.addSource(consumer)
>>>>
>>>>     def convtoepochmilli(cdt: String): Long = {
>>>>       val odt:OffsetDateTime = OffsetDateTime.parse(cdt);
>>>>       val i:Instant = odt.toInstant();
>>>>       val millis:Long = i.toEpochMilli();
>>>>       millis
>>>>     }
>>>>
>>>>     val outputstream = posstream
>>>>       .mapWith{case(p) => (p.getConsumerUserId, convtoepochmilli(p.getCreateDateTime.getInIso8601Format))}
>>>>       .keyBy(0)
>>>>       .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>>>>       .reduce { (v1, v2) => (v1._1, Math.max(v1._2 , v2._2)) }
>>>>
>>>>     outputstream.print()
>>>>
>>>>     // execute the transformation pipeline
>>>>     env.execute("Output Stream")
>>>>   }
>>>>
>>>> }
>>>>
>>>> class PositionEventProtoTSAssigner
>>>>   extends BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](Time.seconds(60)) {
>>>>
>>>>   override def extractTimestamp(pos: PositionEventProto): Long = {
>>>>     val odt:OffsetDateTime = OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format);
>>>>     val i:Instant = odt.toInstant();
>>>>     val millis:Long = i.toEpochMilli();
>>>>     millis
>>>>   }
>>>> }