HA HDFS

2019-02-06 Thread Steven Nelson
I am working on a POC High Availability installation of Flink on top of
Kubernetes with HDFS as a data storage location. I am not finding much
documentation on doing this; what I do find is in pieces, and I'm not sure I'm
putting it together correctly. I think it falls between being an HDFS thing
and a Flink thing.

I am deploying to Kubernetes using the flink:1.7.0-hadoop27-scala_2.11
container off of docker hub.

I think these are the things I need to do
1) Setup an hdfs-site.xml file per
https://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-hdfs/HDFSHighAvailabilityWithQJM.html#Deployment
2) Set the HADOOP_CONF_DIR environment variable to the directory containing
that file, per
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#hdfs
3) Create a flink-conf.yaml file that looks something like
fs.default-scheme: hdfs://
state.backend: rocksdb
state.savepoints.dir: hdfs://flink/savepoints
state.checkpoints.dir: hdfs://flink/checkpoints
4) Dance a little jig when it works.
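
For reference, the checkpoint part of step 3 can also be expressed in code; a
minimal sketch (paths and interval are illustrative, and it assumes the
flink-statebackend-rocksdb dependency is on the classpath):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class HdfsCheckpointJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // RocksDB state backend writing checkpoints to HDFS; the NameNode is resolved
        // through the hdfs-site.xml/core-site.xml found under HADOOP_CONF_DIR.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints", true));
        env.enableCheckpointing(60_000); // checkpoint every 60s (illustrative)
        // state.savepoints.dir still comes from flink-conf.yaml.
        // ... add sources/operators/sinks here, then:
        // env.execute("ha-hdfs-poc");
    }
}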

Has anyone set this up? If so, am I missing anything?

-Steve


Re: Using custom evictor and trigger on Table API

2019-02-06 Thread Rong Rong
Hi Dongwon,

There was a previous thread regarding this [1]; unfortunately, this is not
supported yet.

However, there are some recent development proposals [2,3] to enhance the
Table API which might be able to support your use case.
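
Until then, the usual workaround is to convert the Table into a DataStream and
apply the custom trigger/evictor there. A minimal sketch (the class/method
names are illustrative, built-in CountTrigger/CountEvictor stand in for your
custom ones, the first column is assumed to be a String key, and event time /
watermarks are assumed to be configured):

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class CustomWindowWorkaround {
    // Table API handles projections/UDFs; the custom windowing happens on the DataStream.
    public static DataStream<Row> applyCustomWindow(StreamTableEnvironment tEnv, Table prepared) {
        DataStream<Row> rows = tEnv.toAppendStream(prepared, Row.class);
        return rows
            .keyBy(new KeySelector<Row, String>() {
                @Override
                public String getKey(Row r) {
                    return (String) r.getField(0); // assumes the first column is a String key
                }
            })
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .trigger(CountTrigger.of(100)) // stand-in for the custom trigger
            .evictor(CountEvictor.of(10))  // stand-in for the custom evictor
            .reduce((a, b) -> b);          // placeholder window function
    }
}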

--
Rong

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-API-custom-window-td22199.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Enhancing-the-functionality-and-productivity-of-Table-API-td24963i20.html

On Wed, Feb 6, 2019 at 4:14 AM eastcirclek  wrote:

> Hi all,
>
> I’m looking into Table API for my new project.
> It looks like a sweet spot between DataStream API/SQL.
>
> However, it doesn’t seem like the expressivity of the Table API equals that
> of the DataStream API.
>
> My previous Flink projects were building simple pipelines using the DataStream
> API with a custom evictor (FF Berlin 17) and a custom trigger (FF Berlin 18).
> I believe these pipelines can be expressed with Table API/UDF/UDAF except for
> the custom windowing components.
>
> Do I have no choice but to convert the table into a DataStream to use the
> custom components at the moment?
>
> Best,
> - Dongwon
>
>


Re: Get nested Rows from Json string

2019-02-06 Thread Rong Rong
Hi François,

I wasn't exactly sure whether it is a JSON object or a JSON string you are
trying to process.
For a JSON string, this [1] article might help.
For a JSON object, I am assuming you are trying to convert it into a
TableSource and process it using the Table/SQL API; you could probably use the
example here [2].

BTW, a very remote hunch: this might just be a stringification issue in how
you print the row out.
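
On the arity point: a nested Row has to be set as a single field of its parent
at one index, not appended element by element. A minimal sketch matching the
schema from your mail (class/method names are illustrative):

import org.apache.flink.types.Row;

public class NestedRowExample {
    public static Row build() {
        Row inner = new Row(1);        // Row for the nested object {"f":"g"}
        inner.setField(0, "g");

        Row outer = new Row(3);        // top-level arity 3, matching the Avro schema
        outer.setField(0, "b");
        outer.setField(1, "d");
        outer.setField(2, inner);      // the child Row occupies exactly one parent index
        return outer;                  // -> ("b", "d", Row("g"))
    }
}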

--
Rong

[1]:
https://stackoverflow.com/questions/49380778/how-to-stream-a-json-using-flink
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sourceSinks.html#table-sources-sinks

On Wed, Feb 6, 2019 at 3:06 AM françois lacombe <
francois.laco...@dcbrain.com> wrote:

> Hi all,
>
> I currently get a json string from my pgsql source with nested objects to
> be converted into Flink's Row.
> Nested json objects should go in nested Rows.
> An Avro schema defines the structure my source should conform to.
>
> According to this json :
> {
>   "a":"b",
>   "c":"d",
>   "e":{
>"f":"g"
>}
> }
>
> ("b", "d", Row("g")) is expected as a result according to my avro schema.
>
> I wrote a recursive method which iterates over the JSON objects and puts
> nested Rows at the right indices in their parent, but here is what it
> outputs: ("b", "d", "g")
> The child Row is appended to the parent. I don't understand why.
> Obviously, the process crashes, complaining that the top-level Row arity
> doesn't match the serializers.
>
> Are there native methods in Flink to achieve that?
> I don't feel comfortable having written my own JSON processor for this
> job.
>
> Do you have any hint which can help please ?
>
> All the best
>
> François
>


Re: AssertionError: mismatched type $5 TIMESTAMP(3)

2019-02-06 Thread Chris Miller

Hi Timo,

Thanks for the pointers, bug reports and slides, much appreciated. I'll 
read up to get a better understanding of the issue and hopefully figure 
out a more appropriate solution for what I'm trying to achieve. I'll report 
back if I come up with anything that others might find useful.


Regards,
Chris


-- Original Message --
From: "Timo Walther" 
To: user@flink.apache.org
Sent: 06/02/2019 16:45:26
Subject: Re: AssertionError: mismatched type $5 TIMESTAMP(3)


Hi Chris,

the error that you've observed is a bug that might be related to another bug 
that is not easily solvable.

I created an issue for it nevertheless: 
https://issues.apache.org/jira/browse/FLINK-11543

In general, I think you need to adapt your program in any case. Because you are 
aggregating on a rowtime attribute, it will lose its time attribute property 
and become a regular timestamp. Thus, you can't use it for a temporal table 
join.

Maybe the following training from the last Flink Forward conference might help you. I 
would recommend the slide set there to understand the difference between streaming 
operations and what we call "materializing" operations:

https://github.com/dataArtisans/sql-training/wiki/SQL-on-streams

I hope this helps. Feel free to ask further questions.

Regards,
Timo

Am 05.02.19 um 11:30 schrieb Chris Miller:

Exception in thread "main" java.lang.AssertionError: mismatched type $5 
TIMESTAMP(3)
at 
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
at 
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)
at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
at 
org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
at 
org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
at 
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
at 
org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
at 
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
at test.Test.main(Test.java:78)







Re: Exactly Once Guarantees with StreamingFileSink to S3

2019-02-06 Thread Kaustubh Rudrawar
Hi Kostas,

Thanks for the response! Yes - I see the commitAfterRecovery being called
when a Bucket is restored. I confused myself in thinking that
'onSuccessfulCompletionOfCheckpoint' is called on restore as well, which
led me to believe that we were only calling commit and not
commitAfterRecovery.

Thanks for the clarification!
-Kaustubh

On Wed, Feb 6, 2019 at 2:16 AM Kostas Kloudas  wrote:

> Hi Kaustubh,
>
> Your general understanding is correct.
>
> In this case though, the sink will call the
> S3Committer#commitAfterRecovery() method.
> This method, after failing to commit the MPU, will check whether the file
> is there and whether the length is correct; if everything is OK (which is
> the case in your example), it will continue with normal execution.
>
> I hope this helps.
>
> Kostas
>
> On Wed, Feb 6, 2019 at 7:47 AM Kaustubh Rudrawar  wrote:
>
>> Hi,
>>
>> I'm trying to understand the exactly once semantics of the
>> StreamingFileSink with S3 in Flink 1.7.1 and am a bit confused on how it
>> guarantees exactly once under a very specific failure scenario.
>>
>> For simplicity, let's say we will roll the current part file on checkpoint
>> (and only on checkpoint), the process is as follows:
>> 1. Framework tells the sink to prepare for a checkpoint. This ultimately
>> results in 'onReceptionOfCheckpoint' being called on Bucket.java.
>> 2. This takes the current file, and based on our roll policy of rolling
>> on checkpoint, it closes and uploads it to S3 as part of a MPU and the
>> reference to this upload is stored as part of 'pendingPartsPerCheckpoint'.
>> 3. Once the checkpoint successfully completes, the bucket is notified via
>> 'onSuccessfulCompletionOfCheckpoint'. At this point, the bucket goes
>> through all pendingPartsPerCheckpoint and for each of them: recovers the in
>> progress part (which doesn't exist in this scenario) and then commits the
>> upload.
>> 4. The AmazonS3Client is ultimately called to perform the upload and it
>> retries the attempt up to N times. If it exhausts retries, it will throw an
>> Exception.
>> 5. Upon successful commit of the MPU, Bucket clears out its references to
>> these uploads from its state.
>>
>> Given this flow, I'm having trouble understanding how the following
>> scenario works:
>>
>>- Step 4: The commit on the MPU succeeds,
>>- Step 5: Before this step completes, the task crashes. So at this
>>point, S3 has successfully completed the MPU but to the client (the
>>Flink job), it has not completed.
>>- Flink will then recover from the checkpoint we just took and steps
>>3 and 4 will be repeated. My understanding is that, since the MPU 
>> succeeded
>>previously, any attempts at re-committing that upload will result in a 404
>>('NoSuchUpload'). So Step 4 should throw an exception. Which would then 
>> get
>>retried by the framework and this process repeats itself.
>>
>> So how is this case handled?
>>
>> Really appreciate the help!
>> -Kaustubh
>>
>>
>>


Re: H-A Deployment : Job / task manager configuration

2019-02-06 Thread Stefan Richter
Hi,

You only need to do the configuration in conf/flink-conf.yaml on the job 
manager. The configuration will be shipped to the TMs.
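
For reference, the HA-related entries in that flink-conf.yaml typically look
like the following (quorum, paths and cluster id are illustrative):

high-availability: zookeeper
high-availability.zookeeper.quorum: zk-1:2181,zk-2:2181,zk-3:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /poc-cluster
high-availability.storageDir: hdfs:///flink/ha/

As far as I understand, conf/masters is only used by the standalone start
scripts; the TaskManagers discover the leading JobManager via ZooKeeper.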

Best,
Stefan 

> On 5. Feb 2019, at 16:59, bastien dine  wrote:
> 
> Hello everyone,
> 
> I would like to know what exactly I need to configure on my job / task 
> managers for an H-A deployment.
> The document 
> (https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/jobmanager_high_availability.html)
> is not really clear about this.
> Does conf/masters need to be on both job and task managers, or only on 
> taskmanagers so they can find the job manager(s)?
> Likewise, does the ZooKeeper HA setting in flink-conf.yaml need to be set 
> only on the job manager, or on the taskmanagers too?
> Knowing exactly where things need to be configured would also help us 
> understand a little more about the interaction between job manager / task 
> manager / ZooKeeper.
> 
> Best Regards,
> Bastien
> 
> --
> 
> Bastien DINE
> Data Architect / Software Engineer / Sysadmin
> bastiendine.io 



Re: AssertionError: mismatched type $5 TIMESTAMP(3)

2019-02-06 Thread Timo Walther

Hi Chris,

the error that you've observed is a bug that might be related to another 
bug that is not easily solvable.


I created an issue for it nevertheless: 
https://issues.apache.org/jira/browse/FLINK-11543


In general, I think you need to adapt your program in any case. Because 
you are aggregating on a rowtime attribute, it will lose its time 
attribute property and become a regular timestamp. Thus, you can't use 
it for a temporal table join.
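
For completeness, one common way to keep a time attribute through a windowed
aggregation is to group by a window and select its rowtime via TUMBLE_ROWTIME.
A minimal sketch (table and column names are illustrative, and whether it fits
the original query depends on the intended semantics):

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class RowtimePreservingAggregation {
    public static Table aggregate(StreamTableEnvironment tEnv) {
        // "Orders" is assumed to be registered with a rowtime attribute called "rowtime".
        return tEnv.sqlQuery(
            "SELECT o_currency, " +
            "       SUM(o_amount) AS total, " +
            "       TUMBLE_ROWTIME(rowtime, INTERVAL '1' MINUTE) AS w_rowtime " +
            "FROM Orders " +
            "GROUP BY o_currency, TUMBLE(rowtime, INTERVAL '1' MINUTE)");
    }
}

The w_rowtime column should remain a rowtime attribute, so downstream
time-based operations can still use it.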


Maybe the following training from the last Flink Forward conference might 
help you. I would recommend the slide set there to understand the 
difference between streaming operations and what we call "materializing" 
operations:


https://github.com/dataArtisans/sql-training/wiki/SQL-on-streams

I hope this helps. Feel free to ask further questions.

Regards,
Timo

Am 05.02.19 um 11:30 schrieb Chris Miller:
Exception in thread "main" java.lang.AssertionError: mismatched type 
$5 TIMESTAMP(3)
    at 
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2481)
    at 
org.apache.calcite.rex.RexUtil$FixNullabilityShuttle.visitInputRef(RexUtil.java:2459)

    at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
    at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:151)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:100)
    at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:34)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:279)
    at org.apache.calcite.rex.RexShuttle.mutate(RexShuttle.java:241)
    at org.apache.calcite.rex.RexShuttle.apply(RexShuttle.java:259)
    at org.apache.calcite.rex.RexUtil.fixUp(RexUtil.java:1605)
    at 
org.apache.calcite.rel.rules.FilterJoinRule.perform(FilterJoinRule.java:230)
    at 
org.apache.calcite.rel.rules.FilterJoinRule$FilterIntoJoinRule.onMatch(FilterJoinRule.java:344)
    at 
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:212)
    at 
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:646)
    at 
org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:339)
    at 
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:374)
    at 
org.apache.flink.table.api.TableEnvironment.optimizeLogicalPlan(TableEnvironment.scala:292)
    at 
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:812)
    at 
org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:860)
    at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:340)
    at 
org.apache.flink.table.api.java.StreamTableEnvironment.toRetractStream(StreamTableEnvironment.scala:272)
    at test.Test.main(Test.java:78) 





Re: JDBCAppendTableSink on Data stream

2019-02-06 Thread Stefan Richter
Hi,

That should be no problem; for example, the `JDBCAppendTableSinkTest` also 
uses it with a data stream.
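
A minimal sketch of that usage (driver, URL, query and types are illustrative;
note that the append sink gives at-least-once guarantees, so end-to-end
exactly-once effectively relies on the writes being idempotent/upserts, as you
suggest):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.io.jdbc.JDBCAppendTableSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;

public class PostgresSinkExample {
    public static void attachSink(DataStream<Row> rows) {
        JDBCAppendTableSink sink = JDBCAppendTableSink.builder()
            .setDrivername("org.postgresql.Driver")
            .setDBUrl("jdbc:postgresql://db-host:5432/mydb")
            .setQuery("INSERT INTO events (id, payload) VALUES (?, ?) " +
                      "ON CONFLICT (id) DO UPDATE SET payload = EXCLUDED.payload")
            .setParameterTypes(Types.INT, Types.STRING)
            .setBatchSize(100)
            .build();
        // JDBCAppendTableSink also works directly on a DataStream<Row>:
        sink.emitDataStream(rows);
    }
}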

Best,
Stefan

> On 6. Feb 2019, at 07:29, Chirag Dewan  wrote:
> 
> Hi,
> 
> In the documentation, the JDBC sink is mentioned as a source on Table 
> API/stream. 
> 
> Can I use the same sink with a Data stream as well?
> 
> My use case is to read the data from Kafka and send the data to Postgres.
> 
> I was also hoping to achieve Exactly-Once since these will mainly be 
> Idempotent writes on a table.
> 
> Any help much appreciated. 
> 
> Thanks,
> 
> Chirag



Using custom evictor and trigger on Table API

2019-02-06 Thread eastcirclek
Hi all,

I’m looking into Table API for my new project.
It looks like a sweet spot between DataStream API/SQL.

However, it doesn’t seem like the expressivity of the Table API equals that of 
the DataStream API.

My previous Flink projects were building simple pipelines using the DataStream API 
with a custom evictor (FF Berlin 17) and a custom trigger (FF Berlin 18).
I believe these pipelines can be expressed with Table API/UDF/UDAF except for the 
custom windowing components.

Do I have no choice but to convert the table into a DataStream to use the custom 
components at the moment?

Best,
- Dongwon



Get nested Rows from Json string

2019-02-06 Thread françois lacombe
Hi all,

I currently get a json string from my pgsql source with nested objects to
be converted into Flink's Row.
Nested json objects should go in nested Rows.
An Avro schema defines the structure my source should conform to.

According to this json :
{
  "a":"b",
  "c":"d",
  "e":{
   "f":"g"
   }
}

("b", "d", Row("g")) is expected as a result according to my avro schema.

I wrote a recursive method which iterates over the JSON objects and puts
nested Rows at the right indices in their parent, but here is what it
outputs: ("b", "d", "g")
The child Row is appended to the parent. I don't understand why.
Obviously, the process crashes, complaining that the top-level Row arity
doesn't match the serializers.

Are there native methods in Flink to achieve that?
I don't feel comfortable having written my own JSON processor for this
job.

Do you have any hint which can help please ?

All the best

François



Re: No resource available error while testing HA

2019-02-06 Thread Gary Yao
Hi Averell,

That log file does not look complete. I do not see any INFO level log
messages
such as [1].

Best,
Gary

[1]
https://github.com/apache/flink/blob/46326ab9181acec53d1e9e7ec8f4a26c672fec31/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java#L544

On Fri, Feb 1, 2019 at 12:18 AM Averell  wrote:

> Hi Gary,
>
> I faced a similar problem yesterday, but I don't know what the cause was yet.
> The situation that I observed is as follows:
>  - At about 2:57, one of my EMR execution nodes (IP ...99) got disconnected
> from the YARN resource manager (on the RM I could not see that node anymore),
> even though the node was still running. <<< This is another issue, but I
> believe it is with YARN.
>  - About 8 hours after that (between 10:00 and 11:00), I turned the
> problematic EMR core node off. AWS spun up another node and added it to the
> cluster as a replacement. The YARN RM soon recognized the new node and added
> it to its list of available nodes.
> However, the JM did not seem able to do anything after that. It kept
> trying to start the job, failing after the timeout with that "no resource
> available" exception again and again. No jobmanager logs were recorded after
> 2:57:15 though.
>
> I am attaching the logs collected via "yarn logs --applicationId 
> here. But it seems I still missed something.
>
> I am using Flink 1.7.1, with the yarn-site configuration
> yarn.resourcemanager.am.max-attempts=5. All Flink configuration values are
> the defaults.
>
> Thanks and best regards,
> Averell flink.log
> <
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/t1586/flink.log>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


graceful shutdown of taskmanager

2019-02-06 Thread Bernd.Winterstein
Hi
Is there a possibility to gracefully remove a taskmanager from a running 
cluster?
My idea would be to trigger the affected jobs to restart via a savepoint on the 
remaining taskmanagers. Once the taskmanager is idle, it can be stopped without 
jobs falling back to an older checkpoint.

Regards

Bernd



  




Re: Exactly Once Guarantees with StreamingFileSink to S3

2019-02-06 Thread Kostas Kloudas
Hi Kaustubh,

Your general understanding is correct.

In this case though, the sink will call the
S3Committer#commitAfterRecovery() method.
This method, after failing to commit the MPU, will check whether the file is
there and whether the length is correct; if everything is OK (which is the
case in your example), it will continue with normal execution.
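
Conceptually (this is only an illustrative sketch of the check described
above, not the actual S3Committer code), the recovery path amounts to
something like:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;

public class CommitAfterRecoverySketch {
    // Illustrative only -- not the actual S3Committer implementation.
    static void commitAfterRecovery(AmazonS3 s3, CompleteMultipartUploadRequest request,
                                    String bucket, String key, long expectedLength) {
        try {
            s3.completeMultipartUpload(request); // normal commit path
        } catch (AmazonS3Exception e) {
            // The MPU may already have been completed before the crash (e.g. "NoSuchUpload").
            // Check whether the committed object is there with the expected length.
            ObjectMetadata metadata = s3.getObjectMetadata(bucket, key);
            if (metadata.getContentLength() != expectedLength) {
                throw e; // object missing or wrong size -> genuine failure
            }
            // Otherwise the upload was already committed; treat as success.
        }
    }
}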

I hope this helps.

Kostas

On Wed, Feb 6, 2019 at 7:47 AM Kaustubh Rudrawar  wrote:

> Hi,
>
> I'm trying to understand the exactly once semantics of the
> StreamingFileSink with S3 in Flink 1.7.1 and am a bit confused on how it
> guarantees exactly once under a very specific failure scenario.
>
> For simplicity, let's say we will roll the current part file on checkpoint
> (and only on checkpoint), the process is as follows:
> 1. Framework tells the sink to prepare for a checkpoint. This ultimately
> results in 'onReceptionOfCheckpoint' being called on Bucket.java.
> 2. This takes the current file, and based on our roll policy of rolling on
> checkpoint, it closes and uploads it to S3 as part of a MPU and the
> reference to this upload is stored as part of 'pendingPartsPerCheckpoint'.
> 3. Once the checkpoint successfully completes, the bucket is notified via
> 'onSuccessfulCompletionOfCheckpoint'. At this point, the bucket goes
> through all pendingPartsPerCheckpoint and for each of them: recovers the in
> progress part (which doesn't exist in this scenario) and then commits the
> upload.
> 4. The AmazonS3Client is ultimately called to perform the upload and it
> retries the attempt up to N times. If it exhausts retries, it will throw an
> Exception.
> 5. Upon successful commit of the MPU, Bucket clears out its references to
> these uploads from its state.
>
> Given this flow, I'm having trouble understanding how the following
> scenario works:
>
>- Step 4: The commit on the MPU succeeds,
>- Step 5: Before this step completes, the task crashes. So at this
>point, S3 has successfully completed the MPU but to the client (the
>Flink job), it has not completed.
>- Flink will then recover from the checkpoint we just took and steps 3
>and 4 will be repeated. My understanding is that, since the MPU succeeded
>previously, any attempts at re-committing that upload will result in a 404
>('NoSuchUpload'). So Step 4 should throw an exception. Which would then get
>retried by the framework and this process repeats itself.
>
> So how is this case handled?
>
> Really appreciate the help!
> -Kaustubh
>
>
>


Re: late element and expired state

2019-02-06 Thread Yun Tang
Hi Ajay,

From your description, I think watermarks [1], which indicate that all earlier 
events have arrived, might meet your requirements to some extent. But this means 
you would need to use windows and have event time in your streaming job.

If you don't want to introduce the concept of windows, I think you can use 
'KeyedStateBackend#applyToAllKeys' to manually clear the target state when you 
see the "last" element, and record the cleared state name in a pre-defined 
operator state so that late-arriving elements can be skipped. Just be careful 
not to let the list in operator state grow too large, e.g. only keep a fixed 
number of expired state entries.
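
For the TTL route from your mail, a minimal sketch (assuming the state TTL
support available since Flink 1.6; the descriptor name and the 24h TTL are
illustrative). With NeverReturnExpired, a late element simply sees null state,
which effectively serves as the "expired" hint:

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;

public class TtlStateExample {
    public static ValueStateDescriptor<Long> descriptorWithTtl() {
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.hours(24))                                // keep state for 24h
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // TTL refreshed on writes
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .cleanupFullSnapshot()                                     // drop expired entries on full snapshots
            .build();

        ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("lastSeen", Long.class);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}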

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/event_timestamps_watermarks.html

Best
Yun Tang

From: Aggarwal, Ajay 
Sent: Tuesday, February 5, 2019 22:54
To: user@flink.apache.org
Subject: late element and expired state


Hello,



I have some questions regarding best practices for dealing with ever-expanding 
state when using keyBy(). In my input stream I will continue to see new keys, and I 
am using keyed state. How do I keep the total state within limits? After reading the 
Flink documentation and some blogs I am planning to use the following:



  * When I know I have seen the “last” element associated with a key, I can 
manually clear the state.
  * I can also use TTL on the state to expire it and garbage collect it 
(with the next full snapshot). This is useful when I never see the “last” element.



Is that the right strategy?



Also, if an element arrives late (after the state has been cleared), how do I 
detect that the state has been cleared/expired so I can skip these late 
elements? Is there an API that will give a hint about cleared/expired 
state?



Thanks.



Ajay