If we hit the retry limit, abort the job. In our case we will restart from
the last SP ( we as any production pile do it is n time s a day )  and that
I would think should be OK for most folks ?

On Tue, Jan 23, 2018 at 11:38 AM, Vishal Santoshi <vishal.santo...@gmail.com
> wrote:

> Thank you for considering this. If I understand you correctly.
>
> * CHK pointer on ZK for a CHK state on hdfs was done successfully.
> * Some issue restarted the pipeline.
> * The NN was down unfortunately and flink could not retrieve the  CHK
> state from the CHK pointer on ZK.
>
> Before
>
> * The CHK pointer was being removed and the job started from a brand new
> slate.
>
> After ( this fix on 1.4 +)
>
> * do not delete the CHK pointer ( It has to be subsumed to be deleted ).
> * Flink keeps using this CHK pointer ( if retry is > 0 and till we hit any
> retry limit ) to restore state
> * NN comes back
> * Flink restores state on the next retry.
>
> I would hope that is the sequence to follow.
>
> Regards.
>
>
>
>
>
>
>
>
> On Tue, Jan 23, 2018 at 7:25 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi Vishal,
>>
>> I think you might be right. We fixed the problem that checkpoints where
>> dropped via https://issues.apache.org/jira/browse/FLINK-7783. However,
>> we still have the problem that if the DFS is not up at all then it will
>> look as if the job is starting from scratch. However, the alternative is
>> failing the job, in which case you will also never be able to restore from
>> a checkpoint. What do you think?
>>
>> Best,
>> Aljoscha
>>
>>
>> On 23. Jan 2018, at 10:15, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>> Sorry for the late reply.
>>
>> I created FLINK-8487 [1] to track this problem
>>
>> @Vishal, can you have a look and check if if forgot some details? I
>> logged the issue for Flink 1.3.2, is that correct?
>> Please add more information if you think it is relevant.
>>
>> Thanks,
>> Fabian
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-8487
>>
>> 2018-01-18 22:14 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> Or this one
>>>
>>> https://issues.apache.org/jira/browse/FLINK-4815
>>>
>>> On Thu, Jan 18, 2018 at 4:13 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> ping.
>>>>
>>>>     This happened again on production and it seems reasonable to abort
>>>> when a checkpoint is not found rather than behave as if it is a brand new
>>>> pipeline.
>>>>
>>>> On Tue, Jan 16, 2018 at 9:33 AM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Folks sorry for being late on this. Can some body with the knowledge
>>>>> of this code base create a jira issue for the above ? We have seen this
>>>>> more than once on production.
>>>>>
>>>>> On Mon, Oct 9, 2017 at 10:21 AM, Aljoscha Krettek <aljos...@apache.org
>>>>> > wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> Some relevant Jira issues for you are:
>>>>>>
>>>>>>  - https://issues.apache.org/jira/browse/FLINK-4808: Allow skipping
>>>>>> failed checkpoints
>>>>>>  - https://issues.apache.org/jira/browse/FLINK-4815: Automatic
>>>>>> fallback to earlier checkpoint when checkpoint restore fails
>>>>>>  - https://issues.apache.org/jira/browse/FLINK-7783: Don't always
>>>>>> remove checkpoints in ZooKeeperCompletedCheckpointStore#recover()
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>>
>>>>>> On 9. Oct 2017, at 09:06, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> it would be great if you could create a JIRA ticket with Blocker
>>>>>> priority.
>>>>>> Please add all relevant information of your detailed analysis, add a
>>>>>> link to this email thread (see [1] for the web archive of the mailing
>>>>>> list), and post the id of the JIRA issue here.
>>>>>>
>>>>>> Thanks for looking into this!
>>>>>>
>>>>>> Best regards,
>>>>>> Fabian
>>>>>>
>>>>>> [1] https://lists.apache.org/list.html?user@flink.apache.org
>>>>>>
>>>>>> 2017-10-06 15:59 GMT+02:00 Vishal Santoshi <vishal.santo...@gmail.com
>>>>>> >:
>>>>>>
>>>>>>> Thank you for confirming.
>>>>>>>
>>>>>>>
>>>>>>>  I think this is a critical bug. In essence any checkpoint store (
>>>>>>> hdfs/S3/File)  will loose state if it is unavailable at resume. This
>>>>>>> becomes all the more painful with your confirming that  "failed
>>>>>>> checkpoints killing the job"  b'coz essentially it mean that if remote
>>>>>>> store in unavailable  during checkpoint than you have lost state ( till 
>>>>>>> of
>>>>>>> course you have a retry of none or an unbounded retry delay, a delay 
>>>>>>> that
>>>>>>> you *hope* the store revives in ) .. Remember  the first retry
>>>>>>> failure  will cause new state according the code as written iff the 
>>>>>>> remote
>>>>>>> store is down. We would rather have a configurable property that
>>>>>>> establishes  our desire to abort something like a
>>>>>>> "abort_retry_on_chkretrevalfailure"
>>>>>>>
>>>>>>>
>>>>>>> In our case it is very important that we do not undercount a window,
>>>>>>> one reason we use flink and it's awesome failure guarantees, as various
>>>>>>> alarms sound ( we do anomaly detection on the time series ).
>>>>>>>
>>>>>>> Please create a jira ticket for us to follow or we could do it.
>>>>>>>
>>>>>>>
>>>>>>> PS Not aborting on checkpointing, till a configurable limit is very
>>>>>>> important too.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Oct 6, 2017 at 2:36 AM, Aljoscha Krettek <
>>>>>>> aljos...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Vishal,
>>>>>>>>
>>>>>>>> I think you're right! And thanks for looking into this so deeply.
>>>>>>>>
>>>>>>>> With your last mail your basically saying, that the checkpoint
>>>>>>>> could not be restored because your HDFS was temporarily down. If Flink 
>>>>>>>> had
>>>>>>>> not deleted that checkpoint it might have been possible to restore it 
>>>>>>>> at a
>>>>>>>> later point, right?
>>>>>>>>
>>>>>>>> Regarding failed checkpoints killing the job: yes, this is
>>>>>>>> currently the expected behaviour but there are plans to change this.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On 5. Oct 2017, at 17:40, Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> I think this is the offending piece. There is a catch all
>>>>>>>> Exception, which IMHO should understand a recoverable exception from an
>>>>>>>> unrecoverable on.
>>>>>>>>
>>>>>>>>
>>>>>>>> try {
>>>>>>>> completedCheckpoint = retrieveCompletedCheckpoint(ch
>>>>>>>> eckpointStateHandle);
>>>>>>>> if (completedCheckpoint != null) {
>>>>>>>> completedCheckpoints.add(completedCheckpoint);
>>>>>>>> }
>>>>>>>> } catch (Exception e) {
>>>>>>>> LOG.warn("Could not retrieve checkpoint. Removing it from the
>>>>>>>> completed " +
>>>>>>>> "checkpoint store.", e);
>>>>>>>> // remove the checkpoint with broken state handle
>>>>>>>> removeBrokenStateHandle(checkpointStateHandle.f1,
>>>>>>>> checkpointStateHandle.f0);
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Thu, Oct 5, 2017 at 10:57 AM, Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> So this is the issue and tell us that it is wrong. ZK had some
>>>>>>>>> state ( backed by hdfs ) that referred to a checkpoint ( the same 
>>>>>>>>> exact
>>>>>>>>> last successful checkpoint that was successful before NN screwed us 
>>>>>>>>> ). When
>>>>>>>>> the JM tried to recreate the state and b'coz NN was down failed to 
>>>>>>>>> retrieve
>>>>>>>>> the CHK handle from hdfs and conveniently ( and I think very wrongly )
>>>>>>>>> removed the CHK from being considered and cleaned the pointer ( though
>>>>>>>>> failed as was NN was down and is obvious from the dangling file in 
>>>>>>>>> recovery
>>>>>>>>> ) . The metadata itself was on hdfs and failure in retrieving should 
>>>>>>>>> have
>>>>>>>>> been a stop all, not going to trying doing magic exception rather than
>>>>>>>>> starting from a blank state.
>>>>>>>>>
>>>>>>>>> org.apache.flink.util.FlinkException: Could not retrieve
>>>>>>>>> checkpoint 44286 from state handle under /0000000000000044286.
>>>>>>>>> This indicates that the retrieved state handle is broken. Try 
>>>>>>>>> cleaning the
>>>>>>>>> state handle store.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Oct 5, 2017 at 10:13 AM, Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Also note that  the zookeeper recovery did  ( sadly on the same
>>>>>>>>>> hdfs cluster ) also showed the same behavior. It had the pointers to 
>>>>>>>>>> the
>>>>>>>>>> chk point  ( I  think that is what it does, keeps metadata of where 
>>>>>>>>>> the
>>>>>>>>>> checkpoint etc  ) .  It too decided to keep the recovery file from 
>>>>>>>>>> the
>>>>>>>>>> failed state.
>>>>>>>>>>
>>>>>>>>>> -rw-r--r--   3 root hadoop       7041 2017-10-04 13:55
>>>>>>>>>> /flink-recovery/prod/completedCheckpoint6c9096bb9ed4
>>>>>>>>>>
>>>>>>>>>> -rw-r--r--   3 root hadoop       7044 2017-10-05 10:07
>>>>>>>>>> /flink-recovery/prod/completedCheckpoint7c5a19300092
>>>>>>>>>>
>>>>>>>>>> This is getting a little interesting. What say you :)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 5, 2017 at 9:26 AM, Vishal Santoshi <
>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Another thing I noted was this thing
>>>>>>>>>>>
>>>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-04 13:54
>>>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk
>>>>>>>>>>> -44286
>>>>>>>>>>>
>>>>>>>>>>> drwxr-xr-x   - root hadoop          0 2017-10-05 09:15
>>>>>>>>>>> /flink-checkpoints/prod/c4af8dfa864e2f9a51764de9f0725b39/chk
>>>>>>>>>>> -45428
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Generally what Flink does IMHO is that it replaces the chk point
>>>>>>>>>>> directory with a new one. I see it happening now. Every minute it 
>>>>>>>>>>> replaces
>>>>>>>>>>> the old directory.  In this job's case however, it did not delete 
>>>>>>>>>>> the
>>>>>>>>>>> 2017-10-04 13:54  and hence the chk-44286 directory.  This was the 
>>>>>>>>>>> last
>>>>>>>>>>> chk-44286 (  I think  )  successfully created before NN had issues 
>>>>>>>>>>> but as
>>>>>>>>>>> is usual did not delete this  chk-44286. It looks as if it started 
>>>>>>>>>>> with a
>>>>>>>>>>> blank slate ???????? Does this strike a chord ?????
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Oct 5, 2017 at 8:56 AM, Vishal Santoshi <
>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello Fabian,
>>>>>>>>>>>>                       First of all congratulations on this
>>>>>>>>>>>> fabulous framework. I have worked with GDF and though GDF has some 
>>>>>>>>>>>> natural
>>>>>>>>>>>> pluses Flink's state management is far more advanced. With kafka 
>>>>>>>>>>>> as a
>>>>>>>>>>>> source it negates issues GDF has ( GDF integration with pub/sub is 
>>>>>>>>>>>> organic
>>>>>>>>>>>> and that is to be expected but non FIFO pub/sub is an issue with 
>>>>>>>>>>>> windows on
>>>>>>>>>>>> event time etc )
>>>>>>>>>>>>
>>>>>>>>>>>>                    Coming back to this issue. We have that same
>>>>>>>>>>>> kafka topic feeding a streaming druid datasource and we do not see 
>>>>>>>>>>>> any
>>>>>>>>>>>> issue there, so so data loss on the source, kafka is not 
>>>>>>>>>>>> applicable. I am
>>>>>>>>>>>> totally certain that the "retention" time was not an issue. It
>>>>>>>>>>>> is 4 days of retention and we fixed this issue within 30 minutes. 
>>>>>>>>>>>> We could
>>>>>>>>>>>> replay kafka with a new consumer group.id and that worked
>>>>>>>>>>>> fine.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Note these properties and see if they strike a chord.
>>>>>>>>>>>>
>>>>>>>>>>>> * The setCommitOffsetsOnCheckpoints(boolean) for kafka
>>>>>>>>>>>> consumers is the default true. I bring this up to see whether 
>>>>>>>>>>>> flink will in
>>>>>>>>>>>> any circumstance drive consumption on the kafka perceived offset 
>>>>>>>>>>>> rather
>>>>>>>>>>>> than the one in the checkpoint.
>>>>>>>>>>>>
>>>>>>>>>>>> * The state.backend.fs.memory-threshold: 0 has not been set.
>>>>>>>>>>>> The state is big enough though therefore IMHO no way the state is 
>>>>>>>>>>>> stored
>>>>>>>>>>>> along with the meta data in JM ( or ZK ? ) . The reason I bring 
>>>>>>>>>>>> this up is
>>>>>>>>>>>> to make sure when you say that the size has to be less than 
>>>>>>>>>>>> 1024bytes , you
>>>>>>>>>>>> are talking about cumulative state of the pipeine.
>>>>>>>>>>>>
>>>>>>>>>>>> * We have a good sense of SP ( save point )  and CP (
>>>>>>>>>>>> checkpoint ) and certainly understand that they actually are not
>>>>>>>>>>>> dissimilar. However in this case there were multiple attempts to 
>>>>>>>>>>>> restart
>>>>>>>>>>>> the pipe before it finally succeeded.
>>>>>>>>>>>>
>>>>>>>>>>>> * Other hdfs related poperties.
>>>>>>>>>>>>
>>>>>>>>>>>>  state.backend.fs.checkpointdir: hdfs:///flink-checkpoints/<%=
>>>>>>>>>>>> flink_hdfs_root %>
>>>>>>>>>>>>
>>>>>>>>>>>>  state.savepoints.dir: hdfs:///flink-savepoints/<%= 
>>>>>>>>>>>> flink_hdfs_root %>
>>>>>>>>>>>>
>>>>>>>>>>>>  recovery.zookeeper.storageDir: hdfs:///flink-recovery/<%= 
>>>>>>>>>>>> flink_hdfs_root %>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Do these make sense ? Is there anything else I should look at.
>>>>>>>>>>>> Please also note that it is the second time this has happened. The 
>>>>>>>>>>>> first
>>>>>>>>>>>> time I was vacationing and was not privy to the state of the flink
>>>>>>>>>>>> pipeline, but the net effect were similar. The counts for the 
>>>>>>>>>>>> first window
>>>>>>>>>>>> after an internal restart dropped.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for you patience and regards,
>>>>>>>>>>>>
>>>>>>>>>>>> Vishal
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Oct 5, 2017 at 5:01 AM, Fabian Hueske <
>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>
>>>>>>>>>>>>> window operators are always stateful because the operator
>>>>>>>>>>>>> needs to remember previously received events (WindowFunction) or
>>>>>>>>>>>>> intermediate results (ReduceFunction).
>>>>>>>>>>>>> Given the program you described, a checkpoint should include
>>>>>>>>>>>>> the Kafka consumer offset and the state of the window operator. 
>>>>>>>>>>>>> If the
>>>>>>>>>>>>> program eventually successfully (i.e., without an error) 
>>>>>>>>>>>>> recovered from the
>>>>>>>>>>>>> last checkpoint, all its state should have been restored. Since 
>>>>>>>>>>>>> the last
>>>>>>>>>>>>> checkpoint was before HDFS went into safe mode, the program would 
>>>>>>>>>>>>> have been
>>>>>>>>>>>>> reset to that point. If the Kafka retention time is less than the 
>>>>>>>>>>>>> time it
>>>>>>>>>>>>> took to fix HDFS you would have lost data because it would have 
>>>>>>>>>>>>> been
>>>>>>>>>>>>> removed from Kafka. If that's not the case, we need to 
>>>>>>>>>>>>> investigate this
>>>>>>>>>>>>> further because a checkpoint recovery must not result in state 
>>>>>>>>>>>>> loss.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Restoring from a savepoint is not so much different from
>>>>>>>>>>>>> automatic checkpoint recovery. Given that you have a completed 
>>>>>>>>>>>>> savepoint,
>>>>>>>>>>>>> you can restart the job from that point. The main difference is 
>>>>>>>>>>>>> that
>>>>>>>>>>>>> checkpoints are only used for internal recovery and usually 
>>>>>>>>>>>>> discarded once
>>>>>>>>>>>>> the job is terminated while savepoints are retained.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding your question if a failed checkpoint should cause
>>>>>>>>>>>>> the job to fail and recover I'm not sure what the current status 
>>>>>>>>>>>>> is.
>>>>>>>>>>>>> Stefan (in CC) should know what happens if a checkpoint fails.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2017-10-05 2:20 GMT+02:00 Vishal Santoshi <
>>>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> To add to it, my pipeline is a simple
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> keyBy(0)
>>>>>>>>>>>>>>         .timeWindow(Time.of(window_size, TimeUnit.MINUTES))
>>>>>>>>>>>>>>         .allowedLateness(Time.of(late_by, TimeUnit.SECONDS))
>>>>>>>>>>>>>>         .reduce(new ReduceFunction(), new WindowFunction())
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Oct 4, 2017 at 8:19 PM, Vishal Santoshi <
>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello folks,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> As far as I know checkpoint failure should be ignored and
>>>>>>>>>>>>>>> retried with potentially larger state. I had this situation
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * hdfs went into a safe mode b'coz of Name Node issues
>>>>>>>>>>>>>>> * exception was thrown
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     org.apache.hadoop.ipc.RemoteException(org.apache.
>>>>>>>>>>>>>>> hadoop.ipc.StandbyException): Operation category WRITE is
>>>>>>>>>>>>>>> not supported in state standby. Visit
>>>>>>>>>>>>>>> https://s.apache.org/sbnn-error
>>>>>>>>>>>>>>>     ..................
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     at org.apache.flink.runtime.fs.hd
>>>>>>>>>>>>>>> fs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:453)
>>>>>>>>>>>>>>>         at org.apache.flink.core.fs.Safet
>>>>>>>>>>>>>>> yNetWrapperFileSystem.mkdirs(SafetyNetWrapperFileSystem.java
>>>>>>>>>>>>>>> :111)
>>>>>>>>>>>>>>>         at org.apache.flink.runtime.state.filesystem.
>>>>>>>>>>>>>>> FsCheckpointStreamFactory.createBasePath(FsCheck
>>>>>>>>>>>>>>> pointStreamFactory.java:132)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * The pipeline came back after a few restarts and checkpoint
>>>>>>>>>>>>>>> failures, after the hdfs issues were resolved.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I would not have worried about the restart, but it was
>>>>>>>>>>>>>>> evident that I lost my operator state. Either it was my kafka 
>>>>>>>>>>>>>>> consumer that
>>>>>>>>>>>>>>> kept on advancing it's offset between a start and the next 
>>>>>>>>>>>>>>> checkpoint
>>>>>>>>>>>>>>> failure ( a minute's worth ) or the the operator that had 
>>>>>>>>>>>>>>> partial
>>>>>>>>>>>>>>> aggregates was lost. I have a 15 minute window of counts on a 
>>>>>>>>>>>>>>> keyed operator
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am using ROCKS DB and of course have checkpointing turned
>>>>>>>>>>>>>>> on.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The questions thus are
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * Should a pipeline be restarted if checkpoint fails ?
>>>>>>>>>>>>>>> * Why on restart did the operator state did not recreate ?
>>>>>>>>>>>>>>> * Is the nature of the exception thrown have to do with any
>>>>>>>>>>>>>>> of this b'coz suspend and resume from a save point work as 
>>>>>>>>>>>>>>> expected ?
>>>>>>>>>>>>>>> * And though I am pretty sure, are operators like the Window
>>>>>>>>>>>>>>> operator stateful by drfault and thus if I have 
>>>>>>>>>>>>>>> timeWindow(Time.of(window_
>>>>>>>>>>>>>>> size, TimeUnit.MINUTES)).reduce(new ReduceFunction(), new
>>>>>>>>>>>>>>> WindowFunction()), the state is managed by flink ?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>

Reply via email to