Hi.
I checked order of data. but it is alright.
Is there any other possibilities?
Thank you.

> On Aug 12, 2016, at 7:09 PM, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi!
> 
> Its not that easy to say at a first glance.
> 
> One thing that is important to bear in mind is what ordering guarantees Flink 
> gives, and where the ordering guarantees are not given.
> When you use keyBy() or redistribute(), order is preserved per parallel 
> source/target pair only.
> 
> Have a look here:
> https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows
>  
> <https://ci.apache.org/projects/flink/flink-docs-master/concepts/concepts.html#parallel-dataflows>
> 
> 
> Could it be that the events simply arrive in a different order in the 
> functions, so that a later event that looks for state comes before an earlier 
> event that creates the state?
> 
> Greetings,
> Stephan
> 
> On Fri, Aug 12, 2016 at 12:04 PM, Dong-iL, Kim <kim.s...@gmail.com 
> <mailto:kim.s...@gmail.com>> wrote:
> Nope.
> I added log in End.
> but there is same log.
> is there any fault in my code?
> 
> thank you.
> 
> 
> > On Aug 12, 2016, at 6:42 PM, Maximilian Michels <m...@apache.org 
> > <mailto:m...@apache.org>> wrote:
> >
> > You're clearing the "handState" on "GameEndHistory". I'm assuming this
> > event comes in before "CommCardHistory" where you check the state.
> >
> > On Fri, Aug 12, 2016 at 6:59 AM, Dong-iL, Kim <kim.s...@gmail.com 
> > <mailto:kim.s...@gmail.com>> wrote:
> >> in my code, is the config of ExecutionEnv alright?
> >>
> >>
> >>> On Aug 11, 2016, at 8:47 PM, Dong-iL, Kim <kim.s...@gmail.com 
> >>> <mailto:kim.s...@gmail.com>> wrote:
> >>>
> >>>
> >>> my code and log is as below.
> >>>
> >>>
> >>>   val getExecuteEnv: StreamExecutionEnvironment = {
> >>>       val env = 
> >>> StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(10000)
> >>>       env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
> >>>       
> >>> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE)
> >>>       env.getCheckpointConfig.setCheckpointTimeout(60000)
> >>>       env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
> >>>       env.setRestartStrategy(RestartStrategies.fixedDelayRestart(30, 
> >>> 30000))
> >>>       env
> >>>   }
> >>>
> >>> def transform(target: DataStream[(String, String, String, String, 
> >>> Long)]): DataStream[WinLossBase] =
> >>>       target.keyBy(_._3).flatMap(new StateOperator)
> >>>
> >>> def main(args: Array[String]) {
> >>>       val env = getExecuteEnv
> >>>       val source: DataStream[String] = 
> >>> extractFromKafka(env).name("KafkaSource")
> >>>       val json = deserializeToJsonObj(source).name("ConvertToJson")
> >>>       val target: DataStream[(String, String, String, String, Long)] = 
> >>> preTransform(json)
> >>>       val result: DataStream[WinLossBase] = 
> >>> transform(target).name("ToKeyedStream”)
> >>> …
> >>> }
> >>>
> >>> class StateOperator extends RichFlatMapFunction[(String, String, String, 
> >>> String, Long), WinLossBase] {
> >>>       var playerState: ValueState[util.Map[String, PotPlayer]] = _
> >>>       var handState: ValueState[HandHistoryInfo] = _
> >>>
> >>>       override def open(param: Configuration): Unit = {
> >>>           val playerValueStateDescriptor = new 
> >>> ValueStateDescriptor[util.Map[String, PotPlayer]]("winloss",
> >>>               classOf[util.Map[String, PotPlayer]], 
> >>> Maps.newHashMap[String, PotPlayer]())
> >>>           playerState = 
> >>> getRuntimeContext.getState(playerValueStateDescriptor)
> >>>           handState = getRuntimeContext.getState(new 
> >>> ValueStateDescriptor("handinfo", classOf[HandHistoryInfo], null))
> >>>       }
> >>>
> >>>       override def flatMap(in: (String, String, String, String, Long), 
> >>> out: Collector[WinLossBase]): Unit = {
> >>>           in._2 match {
> >>>               case "GameStartHistory" =>
> >>>                   val players = playerState.value()
> >>>                   val obj = _convertJsonToRecord(in._4, 
> >>> classOf[GameStartHistoryRecord])
> >>>                   val record = obj.asInstanceOf[GameStartHistoryRecord]
> >>>                   val handHistoryInfo: HandHistoryInfo = 
> >>> _setUpHandHistoryInfo(record)
> >>>                   if (LOG.isInfoEnabled())
> >>>                       LOG.info("hand start {}", if (handHistoryInfo != 
> >>> null) handHistoryInfo.handHistoryId else "NULL”)
> >>>                     ….
> >>>                   playerState.update(players)
> >>>                   handState.update(handHistoryInfo)
> >>>               case "HoleCardHistory" =>
> >>>                   val players = playerState.value()
> >>>                   if (players != null) {
> >>>                      ...
> >>>                        playerState.update(players)
> >>>                   } else LOG.warn("there is no player[hole card]. {}", 
> >>> in._4)
> >>>               case "PlayerStateHistory" =>
> >>>                   val players = playerState.value()
> >>>                   if (players != null) {
> >>>                      ….
> >>>                       playerState.update(players)
> >>>                   } else LOG.warn("there is no player[player state]. {}", 
> >>> in._4)
> >>>               case "CommCardHistory" =>
> >>>                   val handHistoryInfo = handState.value()
> >>>                   val commCardHistory: CommCardHistory = 
> >>> commCardState.value()
> >>>                   if (handHistoryInfo != null) {
> >>>                      ...
> >>>                       handState.update(handHistoryInfo)
> >>>                       commCardState.update(commCardHistory)
> >>>                   } else LOG.warn("there is no handhistory info[comm 
> >>> card]. {}", in._4)
> >>>               case "PlayerActionHistory" =>
> >>>                   val handHistoryInfo = handState.value()
> >>>                   val players = playerState.value()
> >>>
> >>>                   if (handHistoryInfo != null) {
> >>>                      ...
> >>>                   } else LOG.warn("there is no handhistory info[player 
> >>> action]. {}", in._4)
> >>>               case "PotHistory" =>
> >>>                   val players = playerState.value()
> >>>                   val handHistoryInfo = handState.value()
> >>>                   val commCardHistory: CommCardHistory = 
> >>> commCardState.value()
> >>>                   if (handHistoryInfo != null && handHistoryInfo.playType 
> >>> == PlayType.Cash && players != null && players.size > 1) {
> >>>                       ...
> >>>                   } else LOG.warn("there is no handhistory info[pot]. 
> >>> {}", in._4)
> >>>               case "GameEndHistory" =>
> >>>                   val players = playerState.value()
> >>>                   val handHistoryInfo = handState.value()
> >>>                      ...
> >>>                   if (LOG.isTraceEnabled()) LOG.trace("end {}", 
> >>> record.getHandHistoryId)
> >>>                   playerState.clear()
> >>>                   handState.clear()
> >>>               case _ =>
> >>>           }
> >>>       }
> >>>
> >>> —— log ——
> >>> 2016-08-11 11:44:53.258 [ToKeyedStream -> (Map -> Sink: Pot to HBase, 
> >>> Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to 
> >>> HBase) (3/4)] INFO  com.nsuslab.denma.stream.winloss.flow.Main$  - hand 
> >>> start 5769392597641628595
> >>>
> >>> 2016-08-11 11:45:07.555 [ToKeyedStream -> (Map -> Sink: Pot to HBase, 
> >>> Map, Map -> Sink: winloss to HBase, Map -> Sink: winloss for opponent to 
> >>> HBase) (3/4)] WARN  com.nsuslab.denma.stream.winloss.flow.Main$  - there 
> >>> is no handhistory info[pot].
> >>>
> >>>> On Aug 11, 2016, at 7:01 PM, Ufuk Celebi <u...@apache.org 
> >>>> <mailto:u...@apache.org>> wrote:
> >>>>
> >>>> What do you mean with lost exactly?
> >>>>
> >>>> You call value() and it returns a value (!= null/defaultValue) and you
> >>>> call it again and it returns null/defaultValue for the same key with
> >>>> no update in between?
> >>>>
> >>>> On Thu, Aug 11, 2016 at 11:59 AM, Kostas Kloudas
> >>>> <k.klou...@data-artisans.com <mailto:k.klou...@data-artisans.com>> wrote:
> >>>>> Hello,
> >>>>>
> >>>>> Could you share the code of the job you are running?
> >>>>> With only this information I am afraid we cannot help much.
> >>>>>
> >>>>> Thanks,
> >>>>> Kostas
> >>>>>
> >>>>>> On Aug 11, 2016, at 11:55 AM, Dong-iL, Kim <kim.s...@gmail.com 
> >>>>>> <mailto:kim.s...@gmail.com>> wrote:
> >>>>>>
> >>>>>> Hi.
> >>>>>> I’m using flink 1.0.3 on aws EMR.
> >>>>>> sporadically value of ValueState is lost.
> >>>>>> what is starting point for solving this problem.
> >>>>>> Thank you.
> >>>>>
> >>>
> >>
> 
> 

Reply via email to