Hi anna, shyla

When we call setStreamTimeCharacteristic(env.setStreamTimeCharacteristic),
it means sets the time characteristic for all streams create from this
environment. So if your application contains multi environments, then yes.

Best, Hequn

On Mon, Aug 6, 2018 at 9:37 AM, shyla deshpande <deshpandesh...@gmail.com>
wrote:

> Hi Hequn,
>
> I now realize that in Production, data will not be a problem since this
> will be a high volume kafka topic.
> So, I will go with EventTime.
>
> Still, I would like to know if
>
> I can use both TimeCharacteristic.ProcessingTime  and 
> TimeCharacteristic.EventTime in an application.
>
> *Thanks, the link you provided saved my time.*
>
> *-shyla*
>
>
>
>
>
> On Sun, Aug 5, 2018 at 9:28 AM, anna stax <annasta...@gmail.com> wrote:
>
>> Hi Hequn,
>>
>> Thanks for link. Looks like I better use ProcessingTime instead of
>> EventTime especially because of the 4th reason you listed..
>> "Data should cover a longer time span than the window size to advance the
>> event time."
>> I need the trigger when the data stops.
>>
>> I have 1 more question.
>>
>> Can I set the TimeCharacteristic to the stream level instead on the 
>> application level?
>> Can I use both TimeCharacteristic.ProcessingTime  and 
>> TimeCharacteristic.EventTime in an application.
>>
>> Thank you
>>
>> On Sat, Aug 4, 2018 at 10:05 PM, Hequn Cheng <chenghe...@gmail.com>
>> wrote:
>>
>>> Hi shyla,
>>>
>>> I answered a similar question on stackoverflow[1], you can take a look
>>> first.
>>>
>>> Best, Hequn
>>>
>>> [1] https://stackoverflow.com/questions/51691269/event-time-
>>> window-in-flink-does-not-trigger
>>>
>>> On Sun, Aug 5, 2018 at 11:24 AM, shyla deshpande <
>>> deshpandesh...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I used PopularPlacesFromKafka from dataartisans.flinktraining.exercises as 
>>>> the basis. I made very minor changes
>>>>
>>>> and the session window is not triggered. If I use ProcessingTime instead 
>>>> of EventTime it works. Here is my code.
>>>>
>>>> Appreciate any help. Thanks
>>>>
>>>> object KafkaEventTimeWindow {
>>>>
>>>>   private val LOCAL_ZOOKEEPER_HOST = "localhost:2181"
>>>>   private val LOCAL_KAFKA_BROKER = "localhost:9092"
>>>>   private val CON_GROUP = "KafkaEventTimeSessionWindow"
>>>>   private val MAX_EVENT_DELAY = 60 // events are out of order by max 60 
>>>> seconds
>>>>
>>>>   def main(args: Array[String]) {
>>>>
>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>
>>>>     val kafkaProps = new Properties
>>>>     kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
>>>>     kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
>>>>     kafkaProps.setProperty("group.id", CON_GROUP)
>>>>     kafkaProps.setProperty("auto.offset.reset", "earliest")
>>>>
>>>>     val consumer = new FlinkKafkaConsumer011[PositionEventProto](
>>>>       "positionevent",
>>>>       new PositionEventProtoSchema,
>>>>       kafkaProps)
>>>>     consumer.assignTimestampsAndWatermarks(new 
>>>> PositionEventProtoTSAssigner)
>>>>
>>>>     val posstream = env.addSource(consumer)
>>>>
>>>>     def convtoepochmilli(cdt: String): Long = {
>>>>       val  odt:OffsetDateTime = OffsetDateTime.parse(cdt);
>>>>       val i:Instant  = odt.toInstant();
>>>>       val millis:Long = i.toEpochMilli();
>>>>       millis
>>>>     }
>>>>
>>>>     val outputstream = posstream
>>>>       .mapWith{case(p) => (p.getConsumerUserId, 
>>>> convtoepochmilli(p.getCreateDateTime.getInIso8601Format))}
>>>>       .keyBy(0)
>>>>       .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
>>>>       .reduce { (v1, v2) => (v1._1, Math.max(v1._2 , v2._2)) }
>>>>
>>>>     outputstream.print()
>>>>
>>>>     // execute the transformation pipeline
>>>>     env.execute("Output Stream")
>>>>   }
>>>>
>>>> }
>>>>
>>>> class PositionEventProtoTSAssigner
>>>>   extends 
>>>> BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](Time.seconds(60))
>>>>  {
>>>>
>>>>   override def extractTimestamp(pos: PositionEventProto): Long = {
>>>>     val  odt:OffsetDateTime = 
>>>> OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format);
>>>>     val i:Instant  = odt.toInstant();
>>>>     val millis:Long = i.toEpochMilli();
>>>>     millis
>>>>   }
>>>> }
>>>>
>>>>
>>>>
>>>
>>
>

Reply via email to