Re: [DISCUSS] Make Managed Memory always off-heap (Adjustment to FLIP-49)

2019-12-02 Thread Xintong Song
Sorry, I just realized that I sent my feedback to Jingsong's email
address instead of to the dev / user mailing list.

Please find my comments below.


Thank you~

Xintong Song

On Wed, Nov 27, 2019 at 4:32 PM Xintong Song  wrote:

> As a participant of the discussion yesterday, I'm +1 for the proposal of
> removing on-heap managed memory.
>
> And there's one thing I want to add. In order to support "reserving" memory
> (where memory consumers do not allocate MemorySegments from the
> MemoryManager but allocate the reserved memory themselves), we no longer
> support pre-allocation of memory segments in FLIP-49. That means even if we
> do not remove on-heap managed memory, MemorySegments will not be allocated
> unless requested by a consumer, and will be deallocated immediately when
> released by the consumer. Thus, it is likely that the memory segments will
> not always stay in the JVM old generation, and will be affected by GC /
> swapping just like other Java objects.
>
> @Jingsong, I'm not sure whether this will be related to the performance
> issue that you mentioned.
>
> Thank you~
>
> Xintong Song
>
>
>
> On Wed, Nov 27, 2019 at 12:10 PM Jingsong Li 
> wrote:
>
>> Hi Stephan,
>>
>> +1 to default have off-heap managed memory.
>>
>> From the perspective of batch, in our long-term performance tests and
>> online practice:
>> - There is no significant difference in performance between heap and
>> off-heap memory. For heap objects, the JVM has many opportunities to
>> optimize in the JIT, so generally speaking heap objects will be faster. But
>> at present, the managed memory we use in Flink is used as binary data. In
>> this case we operate on it through the Unsafe API, so there is no obvious
>> performance gap.
>> - On the contrary, too much memory on the heap will hurt GC performance
>> and latency.
>>
>> But I'm not sure if we should only have off-heap managed memory.
>> According to previous experience, array and object operations on the JVM
>> heap can be more beneficial. As mentioned earlier, the JVM/JIT will do a
>> lot of optimization.
>> - For vectorization, arrays are clearly better suited to computation; the
>> JVM can apply many optimizations to array loops.
>> - We could consider using deep code generation to generate dynamic Java
>> objects to further speed up the operators. The snappydata[1] project has
>> done some work in this area.
>>
>> So I am +0 for only having off-heap managed memory, because we don't rely
>> on heap managed memory right now; these are only a few ideas for the
>> future.
>>
>> [1] https://github.com/SnappyDataInc/snappydata
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, Nov 27, 2019 at 10:14 AM Stephan Ewen  wrote:
>>
>>> Hi all!
>>>
>>> Yesterday, some of the people involved in FLIP-49 had a long discussion
>>> about managed memory in Flink.
>>> In particular, we discussed the fact that we have managed memory either
>>> on-heap or off-heap and that FLIP-49 introduced having both of these types
>>> of memory at the same time.
>>>
>>> ==> What we want to suggest is a simplification to only have off-heap
>>> managed memory.
>>>
>>> The rationale is the following:
>>>   - Integrating state backends with managed memory means we need to
>>> support "reserving" memory on top of creating MemorySegments.
>>> Reserving memory isn't really possible on the Java Heap, but works
>>> well off-heap
>>>
>>>   - All components that will use managed memory will work with off-heap
>>> managed memory: MemorySegment-based structures, RocksDB, possibly external
>>> processes in the future.
>>>
>>>   - A setup where state backends integrate with managed memory, but
>>> managed memory is by default all on-heap, breaks the RocksDB backend's
>>> out-of-the-box experience.
>>>
>>>   - The only state backend that does not use managed memory is the
>>> HeapKeyedStateBackend (used in the MemoryStateBackend and FsStateBackend).
>>> It means that the HeapKeyedStateBackend always uses the JVM heap, even
>>> when all managed memory is off-heap.
>>>
>>>   - Larger use of the HeapKeyedStateBackend needs a larger JVM heap.
>>> The current FLIP-49 way to get this is to "configure managed memory to
>>> on-heap", but that managed memory will not be used; it just helps to
>>> implicitly grow the heap through the way the heap size is computed. That is
>>> a pretty confusing story, especially when we start thinking about scenarios
>>> where Flink runs as a library in a pre-existing JVM, about the mini-cluster,
>>> etc. It is simpler (and more accurate) to just say that the
>>> HeapKeyedStateBackend does not participate in managed memory, and extensive
>>> use of it requires the user to reserve heap memory (in FLIP-49 you have a
>>> new TaskHeapMemory option to request that a larger heap be created).
>>>
>>> ==> This seems to support all scenarios in a nice way out of the box.
>>>
>>> ==> This seems easier to understand for users.
>>>
>>> ==> This simplifies the implementation of resource profiles,
>>> configuration, and computation of memory pools.
>>>
>>>
>>> Does anybody have …

Emit intermediate accumulator state of a session window

2019-12-02 Thread chandu soa
*Emit the intermediate accumulator (AggregateFunction ACC value) state of a
session window when a new event arrives*



AggregateFunction#getResult() is called only when the window completes. My
need is to emit the intermediate accumulator values (the result of
AggregateFunction#add()) as well and write them to the sink. Both
AggregateFunction#getResult() and ProcessWindowFunction provide the
aggregated result only when the window is closed.

*Any thoughts, please, on how to emit or stream the intermediate accumulator
state as soon as a new event arrives while the window is open? Do I need to
implement a custom trigger or assigner?*



To give you some background: when a user watches a video we get events -
one when it is clicked, then one roughly every 15 minutes, and finally one
when the user closes the video.

I need to aggregate them as soon as they arrive and post the result to the
destination. For example, if a user is watching a two-hour movie I get
events at 15-minute intervals (0, 15, 30, ..., 120); whenever I get an
event I need to aggregate the watched percentage so far and write it to the
sink (0%, 12.5%, 25%, ..., 100%). The implementation below emits
(via getResult()) a single event 20 minutes after the video is watched.





.window(EventTimeSessionWindows.withGap(Time.minutes(20)))
.aggregate(new EventAggregator())
.filter(new FinalFilter())
.addSink(...)
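
One way to do this is with a custom trigger that fires on every element, so
that getResult() runs per event. Below is a minimal sketch against the
Trigger API of Flink 1.9-era releases (EmitOnElementTrigger is a
hypothetical name, not an existing Flink class):

```java
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class EmitOnElementTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp,
                                   TimeWindow window, TriggerContext ctx) {
        // Keep the end-of-window timer so the final result still purges state.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        // FIRE emits the current accumulator but keeps the window contents.
        return TriggerResult.FIRE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        // Session windows merge, so the trigger must support merging.
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long maxTs = window.maxTimestamp();
        if (maxTs > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(maxTs);
        }
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}
```

It would be plugged in with .trigger(new EmitOnElementTrigger()) between
.window(...) and .aggregate(...); the FinalFilter would then need to
distinguish intermediate results from the final one.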


Appreciate your help.


Thanks,

chandu


Re: Flink 1.9.1 KafkaConnector missing data (1M+ records)

2019-12-02 Thread Harrison Xu
Thank you for your reply,

Some clarification:

We have configured the BucketAssigner to use the *Kafka record timestamp*.
The exact bucketing behavior is as follows:

private static final DateTimeFormatter formatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH");

@Override
public String getBucketId(KafkaRecord record, BucketAssigner.Context context) {
    return String.format(
        "%s/dt=%s/partition_%s",
        record.getTopic(),
        Instant.ofEpochMilli(record.getTimestamp())
            .atZone(ZoneOffset.UTC)
            .format(formatter),
        record.getPartition());
}

For each record, we write only its offset to the S3 object as a sanity
check. It is easy to detect missing or duplicate offsets. To answer your
questions:


*Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02 are
entirely skipped?*
No, because even if the producer were idle during these datetimes, we would
still expect no missing offsets. We observed *millions of missing records*,
in addition to the missing partitions (2019-11-24T01 and 2019-11-24T02).
Further, the producer was very active during this time.
I want to emphasize that we noticed the consumer for this exact
TopicPartition falling behind (>1 hour lag); this degree of lag was only
observed for this partition. (The consumer eventually caught up.) It's
normal for the consumer to fall behind the producer for short bursts, but
we definitely do not expect missing records as a result. There were
millions of records whose timestamps fall into dt 2019-11-24T01 and
2019-11-24T02; they were entirely skipped by the writer.


*What does TT stand for?*
It's simply a convention for serializing a datetime as a string.



*Can it be that there are a lot of events for partition 4 that fill up 2
part files for that duration?*
We are using the BulkWriter. I am under the impression that this writer
should only produce one file per checkpoint interval, which we have
configured to be 5 minutes. You can see that the preceding commits follow
this pattern of one commit per checkpoint interval, which is what we
expect. It's very strange that two files for the same TopicPartition (same
TaskManager) were committed.


I am eager to hear your reply and understand what we're seeing.

Thanks,
Harrison

On Thu, Nov 28, 2019 at 6:43 AM Kostas Kloudas  wrote:

> Hi Harrison,
>
> One thing to keep in mind is that Flink will only write files if there
> is data to write. If, for example, your partition is not active for a
> period of time, then no files will be written.
> Could this explain the fact that dt 2019-11-24T01 and 2019-11-24T02
> are entirely skipped?
>
> In addition, for the "duplicates", it would help if you could share a
> bit more information about your BucketAssigner.
> How are these names assigned to the files and what does TT stand for?
> Can it be that there are a lot of events for partition 4 that fill up
> 2 part files for that duration? I am
> asking because the counters of the 2 part files differ.
>
> Cheers,
> Kostas
>
> On Tue, Nov 26, 2019 at 1:09 AM Harrison Xu  wrote:
> >
> > Hello,
> >
> > We're seeing some strange behavior with Flink's KafkaConnector010 (Kafka
> 0.10.1.1) arbitrarily skipping data.
> >
> > Context
> > KafkaConnector010 is used as source, and
> StreamingFileSink/BulkPartWriter (S3) as sink with no intermediate
> operators. Recently, we noticed that millions of Kafka records were missing
> for one topic partition (this job is running for 100+ topic partitions, and
> such behavior was only observed for one). This job is run on YARN, and
> hosts were healthy with no hardware faults observed. No exceptions in
> jobmanager or taskmanager logs at this time.
> >
> > How was this detected?
> > As a sanity check, we dual-write Kafka metadata (offsets) to a separate
> location in S3, and have monitoring to ensure that written offsets are
> contiguous with no duplicates.
> > Each Kafka record is bucketed into hourly datetime partitions (UTC) in
> S3.
> >
> > (Condensed) Taskmanager logs
> > 2019-11-24 02:36:50,140 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5252
> with MPU ID 3XG...
> > 2019-11-24 02:41:27,966 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5253
> with MPU ID 9MW...
> > 2019-11-24 02:46:29,153 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5254
> with MPU ID 7AP...
> > 2019-11-24 02:51:32,602 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5255
> with MPU ID xQU...
> > 2019-11-24 02:56:35,183 INFO
> org.apache.flink.fs.s3.common.writer.S3Committer  - Committing
> kafka_V2/meta/digest_features/dt=2019-11-24T00/partition_4/part-12-5256
> with MPU ID pDL...
> > 2019-11-24 03:01:26,

Side output question

2019-12-02 Thread M Singh
Hi:
I am replacing the SplitOperator in my Flink application with a simple
processor with side outputs.

My question is: does the main stream from which we get the side outputs
need to have any events (i.e., produced using Collector.collect)? Or can
all of the output be emitted as side outputs? Also, are there any pros and
cons of having at least one main collected output vs. all side outputs?
Thanks
Mans

Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

2019-12-02 Thread Yun Tang
Hi Salva

As I pointed out, your program logic is not well defined if you call
'state.clear()' within 'snapshotState', because you do not know what the
current key is at that point. Hence, I don't think an approach like that
makes sense.

From my point of view, the fact that 'clear' sometimes works in your code
is not a bug in the current Flink framework. Flink sets the current key
when processing a record, but it does not reset the current key to null
afterwards, since there is no such life-cycle hook today. There seems to be
no benefit in introducing one, and it might cause a performance regression,
as it would add more steps.
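
For illustration, a minimal sketch of the safe pattern (hypothetical types
and helper; the point is only where the keyed state is touched):

```java
// Keyed state is safe to access in processElement1/2, where the framework
// has set the current key - unlike in snapshotState.
@Override
public void processElement1(ModelEvent event, Context ctx, Collector<Output> out) throws Exception {
    Model model = updateInMemoryModel(event);  // hypothetical helper
    // Mirror the model into the keyed MapState immediately, instead of
    // deferring the sync to snapshotState.
    modelsBytes.put(event.getModelKey(), model.toBytes());
}
```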

Best
Yun Tang


On 12/2/19, 9:29 PM, "Salva Alcántara"  wrote:

Hi Yun,

Thanks for your reply. You mention that

" ‘snapshotState’ and ‘initializeState’ interfaces are used mainly to
snapshot and initialize for operator state"

but..."mainly" is not "exclusively" right? So, I guess my question tries to
figure out whether doing something like this is valid/makes sense?

```
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
if (models.nonEmpty) {
  modelsBytes.clear()
  for ((k, model) <- models) {
modelsBytes.put(k, model.toBytes())
  }
}
  }
```

Indeed, the above code seems to work well ... so it seems like a bug that
`clear` sometimes works and sometimes doesn't, as I noted in my reply to
Congxian and as others have noted in this extended question posted on
Stack Overflow:


https://stackoverflow.com/questions/59123188/state-handling-on-keyedcoprocessfunction-serving-ml-models







Re: Firing timers on ProcessWindowFunction

2019-12-02 Thread Avi Levi
Thanks Alexander,
Will do.

Cheers

On Mon, Dec 2, 2019 at 3:23 PM Alexander Fedulov 
wrote:

> Hi Avi,
>
> In this situation I would propose to step back and use a lower-level API,
> ProcessFunction. You can put your window elements into Flink-managed
> ListState and handle clean-up/triggering and periodic state mutations
> exactly as needed by implementing some additional timer logic, as in the
> sketch below.
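>
> A minimal sketch of that idea (Event, Result, and the 6-hour interval are
> placeholder assumptions, not from this thread):
>
> ```java
> import org.apache.flink.api.common.state.ListState;
> import org.apache.flink.api.common.state.ListStateDescriptor;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
>
> public class PeriodicStateMutation extends KeyedProcessFunction<String, Event, Result> {
>
>     private static final long INTERVAL_MS = 6 * 60 * 60 * 1000L; // "every X hours"
>
>     private transient ListState<Event> elements;  // buffered window elements
>     private transient ValueState<Long> nextTimer; // pending timer per key, if any
>
>     @Override
>     public void open(Configuration parameters) {
>         elements = getRuntimeContext().getListState(
>                 new ListStateDescriptor<>("elements", Event.class));
>         nextTimer = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("nextTimer", Long.class));
>     }
>
>     @Override
>     public void processElement(Event value, Context ctx, Collector<Result> out) throws Exception {
>         elements.add(value);
>         // Register at most one pending timer per key, on an interval that
>         // is fully detached from any window interval.
>         if (nextTimer.value() == null) {
>             long ts = ctx.timerService().currentProcessingTime() + INTERVAL_MS;
>             ctx.timerService().registerProcessingTimeTimer(ts);
>             nextTimer.update(ts);
>         }
>     }
>
>     @Override
>     public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
>         // The periodic mutation: clear (or update) the state for this key.
>         elements.clear();
>         nextTimer.clear();
>     }
> }
> ```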
>
> Best regards,
>
> --
>
> Alexander Fedulov | Solutions Architect
>
> +49 1514 6265796
>
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward
> 
> - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
>
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Tony) Cheng
>
>
>
> On Mon, Dec 2, 2019 at 1:16 PM Avi Levi  wrote:
>
>> I think the only way to do this is to add a keyed operator downstream
>> that will hold the global state. Not ideal, but I don't see any other
>> option.
>>
>> On Mon, Dec 2, 2019 at 1:43 PM Avi Levi  wrote:
>>
>>> Hi Vino,
>>> I have a global state that I need to mutate every X hours (e.g. clean
>>> that state or update its value). I thought that there might be an option
>>> to set a timer using the TimerService with its own time interval,
>>> detached from the window interval.
>>>
>>> On Mon, Dec 2, 2019 at 10:59 AM vino yang  wrote:
>>>
 Hi Avi,

 First, let's clarify: is the "timer" you mentioned the window's timer, or
 a timer you want to register to trigger some action?

 Best,
 Vino


 Avi Levi  wrote on Mon, Dec 2, 2019 at 4:11 PM:

> Hi,
> Is there a way to fire a timer in a ProcessWindowFunction? I would like
> to mutate the global state on a timely basis.
>
>


Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

2019-12-02 Thread Salva Alcántara
Hi Yun,

Thanks for your reply. You mention that

" ‘snapshotState’ and ‘initializeState’ interfaces are used mainly to
snapshot and initialize for operator state"

but..."mainly" is not "exclusively" right? So, I guess my question tries to
figure out whether doing something like this is valid/makes sense?

```
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
if (models.nonEmpty) {
  modelsBytes.clear()
  for ((k, model) <- models) {
modelsBytes.put(k, model.toBytes())
  }
}
  }
```

Indeed, the above code seems to work well ... so it seems like a bug that
`clear` sometimes works and sometimes doesn't, as I noted in my reply to
Congxian and as others have noted in this extended question posted on
Stack Overflow:

https://stackoverflow.com/questions/59123188/state-handling-on-keyedcoprocessfunction-serving-ml-models





Re: Using MapState clear, put methods in snapshotState within KeyedCoProcessFunction, valid or not?

2019-12-02 Thread Salva Alcántara
Thanks Congxian. From what I've read, it seems that using keyed state in
`snapshotState` is incorrect... What confuses me is that if I do something
like this:

```
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
if (models.nonEmpty) {
  modelsBytes.clear() // This raises an exception when there is no
active key set
  for ((k, model) <- models) {
modelsBytes.put(k, model.toBytes())
  }
}
  }
```

Then, when there is data (`models` is populated within `processElement1`),
the `clear` and subsequent calls to `put` work just fine. This seems like a
bug to me, as others have pointed out in this somewhat extended question
posted on Stack Overflow:
https://stackoverflow.com/questions/59123188/state-handling-on-keyedcoprocessfunction-serving-ml-models

Do you think the fact that `clear` works within `snapshotState` under
certain circumstances is indeed a bug?





Re: Firing timers on ProcessWindowFunction

2019-12-02 Thread Avi Levi
I think the only way to do this is to add a keyed operator downstream that
will hold the global state. Not ideal, but I don't see any other option.

On Mon, Dec 2, 2019 at 1:43 PM Avi Levi  wrote:

> Hi Vino,
> I have a global state that I need to mutate every X hours (e.g. clean that
> state or update its value). I thought that there might be an option to set
> a timer using the TimerService with its own time interval, detached from
> the window interval.
>
> On Mon, Dec 2, 2019 at 10:59 AM vino yang  wrote:
>
>> Hi Avi,
>>
>> First, let's clarify: is the "timer" you mentioned the window's timer, or
>> a timer you want to register to trigger some action?
>>
>> Best,
>> Vino
>>
>>
>> Avi Levi  wrote on Mon, Dec 2, 2019 at 4:11 PM:
>>
>>> Hi,
>>> Is there a way to fire a timer in a ProcessWindowFunction? I would like
>>> to mutate the global state on a timely basis.
>>>
>>>


Re: Firing timers on ProcessWindowFunction

2019-12-02 Thread Avi Levi
Hi Vino,
I have a global state that I need to mutate every X hours (e.g. clean that
state or update its value). I thought that there might be an option to set
a timer using the TimerService with its own time interval, detached from
the window interval.

On Mon, Dec 2, 2019 at 10:59 AM vino yang  wrote:

> Hi Avi,
>
> First, let's clarify: is the "timer" you mentioned the window's timer, or
> a timer you want to register to trigger some action?
>
> Best,
> Vino
>
>
> Avi Levi  wrote on Mon, Dec 2, 2019 at 4:11 PM:
>
>> Hi,
>> Is there a way to fire a timer in a ProcessWindowFunction? I would like to
>> mutate the global state on a timely basis.
>>
>>


Re: Temporary failure in name resolution on JobManager

2019-12-02 Thread David Maddison
Thanks Yang.

We did try both of those properties and it didn't fix it. However, we did
EVENTUALLY (after some late nights!) track the issue down, not to DNS
resolution, but rather to an obscure bug in our own connector code :-(

Thanks for your response,

/David/

On Mon, Dec 2, 2019 at 3:16 AM Yang Wang  wrote:

> Hi David,
>
> Do you mean that when the JobManager starts, DNS has some problem and the
> service cannot be resolved, and that after DNS returns to normal the
> JobManager JVM still cannot look up the name?
> I think it may be because of the JVM DNS cache. You could set the TTLs and
> give it a try:
> sun.net.inetaddr.ttl
> sun.net.inetaddr.negative.ttl
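>
> For example (a sketch; the TTL values are placeholder assumptions), both
> properties can be passed to the JobManager JVM via env.java.opts in
> flink-conf.yaml:
>
> env.java.opts: "-Dsun.net.inetaddr.ttl=60 -Dsun.net.inetaddr.negative.ttl=10"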
>
>
> Best,
> Yang
>
> David Maddison  wrote on Fri, Nov 29, 2019 at 6:41 PM:
>
>> I have a Flink 1.7 cluster using the "flink:1.7.2" (OpenJDK build
>> 1.8.0_222-b10) image on Kubernetes.
>>
>> As part of a MasterRestoreHook (for checkpointing) the JobManager needs
>> to communicate with an external security service.  This all works well
>> until there's a DNS lookup failure (due to network issues), at which point
>> the JobManager JVM seems unable to ever successfully look up the name
>> again, even when it's confirmed the DNS service has been restored. The
>> weird thing is that I can use kubectl to exec into the JobManager POD and
>> successfully perform a lookup even while the JobManager JVM is still
>> failing to do so.
>>
>> Has anybody seen an issue like this before, or have any suggestions?  As
>> far as I'm aware Flink doesn't install a SecurityManager and therefore the
>> JVM should only cache invalid name requests for 10 seconds.
>>
>> Restarting the JobManager JVM does successfully recover the Job, but I'd
>> like to avoid having to do that if possible.
>>
>> Caused by: java.net.UnknownHostException: <>.com: Temporary
>> failure in name resolution
>> at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method)
>> at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:929)
>> at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1324)
>> at java.net.InetAddress.getAllByName0(InetAddress.java:1277)
>> at java.net.InetAddress.getAllByName(InetAddress.java:1193)
>> at java.net.InetAddress.getAllByName(InetAddress.java:1127)
>>
>> Thanks in advance,
>>
>> David
>>
>


Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-12-02 Thread Victor Wong
Hi,

We encountered a similar issue where the TaskManager kept being killed by
YARN.

- Flink 1.9.1
- Heap usage is low.

But our job is a **streaming** job, so I want to ask whether this issue is
only related to **batch** jobs or not. Thanks!

Best,
Victor


yingjie  wrote on Thu, Nov 28, 2019 at 11:43 AM:

> Piotr is right, it depends on the data size you are reading and the memory
> pressure. The memory occupied by the mmapped region can be recycled and
> used by other processes if memory pressure is high; that is, other
> processes or services on the same node won't be affected, because the OS
> will recycle the mmapped pages if needed. But currently you can't assume a
> bound on the memory used, because it will use more memory as long as there
> is free space and there is more new data to read.
>
>
>
>


-- 

Best,
Victor


RE: Access to CheckpointStatsCounts

2019-12-02 Thread min.tan
Many thanks for your reply.

It is not for monitoring but for configuration.

For a job starting from an empty state, we would like to load fresh
configurations.
For a job recovering from a checkpoint, we would like to rely on the
checkpoint.

Regards,

Min

From: vino yang [mailto:yanghua1...@gmail.com]
Sent: Monday, December 2, 2019 10:09
To: Tan, Min
Cc: user
Subject: [External] Re: Access to CheckpointStatsCounts

Hi min,

If it is only for monitoring purposes, you can just use the checkpoint REST
API[1] to do this work.

[1]: 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints

Best,
Vino

mailto:min@ubs.com>> wrote on Mon, Dec 2, 2019 at 5:01 PM:
Hi,

Just wondering how to access the CheckpointStatsCounts from the main method
of a job?

We need to detect if a job recovers from a checkpoint or starts from an empty 
status.

Regards,

Min


Re: Access to CheckpointStatsCounts

2019-12-02 Thread vino yang
Hi min,

If it is only for monitoring purposes, you can just use the checkpoint REST
API[1] to do this work.

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/rest_api.html#jobs-jobid-checkpoints
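
For example (hypothetical host and job id):

curl http://<jobmanager>:8081/jobs/<jobid>/checkpoints

The "counts" section of the response includes the number of restored,
in-progress, completed, and failed checkpoints.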

Best,
Vino

 wrote on Mon, Dec 2, 2019 at 5:01 PM:

> Hi,
>
>
>
> Just wondering how to access the CheckpointStatsCounts from the main method
> of a job?
>
>
>
> We need to detect if a job recovers from a checkpoint or starts from an
> empty status.
>
>
>
> Regards,
>
>
>
> Min
>


Access to CheckpointStatsCounts

2019-12-02 Thread min.tan
Hi,

Just wondering how to access the CheckpointStatsCounts from the main method
of a job?

We need to detect if a job recovers from a checkpoint or starts from an empty 
status.

Regards,

Min


Re: Firing timers on ProcessWindowFunction

2019-12-02 Thread vino yang
Hi Avi,

First, let's clarify: is the "timer" you mentioned the window's timer, or a
timer you want to register to trigger some action?

Best,
Vino


Avi Levi  wrote on Mon, Dec 2, 2019 at 4:11 PM:

> Hi,
> Is there a way to fire a timer in a ProcessWindowFunction? I would like to
> mutate the global state on a timely basis.
>
>


Firing timers on ProcessWindowFunction

2019-12-02 Thread Avi Levi
Hi,
Is there a way to fire a timer in a ProcessWindowFunction? I would like to
mutate the global state on a timely basis.