HA setting for per-job YARN session
Hi, I am deploying a Flink job on YARN using the "per-job YARN session" mode. The documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/jobmanager_high_availability.html#yarn-cluster-high-availability says that "we don’t run multiple JobManager (ApplicationMaster) instances", but it still gives an example of how to configure a highly available YARN session cluster. My question is: should the "per-job YARN session" mode also use the HA settings? In my test, without ZooKeeper HA, if the YarnApplicationMasterRunner/JobManager fails or is killed, YARN only restarts the YarnApplicationMasterRunner/JobManager process, but the job graph is never resubmitted to the JobManager. Thanks
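For reference, the setup that the linked page describes boils down to a flink-conf.yaml fragment along these lines (the ZooKeeper quorum address and storage directory below are placeholders for your own environment, not values from the documentation):

```yaml
# Enable ZooKeeper-based high availability so the submitted job graph
# and checkpoint metadata are persisted and can be recovered after an
# ApplicationMaster restart.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host:2181
high-availability.storageDir: hdfs:///flink/ha/

# Allow YARN to restart the ApplicationMaster more than once.
yarn.application-attempts: 10
```

Without the ZooKeeper entries there is no persisted job graph for a restarted ApplicationMaster to recover, which would match the behavior described above.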
Re: Event Time Session Window does not trigger..
Hi anna, shyla

When we call env.setStreamTimeCharacteristic(...), it sets the time characteristic for all streams created from that environment. So if your application contains multiple environments, then yes.

Best, Hequn

On Mon, Aug 6, 2018 at 9:37 AM, shyla deshpande wrote:
> Still, I would like to know if I can use both
> TimeCharacteristic.ProcessingTime and TimeCharacteristic.EventTime
> in an application.

On Sun, Aug 5, 2018 at 11:24 AM, shyla deshpande <deshpandesh...@gmail.com> wrote:
> Hi, I used PopularPlacesFromKafka from dataartisans.flinktraining.exercises
> as the basis. I made very minor changes and the session window is not
> triggered. If I use ProcessingTime instead of EventTime it works. Here is
> my code. Appreciate any help.
Thanks

object KafkaEventTimeWindow {

  private val LOCAL_ZOOKEEPER_HOST = "localhost:2181"
  private val LOCAL_KAFKA_BROKER = "localhost:9092"
  private val CON_GROUP = "KafkaEventTimeSessionWindow"
  private val MAX_EVENT_DELAY = 60 // events are out of order by max 60 seconds

  def main(args: Array[String]) {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val kafkaProps = new Properties
    kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST)
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER)
    kafkaProps.setProperty("group.id", CON_GROUP)
    kafkaProps.setProperty("auto.offset.reset", "earliest")

    val consumer = new FlinkKafkaConsumer011[PositionEventProto](
      "positionevent",
      new PositionEventProtoSchema,
      kafkaProps)
    consumer.assignTimestampsAndWatermarks(new PositionEventProtoTSAssigner)

    val posstream = env.addSource(consumer)

    def convtoepochmilli(cdt: String): Long = {
      val odt: OffsetDateTime = OffsetDateTime.parse(cdt)
      val i: Instant = odt.toInstant()
      val millis: Long = i.toEpochMilli()
      millis
    }

    val outputstream = posstream
      .mapWith { case (p) => (p.getConsumerUserId,
        convtoepochmilli(p.getCreateDateTime.getInIso8601Format)) }
      .keyBy(0)
      .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
      .reduce { (v1, v2) => (v1._1, Math.max(v1._2, v2._2)) }

    outputstream.print()

    // execute the transformation pipeline
    env.execute("Output Stream")
  }
}

class PositionEventProtoTSAssigner
  extends BoundedOutOfOrdernessTimestampExtractor[PositionEventProto](Time.seconds(60)) {

  override def extractTimestamp(pos: PositionEventProto): Long = {
    val odt: OffsetDateTime =
      OffsetDateTime.parse(pos.getCreateDateTime.getInIso8601Format)
    val i: Instant = odt.toInstant()
    val millis: Long = i.toEpochMilli()
    millis
  }
}
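The reason the event-time session window stays silent is the watermark rule behind BoundedOutOfOrdernessTimestampExtractor: the watermark trails the largest timestamp seen so far by the configured delay, and the window can only fire once the watermark passes the end of the session gap. When the topic stops producing, the watermark stalls and the window stays open forever. A minimal sketch of that rule, with no Flink dependency (the class and method names here are made up for illustration, they are not Flink API):

```java
// Sketch of the bounded-out-of-orderness watermark rule used by the
// extractor in the code above. Names are illustrative, not Flink API.
public class WatermarkSketch {
    private final long maxOutOfOrdernessMillis;
    private long currentMaxTimestamp;

    public WatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start low (but avoid Long underflow when subtracting the delay).
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Called for every element, like extractTimestamp().
    public void onEvent(long timestampMillis) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestampMillis);
    }

    // Watermark = max timestamp seen so far minus the allowed lateness.
    public long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrdernessMillis;
    }

    // An event-time session window ending at lastEventTs + gap fires
    // only once the watermark reaches that point.
    public boolean sessionFires(long lastEventTs, long gapMillis) {
        return currentWatermark() >= lastEventTs + gapMillis;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(60_000); // 60 s delay
        wm.onEvent(0);
        // Session end is at 60 s, but the watermark is still at -60 s.
        System.out.println(wm.sessionFires(0, 60_000)); // false
        // Only an event 120 s later pushes the watermark to 60 s.
        wm.onEvent(120_000);
        System.out.println(wm.sessionFires(0, 60_000)); // true
    }
}
```

With the 60-second gap plus the 60-second extractor delay in the quoted job, roughly two minutes of newer data must arrive before a session can close, which is consistent with the window never firing on a quiet test topic but working fine in ProcessingTime.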
Re: Event Time Session Window does not trigger..
Hi Hequn,

I now realize that in production, data will not be a problem, since this will be a high-volume Kafka topic. So I will go with EventTime.

Still, I would like to know if I can use both TimeCharacteristic.ProcessingTime and TimeCharacteristic.EventTime in an application.

*Thanks, the link you provided saved my time.*

*-shyla*

On Sun, Aug 5, 2018 at 9:28 AM, anna stax wrote:
> Hi Hequn,
>
> Thanks for the link. Looks like I had better use ProcessingTime instead of
> EventTime, especially because of the 4th reason you listed:
> "Data should cover a longer time span than the window size to advance the
> event time."
> I need the trigger when the data stops.
>
> Can I set the TimeCharacteristic at the stream level instead of the
> application level? Can I use both TimeCharacteristic.ProcessingTime and
> TimeCharacteristic.EventTime in an application?
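As a side note, the convtoepochmilli helper in the quoted code collapses to a one-liner with java.time. A standalone sketch of the same conversion (plain Java rather than the job's Scala, so it runs without any Flink dependencies; the class name is made up):

```java
import java.time.OffsetDateTime;

public class IsoToEpoch {
    // Same conversion as convtoepochmilli in the quoted code:
    // ISO-8601 offset date-time string -> epoch milliseconds.
    public static long toEpochMilli(String iso8601) {
        return OffsetDateTime.parse(iso8601).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // One second after the epoch, in UTC.
        System.out.println(toEpochMilli("1970-01-01T00:00:01+00:00")); // 1000
        // The offset is honored: midnight one hour east of UTC is one
        // hour *before* the epoch instant.
        System.out.println(toEpochMilli("1970-01-01T00:00:00+01:00")); // -3600000
    }
}
```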
Flink Forward 2018 videos
It appears the Flink Forward 2018 videos are FUBAR. The data entry form refuses to show them regardless of what you enter in it.
Re: Event Time Session Window does not trigger..
Hi Hequn,

Thanks for the link. Looks like I had better use ProcessingTime instead of EventTime, especially because of the 4th reason you listed:
"Data should cover a longer time span than the window size to advance the event time."
I need the trigger when the data stops.

I have one more question.

Can I set the TimeCharacteristic at the stream level instead of the application level? Can I use both TimeCharacteristic.ProcessingTime and TimeCharacteristic.EventTime in an application?

Thank you

On Sat, Aug 4, 2018 at 10:05 PM, Hequn Cheng wrote:
> Hi shyla,
>
> I answered a similar question on stackoverflow[1], you can take a look
> first.
>
> Best, Hequn
>
> [1] https://stackoverflow.com/questions/51691269/event-time-window-in-flink-does-not-trigger