Re: jdbc connector configuration

2021-10-12 Thread Qihua Yang
Hi,

If I configure batch mode, the application will stop after the job completes,
right? Then k8s will restart the pod and rerun the job. That is not what we
want.

Thanks,
Qihua

On Tue, Oct 12, 2021 at 7:27 PM Caizhi Weng  wrote:

> Hi!
>
> It seems that you want to run a batch job instead of a streaming job.
> Call EnvironmentSettings.newInstance().inBatchMode().build() to create your
> environment settings for a batch job.
>
> Qihua Yang  wrote on Wed, Oct 13, 2021 at 5:50 AM:
>
>> Hi,
>>
>> Sorry for asking again. I plan to use the JDBC connector to scan a database.
>> How do I know when it is done? Are there any metrics I can track? We want to
>> monitor the progress and stop the Flink application when it is done.
>>
>> Thanks,
>> Qihua
>>
>> On Fri, Oct 8, 2021 at 10:07 AM Qihua Yang  wrote:
>>
>>> It is pretty clear. Thanks Caizhi!
>>>
>>> On Thu, Oct 7, 2021 at 7:27 PM Caizhi Weng  wrote:
>>>
 Hi!

 These configurations are not required to merely read from a database.
 They are here to accelerate the reads by allowing sources to read data in
 parallel.

 This optimization works by dividing the data into several
 (scan.partition.num) partitions and each partition will be read by a task
 slot (not a task manager, as a task manager may have multiple task slots).
 You can set scan.partition.column to specify the partition key and also set
 the lower and upper bounds for the range of data.

 Let's say your partition key is the column "k" which ranges from 0 to
 999. If you set the lower bound to 0, the upper bound to 999 and the number
 of partitions to 10, then all data satisfying 0 <= k < 100 will be divided
 into the first partition and read by the first task slot, all 100 <= k <
 200 will be divided into the second partition and read by the second task
 slot and so on. So these configurations should have nothing to do with the
 number of rows you have, but should be related to the range of your
 partition key.
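
To make this concrete, here is a minimal sketch of such a partitioned scan using the
Java Table API. The table name, schema, JDBC URL and credentials are placeholders;
only the scan.partition.* options are the ones discussed in this thread.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JdbcPartitionedScanSketch {
    public static void main(String[] args) {
        // Batch mode, since the scan is a bounded, one-off read.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        tEnv.executeSql(
                "CREATE TABLE my_table (\n"
                + "  k BIGINT,\n"
                + "  payload STRING\n"
                + ") WITH (\n"
                + "  'connector' = 'jdbc',\n"
                + "  'url' = 'jdbc:postgresql://localhost:5432/mydb',\n"
                + "  'table-name' = 'my_table',\n"
                + "  'scan.partition.column' = 'k',\n"        // partition key
                + "  'scan.partition.num' = '10',\n"          // 10 partitions -> up to 10 parallel reads
                + "  'scan.partition.lower-bound' = '0',\n"   // smallest k of the first partition
                + "  'scan.partition.upper-bound' = '999'\n"  // largest k of the last partition
                + ")");

        tEnv.executeSql("SELECT COUNT(*) FROM my_table").print();
    }
}

With these options the source splits the scan into ten range queries over k (roughly
one hundred values of k per partition in this example), which different task slots can
execute in parallel.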

 Qihua Yang  wrote on Thu, Oct 7, 2021 at 7:43 AM:

> Hi,
>
> I am trying to read data from a database with the JDBC driver. From [1], I
> have to configure the parameters below. I am not quite sure if I understand them
> correctly: lower-bound is the smallest value of the first partition, and
> upper-bound is the largest value of the last partition. For example, if the db
> table has 1000 rows, is lower-bound 0 and upper-bound 999? Is that
> correct?
> If I set scan.partition.num to 10, does each partition read 100 rows?
> If I set scan.partition.num to 10 and I have 10 task managers, will each
> task manager pick a partition to read?
>
>- scan.partition.column: The column name used for partitioning the
>input.
>- scan.partition.num: The number of partitions.
>- scan.partition.lower-bound: The smallest value of the first
>partition.
>- scan.partition.upper-bound: The largest value of the last
>partition.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/
>
> Thanks,
> Qihua
>



Re: 关于slot分配

2021-10-12 Thread yidan zhao
From what I have observed so far, with the default cluster.evenly-spread-out-slots
set to false, slots on one TM are usually used up before moving on to the next TM;
only occasionally, and I am not sure under what circumstances, does this problem appear.

In this setup I want to isolate jobs completely: once a TM has unused slots, other
submitted jobs may also land on that TM, so the isolation between jobs is not good enough.
And sometimes a machine problem, or a job failure whose restart brings down a TM,
ends up restarting even more jobs.

Caizhi Weng  wrote on Wed, Oct 13, 2021 at 10:20 AM:

> Hi!
>
> As far as I remember there is no such option as a "default mechanism that prefers
> a single TM". Do you mean setting cluster.evenly-spread-out-slots to false?
> If so, slots are chosen arbitrarily among all slots rather than preferring a single TM.
>
> May I ask what requirement makes you prefer a single TM? Doing so may leave some
> machines in the cluster very busy while others sit idle, and the parallel subtasks
> on the busy machines will be slowed down.
>
> yidan zhao  wrote on Tue, Oct 12, 2021 at 4:25 PM:
>
> > I used to spread slots across multiple TMs, and recently tried the default
> > behavior that prefers a single TM.
> > But I noticed a problem: each of my TMs has 10 slots, and a job with parallelism 40
> > actually occupied 5 TMs, as 10+10+10+2+8. What is going on here?
> >
> > I would prefer either a completely even spread (by configuring the
> > evenly-spread-out-slots option) or filling up one TM at a time.
> > The former: to balance load across machines.
> > The latter: to isolate jobs from each other completely; I set my job parallelism
> > to a multiple of the number of slots per TM (10).
> >
>


[External] metric collision using datadog and standalone Kubernetes HA mode

2021-10-12 Thread Clemens Valiente
Hi,

we are using datadog as our metrics reporter as documented here:
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/metric_reporters/#datadog

our jobmanager scope is
metrics.scope.jm: flink.jobmanager
metrics.scope.jm.job: flink.jobmanager
since datadog doesn't allow placeholder in metric names, we cannot include
the  or  placeholder in the scope.

This setup worked nicely on our standalone kubernetes application
deployment without using HA.
But when we set up HA, we lost checkpointing metrics in datadog, and see
this warning in the jobmanager log:

2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'totalNumberOfCheckpoints'. Metric will not be
reported.[flink, jobmanager]
2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'numberOfInProgressCheckpoints'. Metric will not
be reported.[flink, jobmanager]
2021-10-01 04:22:09,920 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'numberOfCompletedCheckpoints'. Metric will not
be reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'numberOfFailedCheckpoints'. Metric will not be
reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'lastCheckpointRestoreTimestamp'. Metric will not
be reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'lastCheckpointSize'. Metric will not be
reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'lastCheckpointDuration'. Metric will not be
reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'lastCheckpointProcessedData'. Metric will not be
reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'lastCheckpointPersistedData'. Metric will not be
reported.[flink, jobmanager]
2021-10-01 04:22:09,921 WARN  org.apache.flink.metrics.MetricGroup
[] - Name collision: Group already contains a
Metric with the name 'lastCheckpointExternalPath'. Metric will not be
reported.[flink, jobmanager]


I assume this is because we now have two jobmanager pods (one active, one
standby) and they both report this metric, so it fails. But we cannot use the
 scope in the group, otherwise we won't be able to build Datadog
dashboards conveniently.

My question:
- did anyone else encounter this problem?
- how could we solve this to have checkpointing metrics again in HA mode
without needing the  placeholder?

Thanks a lot
Clemens




Re: jdbc connector configuration

2021-10-12 Thread Caizhi Weng
Hi!

It seems that you want to run a batch job instead of a streaming job.
Call EnvironmentSettings.newInstance().inBatchMode().build() to create your
environment settings for a batch job.
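
A minimal sketch of creating such an environment in Java (nothing here is specific to
any particular connector):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BatchEnvironmentSketch {
    public static void main(String[] args) {
        // In batch mode the job reads its bounded input, finishes, and the
        // application exits on its own.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        // register sources/sinks and run queries on tableEnv as usual
    }
}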

Qihua Yang  wrote on Wed, Oct 13, 2021 at 5:50 AM:

> Hi,
>
> Sorry for asking again. I plan to use JDBC connector to scan a database.
> How do I know if it is done? Are there any metrics I can track? We want to
> monitor the progress, stop flink application when it is done.
>
> Thanks,
> Qihua
>
> On Fri, Oct 8, 2021 at 10:07 AM Qihua Yang  wrote:
>
>> It is pretty clear. Thanks Caizhi!
>>
>> On Thu, Oct 7, 2021 at 7:27 PM Caizhi Weng  wrote:
>>
>>> Hi!
>>>
>>> These configurations are not required to merely read from a database.
>>> They are here to accelerate the reads by allowing sources to read data in
>>> parallel.
>>>
>>> This optimization works by dividing the data into several
>>> (scan.partition.num) partitions and each partition will be read by a task
>>> slot (not a task manager, as a task manager may have multiple task slots).
>>> You can set scan.partition.column to specify the partition key and also set
>>> the lower and upper bounds for the range of data.
>>>
>>> Let's say your partition key is the column "k" which ranges from 0 to
>>> 999. If you set the lower bound to 0, the upper bound to 999 and the number
>>> of partitions to 10, then all data satisfying 0 <= k < 100 will be divided
>>> into the first partition and read by the first task slot, all 100 <= k <
>>> 200 will be divided into the second partition and read by the second task
>>> slot and so on. So these configurations should have nothing to do with the
>>> number of rows you have, but should be related to the range of your
>>> partition key.
>>>
>>> Qihua Yang  wrote on Thu, Oct 7, 2021 at 7:43 AM:
>>>
 Hi,

 I am trying to read data from database with JDBC driver. From [1], I
 have to config below parameters. I am not quite sure if I understand it
 correctly. lower-bound is smallest value of the first partition,
 upper-bound is largest value of the last partition. For example, if the db
 table has 1000 rows. lower-bound is 0, upper-bound is 999. Is that correct?
 If  setting scan.partition.num to 10, each partition read 100 row?
 if I set scan.partition.num to 10 and I have 10 task managers. Each
 task manager will pick a partition to read?

- scan.partition.column: The column name used for partitioning the
input.
- scan.partition.num: The number of partitions.
- scan.partition.lower-bound: The smallest value of the first
partition.
- scan.partition.upper-bound: The largest value of the last
partition.

 [1]
 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/

 Thanks,
 Qihua

>>>


Re: 关于slot分配

2021-10-12 Thread Caizhi Weng
Hi!

As far as I remember there is no such option as a "default mechanism that prefers
a single TM". Do you mean setting cluster.evenly-spread-out-slots to false?
If so, slots are chosen arbitrarily among all slots rather than preferring a single TM.

May I ask what requirement makes you prefer a single TM? Doing so may leave some
machines in the cluster very busy while others sit idle, and the parallel subtasks
on the busy machines will be slowed down.

yidan zhao  wrote on Tue, Oct 12, 2021 at 4:25 PM:

> I used to spread slots across multiple TMs, and recently tried the default
> behavior that prefers a single TM.
> But I noticed a problem: each of my TMs has 10 slots, and a job with parallelism 40
> actually occupied 5 TMs, as 10+10+10+2+8. What is going on here?
>
> I would prefer either a completely even spread (by configuring the
> evenly-spread-out-slots option) or filling up one TM at a time.
> The former: to balance load across machines.
> The latter: to isolate jobs from each other completely; I set my job parallelism
> to a multiple of the number of slots per TM (10).
>


Re: Checkpoint size increasing even i enable increasemental checkpoint

2021-10-12 Thread Vijay Bhaskar
Since the state size is small, you can try the FsStateBackend rather than
RocksDB and check once. The rule of thumb is: if the FsStateBackend performs
worse, RocksDB is the better choice.
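
A minimal sketch of what that switch looks like on Flink 1.11; the checkpoint URI is a
placeholder, and only one of the two setStateBackend calls would be kept in a real job:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSwitchSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String checkpointDir = "oss://my-bucket/checkpoints";  // placeholder URI

        // Option A: keyed state on the heap, checkpoints written to checkpointDir.
        env.setStateBackend(new FsStateBackend(checkpointDir));

        // Option B (current setup): RocksDB with incremental checkpoints enabled.
        env.setStateBackend(new RocksDBStateBackend(checkpointDir, true));
    }
}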

Regards
Bhaskar

On Tue, Oct 12, 2021 at 1:47 PM Yun Tang  wrote:

> Hi Lei,
>
> RocksDB state-backend checkpoints are composed of RocksDB's own files
> (unmodified compressed SST format files), and incremental checkpointing means
> Flink does not re-upload files which were uploaded before. As you can see,
> incremental checkpoints highly depend on RocksDB's own mechanism for
> removing useless files, which is triggered by internal compaction. You should
> not worry too much about the checkpointed data size as your job consumes more
> and more records; moreover, the increase is actually quite small
> (from 1.32 GB to 1.34 GB).
>
> Best
> Yun Tang
>
>
>
> --
> *From:* Lei Wang 
> *Sent:* Monday, October 11, 2021 16:16
> *To:* user 
> *Subject:* Checkpoint size increasing even i enable increasemental
> checkpoint
>
>
> [image: image.png]
>
> The  checkpointed data size became bigger and bigger and the node cpu is
> very high when the job is doing checkpointing.
> But I have enabled incremental checkpointing:
> env.setStateBackend(new RocksDBStateBackend(checkpointDir, true));
>
> I am using flink-1.11.2 and aliyun oss as checkpoint storage.
>
>
> Any insight on this?
>
> Thanks,
>
> Lei
>
>
>


Re: Impossible to get pending file names/paths on checkpoint?

2021-10-12 Thread Preston Price
Thanks for your thoughts here Fabian, I've responded inline but I also want
to clarify the reason I need the file paths on commit.
The FileSink works as expected in Azure Data Lake with the ABFS connector,
but I want to perform an additional step by telling Azure Data Explorer to
ingest the committed files, and I need their paths to do so. This is why
I've implemented the hack below to Reflectively get access to the
underlying File, which I can then use to craft my ingestion command to
Azure Data Explorer.

On Tue, Oct 12, 2021 at 2:15 AM Fabian Paul 
wrote:

> Hi Preston,
>
> I just noticed I forgot to cc to the user mailing list on my first reply
> …. I have a few thoughts about the design you are describing.
>
>
> In the meantime I have a nasty hack in place that has unblocked me for now
> in getting the target file off the LocalRecoverable/HadoopFsRecoverable:
>
> InProgressFileWriter.PendingFileRecoverable recoverable =
>> committable.getPendingFile();
>
> RecoverableWriter.CommitRecoverable commitRecoverable =
>> ((OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable)
>> recoverable).getCommitRecoverable();
>
>
>> Method m = commitRecoverable.getClass().getMethod("targetFile");
>> m.setAccessible(true);
>> File targetFile = (File) m.invoke(commitRecoverable);
>
> I think a good place to start would be to introduce getTargetFile, and
> getTempFile methods on the CommitRecoverable interface, though I haven't
> fully studied the impact of this approach on other implementations of that
> interface.
>
>
> I might be missing context here or lack knowledge of how Azure Data Lake
> works, but why do you need access to the target and/or temp file locations?
> Your scenario sounds very similar to any other distributed file system.
>

For my case I need to know the final path to the finished files so I can
issue an ingest command to Azure Data Explorer for each file once they're
committed. When using Azure Data Lake for storage I can instruct Azure Data
Explorer to ingest a file from a path in blob storage, but I need to know
what the path is. Alternatively we may be able to leverage something like
Event Grid which can send a signal whenever a new file lands in a
particular path in Azure Data Lake, but there are benefits to having tight
control over issuing the ingest commands.


>
>
> A note on implementing our part-file scoped Encoder: The current Encoder
> pattern in 1.14 assumes that the same encoder will work for all files, for
> all time. We had to make numerous small changes to the File Sink to break
> this pattern, and allow for an Encoder instance per part file. My current
> solution uses a custom BucketID object with both Path, and EventType
> properties. In our BucketWriter.openNew method we can use the
> BucketId.EventType to lookup the Protobuf descriptor we need, create a new
> Encoder and pass it to our RowWisePartWriter. We had to reimplement/absorb
> a significant amount of the File Sink code to accomplish this as the File
> Sink implementation assumes a String for BucketID and there are many
> roadblocks put in place to prevent extending FileSink functionality.
>
>
> This is an interesting point. I guess we did not think about such a use case
> when developing the sink. Maybe we can approach the problem differently.
> I am thinking about adding a context to the `Encoder#encode` method where
> metadata (new bucket, filename, bucketname) is accessible. Does this help
> in your case?
>

Yes, this could have saved me a great deal of hassle if there were
additional context provided to the encoder about the lifecycle, and
BucketID of the underlying part file. It would still be a bit of a complex
Encoder as, for my case, each bucket needs to be encoded differently, and
state updated when files roll.


> A perfect example of these roadblocks is the FileWriterBucketFactory
> interface. It looks like we could provide our own implementation of this
> interface, but the return type of its methods (FileWriterBucket) has
> default (package-protected) visibility, and so we can neither provide our
> own implementation, nor sub-class the return types to add our own logic.
> Another example is the OutputStreamBasedPartFileWriter which wraps a
> default (package protected) visibility abstract class
> (OutputStreamBasedBucketWriter). I ran into numerous issues like these.
>
>
> In general, all classes annotated with @Internal are not meant to be used
> outside of Flink, but I agree sometimes it becomes necessary. Although if
> more and more people need to reimplement big parts of the FileSink, we have
> to incorporate that feedback and make it extensible.
>

In my case the solution is then to reimplement all that great functionality
and it will make upgrading to future versions of Flink harder.


>
> A note on implementing our Azure Data Explorer sink: Currently we're
> looking to add code in a custom Committer to do this. However, since I
> can't grok a way to make the file commit + ADX ingest 

Re: jdbc connector configuration

2021-10-12 Thread Qihua Yang
Hi,

Sorry for asking again. I plan to use the JDBC connector to scan a database.
How do I know when it is done? Are there any metrics I can track? We want to
monitor the progress and stop the Flink application when it is done.

Thanks,
Qihua

On Fri, Oct 8, 2021 at 10:07 AM Qihua Yang  wrote:

> It is pretty clear. Thanks Caizhi!
>
> On Thu, Oct 7, 2021 at 7:27 PM Caizhi Weng  wrote:
>
>> Hi!
>>
>> These configurations are not required to merely read from a database.
>> They are here to accelerate the reads by allowing sources to read data in
>> parallel.
>>
>> This optimization works by dividing the data into several
>> (scan.partition.num) partitions and each partition will be read by a task
>> slot (not a task manager, as a task manager may have multiple task slots).
>> You can set scan.partition.column to specify the partition key and also set
>> the lower and upper bounds for the range of data.
>>
>> Let's say your partition key is the column "k" which ranges from 0 to
>> 999. If you set the lower bound to 0, the upper bound to 999 and the number
>> of partitions to 10, then all data satisfying 0 <= k < 100 will be divided
>> into the first partition and read by the first task slot, all 100 <= k <
>> 200 will be divided into the second partition and read by the second task
>> slot and so on. So these configurations should have nothing to do with the
>> number of rows you have, but should be related to the range of your
>> partition key.
>>
>> Qihua Yang  wrote on Thu, Oct 7, 2021 at 7:43 AM:
>>
>>> Hi,
>>>
>>> I am trying to read data from database with JDBC driver. From [1], I
>>> have to config below parameters. I am not quite sure if I understand it
>>> correctly. lower-bound is smallest value of the first partition,
>>> upper-bound is largest value of the last partition. For example, if the db
>>> table has 1000 rows. lower-bound is 0, upper-bound is 999. Is that correct?
>>> If  setting scan.partition.num to 10, each partition read 100 row?
>>> if I set scan.partition.num to 10 and I have 10 task managers. Each task
>>> manager will pick a partition to read?
>>>
>>>- scan.partition.column: The column name used for partitioning the
>>>input.
>>>- scan.partition.num: The number of partitions.
>>>- scan.partition.lower-bound: The smallest value of the first
>>>partition.
>>>- scan.partition.upper-bound: The largest value of the last
>>>partition.
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/
>>>
>>> Thanks,
>>> Qihua
>>>
>>


Re: Flink 1.11 loses track of event timestamps and watermarks after process function

2021-10-12 Thread Ahmad Alkilani
Thanks Arvid.
Getting the easy stuff out of the way, I certainly wait for longer than 10s
(typically observe what happens over a few minutes) so the bounded
watermark issue isn't in play here.

The Async I/O as it stands today has timeouts, so it doesn't run
indefinitely. With that said, I replaced the Async I/O with a simple process
function and print statements in the body of the process function. The
process function simply emits what it received. I also removed the custom
sink (which has an external dependency) and replaced it with a simple lambda
that occasionally prints just to show progress.

So now:
Kafka -> Flink Kafka source -> flatMap (map & filter) ->
assignTimestampsAndWaterMarks -> map Function -> *process function (print
watermarks) *-> Key By -> Keyed Process Function -> *process function
(print watermarks)* -> Simple Sink

I am seeing similar problems. The watermarks don't seem to advance until a
checkpoint is triggered. I haven't been able to verify whether they indeed
advance *regularly* post-checkpoint, but the watermark at the very least is
no longer Long.MinValue. The checkpoint could also simply be a red herring;
I also see some offset commit messages to Kafka around the same time that I
notice watermarks advancing. This is well into the job execution (5-10
minutes).

So now that the Async I/O is out of the way, any idea what's going on here?
Related question: how does Flink know that time has "progressed" after a
process function? How is the notion of time carried over, given that the
developer has full control over what the process function does? For example,
the process function could choose to process a "processing time" trigger
for every 5th message it received and then, in that processing-time trigger,
output a completely unrelated piece of data. How does the notion of
event time continue to exist then, i.e., how does it know what the event
time is?
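
For reference, the watermark-printing process function is just a pass-through; a
minimal Java sketch of it (the actual job is in Scala, but the calls are the same):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Prints the element timestamp and the current watermark seen by this operator,
// then forwards the element unchanged.
public class WatermarkPrinter<T> extends ProcessFunction<T, T> {
    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        long watermark = ctx.timerService().currentWatermark();
        Long timestamp = ctx.timestamp();  // null if the record carries no timestamp
        if (watermark <= 0) {
            System.out.println("eventTimestamp=" + timestamp + ", watermark=" + watermark);
        }
        out.collect(value);
    }
}

It is plugged in with stream.process(new WatermarkPrinter<>()) right before and right
after the keyed process function.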

Thank you




On Mon, Oct 11, 2021 at 11:13 PM Arvid Heise  wrote:

> Hi Ahmad,
>
> From your description, I'd look in a different direction: Could it be that
> your Sink/Async IO is not processing data (fast enough)?
> Since you have a bounded watermark strategy, you'd need to see 10s of data
> being processed before the first watermark is emitted.
> To test that, can you please simply remove AsyncIO+Sink from your job and
> check for print statements?
>
> On Tue, Oct 12, 2021 at 3:23 AM Ahmad Alkilani  wrote:
>
>> Flink 1.11
>> I have a simple Flink application that reads from Kafka, uses event
>> timestamps, assigns timestamps and watermarks and then key's by a field and
>> uses a KeyedProcessFunciton.
>>
>> The keyed process function outputs events from with the `processElement`
>> method using `out.collect`. No timers are used to collect or output any
>> elements (or do anything for that matter).
>>
>> I also have a simple print statement that shows event time and waterMark
>> within the process function.
>>
>> if (waterMark <= 0)
>>   println(
>> s"""
>>|eventTimestamp:$eventTimestamp
>>|waterMark: $waterMark
>>|""".stripMargin)
>>
>>
>> If the process function simply does nothing with the incoming records,
>> i.e., does not emit any records/data as a result of an input element, then
>> you'll see the Water Mark start with -Max Long and then progress just fine
>> as expected. If I add `out.collect()` then the watermark stops
>> progressing and the above print statement prints for every record.
>>
>> The environment has
>> `setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` set.
>>
>> The source start out something like this:
>>
>> someKafkaSource.flatMap(_.someTransofmrationToEventType.filter(_.something 
>> != 0))
>>   .assignTimestampsAndWatermarks(WatermarkStrategy
>> .forBoundedOutOfOrderness[Event](Duration.ofSeconds(10))
>> .withIdleness(Duration.ofSeconds(30))
>> .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
>>   override def extractTimestamp(element: Event, recordTimestamp: Long): 
>> Long = {
>> if (element.eventTimeUTCMillis > 0) element.eventTimeUTCMillis else 
>> recordTimestamp
>>   }
>> })
>>
>> The sink is a custom Rich Sink implementation:
>>  resultStream.addSink(new CustomSink()}
>>
>> I recall seeing a thread somewhere indicating this could be a Flink bug
>> but I can't seem to track it down again.
>> Happy to provide more information. For what it's worth, the
>> KeyedProcessFunction used to be a GlobalWindow with a custom Trigger and
>> Evictor but has since been replaced in an attempt to solve the watermark
>> issue with no success so far.
>>
>> Do I have to use assignTimestampAndWatermarks again after the
>> KeyedProcessFunction?
>>
>> Full job flow for completeness:
>>
>> Kafka -> Flink Kafka source -> flatMap (map & filter) ->
>> assignTimestampsAndWaterMarks -> map Function -> Key By -> Keyed Process
>> Function -> Async IO -> Custom Sink
>>
>> Much obliged.
>>
>


Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Seth Wiesman
Hi Marc,

I think you will find this is less efficient than just using keyed state.
Remember state backends are local, reading and writing is extremely cheap.
HashMapStateBackend is just an in-memory data structure and
EmbeddedRocksDBStateBackend only works against local disk. Additionally,
the embedded rocksdb state backend already supports incremental
checkpointing, so when an asynchronous checkpoint does occur you are not
paying transfer cost on slow changing state values.
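
For completeness, a minimal sketch of configuring the two backends mentioned above
(Flink 1.13+ APIs; the checkpoint URI is a placeholder, and a real job would keep only
one of the setStateBackend calls):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendChoiceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keyed state lives on the local heap; reads and writes are plain object accesses.
        env.setStateBackend(new HashMapStateBackend());

        // Or: keyed state lives in a local RocksDB instance; `true` enables
        // incremental checkpoints, so unchanged SST files are not re-uploaded.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));

        // Checkpoint storage (placeholder URI) is configured separately from the backend.
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");
    }
}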

Seth



On Tue, Oct 12, 2021 at 10:12 AM Marc LEGER  wrote:

> Hello Nicolaus,
>
> Unfortunately, I don't really have the hand on the custom state solution
> since it is managed by an existing system which cannot be easily modified.
>
> What I finally did for the "data state" in my CoFlatMapFunction is to use a*
> list-style operator state* to store the partitioned state for a key per
> element in the list with an *union redistribution* scheme in case of
> restore/redistribution.
> Not sure if it's really efficient (need to do more tests) but all
> operators are then receiving the same whole custom state from which the
> partitioned state for the assigned keys can then be retrieved inside every
> operator parallel task besides the other keyed state (control state):
>
>
>
>
> // Control state partitioned by userId (keyed state)
> private ValueState controlState;
> // Data state partitioned by userId (operator state)
> private ListState dataState;
>
> To avoid "state explosion", I also added a custom TTL-based cleanup
> mechanism for this operator state to remove elements in the list which are
> not used for some time.
> However, I am still interested in any other better solution if available
> in Flink.
>
> Thank you for your help.
>
> Best Regards,
> Marc
>
>
> On Tue, Oct 12, 2021 at 09:02, Nicolaus Weidner <
> nicolaus.weid...@ververica.com> wrote:
>
>> Hi Marc,
>>
>> thanks for clarifying, I had misunderstood some parts.
>> Unfortunately, I don't think there is a way to update keyed state (for
>> multiple keys even) outside of a keyed context.
>>
>> I will ask if someone else has an idea, but allow me to ask one
>> counter-question first: Did you actually run tests to verify that using the
>> custom state solution is more efficient than using Flink's keyed state
>> regularly (in the end, you would even have to include the state
>> synchronization in the performance test)? Efficient stateful stream
>> processing is one of the key features of Flink, and you are essentially
>> trying to override a specific piece of it with custom logic.
>>
>> Best regards,
>> Nico
>>
>> On Wed, Oct 6, 2021 at 5:50 PM Marc LEGER  wrote:
>>
>>> Hello Nicolaus,
>>>
>>> Thank you for your quick feedback, sorry if I am not clear enough.
>>> Actually in the documented example, the state which is updated in the
>>> snapshotState method is an operator state and not a keyed state:
>>>
>>> *public void initializeState(FunctionInitializationContext context)
>>> throws Exception {*
>>>
>>>
>>> *  [...]*
>>>
>>> *  countPerPartition =
>>> context.getOperatorStateStore().getOperatorState(new
>>> ListStateDescriptor<>("perPartitionCount", Long.class));*
>>>
>>>
>>>
>>>
>>> *  [...] } public void snapshotState(FunctionSnapshotContext context)
>>> throws Exception {*
>>>
>>>
>>> *  [...]*
>>>
>>> *  countPerPartition.add(localCount);*
>>>
>>> *}*
>>>
>>>
>>> It seems that the method is then only called once per operator parallel
>>> task and not once per key.
>>> On my side I have two keyed states with same key (e.g., userId) in a
>>> CoFlatMapFunction:
>>>
>>>
>>>
>>>
>>> *// Control state partitioned by userId private ValueState
>>> controlState; // Data state partitioned by userId coming from the
>>> ser/deserialization of a custom system having a partitioned state private
>>> ValueState dataState;*
>>>
>>> and I would like to do something like that to update dataState in a
>>> keyed context for every key and every checkpoint:
>>>
>>>
>>>
>>> *public void snapshotState(FunctionSnapshotContext context) throws
>>> Exception {  dataState.update(customSystem.getSnapshot(context.getKey());
>>> // Not a keyed context here ! }*
>>>
>>> instead of saving dataState in the flatMap2 function for every received
>>> event:
>>>
>>>
>>> *public void flatMap1(Control control, Collector out) {*
>>>
>>> *   controlState.update(control); *
>>>
>>> *}*
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *public void flatMap2(Event event, Collector out) {  //
>>> Perform some event transformations based on controlState  ProcessedEvent
>>> result = customSystem.process(controlState.value() , event);  // Save
>>> internal custom system state after processing: can be costly if high event
>>> throughput
>>> dataState.update(customSystem.getSnapshot(controlState.value().getUserId());
>>> // Output the processed event  out.collect(result); }*
>>>
>>>
>>> So basically, I want to be able to synchronize the partitioned state of
>>> my custom system with the checkpoints done by Flink.
>>>
>>>
>>> Best 

Fwd: How to deserialize Avro enum type in Flink SQL?

2021-10-12 Thread Dongwon Kim
Hi community,

Can I get advice on this question?

Another user just sent me an email asking whether I found a solution or a
workaround for this question, but I'm still stuck there.

Any suggestions?

Thanks in advance,

Dongwon

-- Forwarded message -
From: Dongwon Kim 
Date: Mon, Aug 9, 2021 at 7:26 PM
Subject: How to deserialize Avro enum type in Flink SQL?
To: user 


Hi community,

I have a Kafka topic where the schema of its values is defined by the
"MyRecord" record in the following Avro IDL and registered to the Confluent
Schema Registry.

> @namespace("my.type.avro")
> protocol MyProtocol {
>   enum MyEnumType {
> TypeVal1, TypeVal2
>   }
>   record MyEntry {
> MyEnumType type;
>   }
>   record MyRecord {
> array<MyEntry> entries;
>   }
> }


To read from the topic, I've defined the following DDL:

> CREATE TABLE my_table (
>   `entries` ARRAY<ROW<
>     `type` ???   (This is the main question)
>   >>
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'my-topic',
>   'properties.bootstrap.servers' = '...:9092',
>   'scan.startup.mode' = 'latest-offset',
>   'value.format' = 'avro-confluent',
>   'value.avro-confluent.schema-registry.url' = 'http://...:8081'
> )


And I run the following query :

> SELECT * FROM my_table


Now I got the following messages in Flink-1.13.1 when I use *STRING* for
the type:

> *Caused by: java.io.IOException: Failed to deserialize Avro record.*
>   at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
>   at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
>   at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>   at
> org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
>   at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
>   at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
>   at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
>   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
>   at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
>   at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> *Caused by: org.apache.avro.AvroTypeException: Found
> my.type.avro.MyEnumType, expecting union*
>   at
> org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
>   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
>   at
> org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
>   at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at
> org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
>   at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
>   at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
>   at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
>   at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
>   at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
>   at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
>   at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
>   at
> org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:81)
>   at
> org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103)
>   ... 9 more

The reason I use the STRING type is just for fast-prototyping.

While reading through [1], I've been thinking about using *RAW('class',
'snapshot')* where 'class' is my.type.avro.MyEnumType, but I'm not sure
whether it is a good idea and if so, what can be a value for the snapshot.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#raw

Thanks in advance,

Dongwon


Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Marc LEGER
 Hello Nicolaus,

Unfortunately, I don't really have control over the custom state solution
since it is managed by an existing system which cannot be easily modified.

What I finally did for the "data state" in my CoFlatMapFunction is to use a
*list-style operator state* to store the partitioned state for a key per
element in the list, with a *union redistribution* scheme in case of
restore/redistribution.
I'm not sure if it's really efficient (I need to do more tests), but all operators
then receive the same whole custom state, from which the partitioned
state for the assigned keys can be retrieved inside every operator
parallel task, alongside the other keyed state (control state):




// Control state partitioned by userId (keyed state)
private ValueState controlState;
// Data state partitioned by userId (operator state)
private ListState dataState;
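
A rough sketch of how this looks as a CheckpointedFunction; MyCustomSystem,
KeyedSnapshot and snapshotAllKeys() are made-up stand-ins for the external system and
its per-key snapshot type, and the stream element types are simplified to String:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical types standing in for the external system and its per-key snapshot.
interface KeyedSnapshot {}
interface MyCustomSystem { java.util.List<KeyedSnapshot> snapshotAllKeys(); }

public class UnionListDataStateSketch
        extends RichCoFlatMapFunction<String, String, String>
        implements CheckpointedFunction {

    private transient MyCustomSystem customSystem;  // initialized in open(), omitted here
    private transient ListState<KeyedSnapshot> dataState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Union redistribution: on restore, every parallel task receives the full list.
        dataState = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("dataState", KeyedSnapshot.class));
        for (KeyedSnapshot snapshot : dataState.get()) {
            // feed back only the entries whose keys are assigned to this subtask
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Rewrite the whole list from the custom system's current state at every checkpoint.
        dataState.clear();
        dataState.addAll(customSystem.snapshotAllKeys());
    }

    @Override
    public void flatMap1(String control, Collector<String> out) { /* control stream */ }

    @Override
    public void flatMap2(String event, Collector<String> out) { /* data stream */ }
}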

To avoid "state explosion", I also added a custom TTL-based cleanup
mechanism for this operator state to remove elements in the list which are
not used for some time.
However, I am still interested in any other better solution if available in
Flink.

Thank you for your help.

Best Regards,
Marc


On Tue, Oct 12, 2021 at 09:02, Nicolaus Weidner <
nicolaus.weid...@ververica.com> wrote:

> Hi Marc,
>
> thanks for clarifying, I had misunderstood some parts.
> Unfortunately, I don't think there is a way to update keyed state (for
> multiple keys even) outside of a keyed context.
>
> I will ask if someone else has an idea, but allow me to ask one
> counter-question first: Did you actually run tests to verify that using the
> custom state solution is more efficient than using Flink's keyed state
> regularly (in the end, you would even have to include the state
> synchronization in the performance test)? Efficient stateful stream
> processing is one of the key features of Flink, and you are essentially
> trying to override a specific piece of it with custom logic.
>
> Best regards,
> Nico
>
> On Wed, Oct 6, 2021 at 5:50 PM Marc LEGER  wrote:
>
>> Hello Nicolaus,
>>
>> Thank you for your quick feedback, sorry if I am not clear enough.
>> Actually in the documented example, the state which is updated in the
>> snapshotState method is an operator state and not a keyed state:
>>
>> public void initializeState(FunctionInitializationContext context) throws Exception {
>>   [...]
>>   countPerPartition = context.getOperatorStateStore().getOperatorState(
>>       new ListStateDescriptor<>("perPartitionCount", Long.class));
>>   [...]
>> }
>>
>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>   [...]
>>   countPerPartition.add(localCount);
>> }
>>
>>
>> It seems that the method is then only called once per operator parallel
>> task and not once per key.
>> On my side I have two keyed states with same key (e.g., userId) in a
>> CoFlatMapFunction:
>>
>>
>>
>>
>> // Control state partitioned by userId
>> private ValueState controlState;
>> // Data state partitioned by userId, coming from the ser/deserialization
>> // of a custom system having a partitioned state
>> private ValueState dataState;
>>
>> and I would like to do something like that to update dataState in a keyed
>> context for every key and every checkpoint:
>>
>>
>>
>> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>   dataState.update(customSystem.getSnapshot(context.getKey()));  // Not a keyed context here!
>> }
>>
>> instead of saving dataState in the flatMap2 function for every received
>> event:
>>
>>
>> public void flatMap1(Control control, Collector out) {
>>   controlState.update(control);
>> }
>>
>> public void flatMap2(Event event, Collector out) {
>>   // Perform some event transformations based on controlState
>>   ProcessedEvent result = customSystem.process(controlState.value(), event);
>>   // Save internal custom system state after processing: can be costly if high event throughput
>>   dataState.update(customSystem.getSnapshot(controlState.value().getUserId()));
>>   // Output the processed event
>>   out.collect(result);
>> }
>>
>>
>> So basically, I want to be able to synchronize the partitioned state of
>> my custom system with the checkpoints done by Flink.
>>
>>
>> Best Regards,
>> Marc
>>
>> On Wed, Oct 6, 2021 at 12:11, Nicolaus Weidner <
>> nicolaus.weid...@ververica.com> wrote:
>>
>>> Hi Marc,
>>>
>>> I think you can just use keyed state in a
>>> CheckpointedFunction. FunctionInitializationContext gives you access to
>>> both keyed state and operator state (your stream needs to be keyed, of
>>> course). So you could just update your local custom state on regular
>>> invocations and update keyed state on calls to snapshotState.
>>> Check out the example in [1] where both types of state are used.
>>>
>>> Does that help? Not sure if I understood the problem correctly.
>>>
>>> Best regards,
>>> Nico
>>>
>>> [1]
>>> 

[no subject]

2021-10-12 Thread Andrew Otto
Hello,

I'm trying to use HiveCatalog with Kerberos. Our Hadoop cluster, our Hive
Metastore, and our Hive Server are kerberized. I can successfully submit
Flink jobs to YARN authenticated as my user with a cached ticket, as well
as with a keytab.

However, I can't seem to register a HiveCatalog with my TableEnvironment.
Here's my code:

import org.apache.flink.table.api._
import org.apache.flink.table.catalog.hive.HiveCatalog

val settings = EnvironmentSettings.newInstance().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)
val catalog = new HiveCatalog("analytics_hive", "flink_test", "/etc/hive/conf")
tableEnv.registerCatalog("analytics_hive", catalog)


Which causes an exception:
Caused by: java.lang.reflect.InvocationTargetException:
org.apache.hadoop.hive.metastore.api.MetaException: Could not connect to
meta store using any of the URIs provided. Most recent failure:
org.apache.thrift.transport.TTransportException: GSS initiate failed

(Full stack trace here.)

The same error happens if I try to submit this job using my cached kerberos
ticket, or with a keytab.
I have also tried wrapping the HiveCatalog in a Hadoop UserGroupInformation
PrivilegedExceptionAction as described here, and got the
same result (no real idea what I'm doing here, just trying some things).
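
For completeness, the wrapping I tried looked roughly like this (a Java sketch only;
the principal and keytab path are placeholders, and I'm not sure this is the intended
way to do it):

import java.security.PrivilegedExceptionAction;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberizedHiveCatalogSketch {
    public static void main(String[] args) throws Exception {
        // Placeholders: principal and keytab path.
        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                "user@EXAMPLE.ORG", "/etc/security/keytabs/user.keytab");

        ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
            TableEnvironment tableEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // Registration is what triggers the metastore connection (and the GSS error).
            tableEnv.registerCatalog("analytics_hive",
                    new HiveCatalog("analytics_hive", "flink_test", "/etc/hive/conf"));
            return null;
        });
    }
}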

Is there something more I have to do to use HiveCatalog with a kerberized
Hive Metastore?  Should Flink support this out of the box?

Thanks!
- Andrew Otto
  SRE, Wikimedia Foundation


Exception: SequenceNumber is treated as a generic type

2021-10-12 Thread Ori Popowski
Hi,

I have a large backpressure in a somewhat simple Flink application in
Scala. Using Flink version 1.12.1.

To find the source of the problem, I want to eliminate all classes with
generic serialization, so I set
pipeline.generic-types=false

in order to spot those classes and write a serializer for them.

However, for some reason, I get the stack trace attached below.

   1. It looks suspicious that one of Flink's own classes doesn't have a
   serializer and should fall back to generic serialization. Is this a bug?
   2. I want to get a list of all classes which fallback to generic
   serialization. How can I do it other than setting
   pipeline.generic-types=false and eliminating those classes one by one?
   3. I defined a custom Kryo serializer for this class using both
   addDefaultKryoSerializer(…) and registerTypeWithKryoSerializer(…) and I
   still get the same error message. How can I provide Flink with custom
   serialization so it stops complaining about this?
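
For reference, the registrations from point 3 look roughly like this (sketched in Java;
the real job is Scala, and SequenceNumberKryoSerializer is my own placeholder class
whose body is omitted):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;

public class GenericTypesSketch {

    // Placeholder custom serializer; the real implementation must read and write
    // the fields of SequenceNumber.
    public static class SequenceNumberKryoSerializer extends Serializer<SequenceNumber> {
        @Override
        public void write(Kryo kryo, Output output, SequenceNumber value) { /* ... */ }

        @Override
        public SequenceNumber read(Kryo kryo, Input input, Class<SequenceNumber> type) {
            return null;  // placeholder
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Same effect as setting pipeline.generic-types=false: fail fast on any type
        // that would fall back to generic (Kryo) serialization.
        env.getConfig().disableGenericTypes();

        // Both registration variants mentioned above:
        env.getConfig().addDefaultKryoSerializer(SequenceNumber.class, SequenceNumberKryoSerializer.class);
        env.getConfig().registerTypeWithKryoSerializer(SequenceNumber.class, SequenceNumberKryoSerializer.class);
    }
}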



java.lang.UnsupportedOperationException: Generic types have been disabled
in the ExecutionConfig and type
org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber is
treated as a generic type.
at
org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:87)
at
org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:104)
at
org.apache.flink.api.java.typeutils.TupleTypeInfo.createSerializer(TupleTypeInfo.java:49)
at
org.apache.flink.api.java.typeutils.ListTypeInfo.createSerializer(ListTypeInfo.java:99)
at
org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:302)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.getListState(DefaultOperatorStateBackend.java:264)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackend.getUnionListState(DefaultOperatorStateBackend.java:216)
at
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.initializeState(FlinkKinesisConsumer.java:443)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
at java.lang.Thread.run(Thread.java:748)


Re: Reset of transient variables in state to default values.

2021-10-12 Thread Alex Drobinsky
Hi Jing,

The job doesn't restart from a checkpoint; it's a brand new clean job, no
exceptions happened during execution, no restarts :)
The state is a keyed state, so a new key means a new state - in this
situation currentFile is equal to null, as expected, and handled without
issues.
Before I even thought to inquire about my questions, the first thing I did
was add log messages with the value of currentFile in every place it could
be changed.
So I checked that before I hand control back to Flink, currentFile has the
correct value, and after I read the value from state in the next iteration
it is set to null.
The checkpoints by themselves could be irrelevant to the problem; the only
indication of a connection is my observation that the
interval between the first event and the first occurrence of nullification is
exactly the same as the checkpoint interval.

Yun Tang - you are correct, it's a KryoSerializer. If I remove the
"transient" qualifier from currentFile, it crashes inside the KryoSerializer
because RandomAccessFile isn't serializable.
This also points to the fact that serialization was actually executed at
least once. I will try an alternative approach - I will add my own
writeObject implementation; it should work :)
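
A minimal sketch of that writeObject/readObject idea, assuming the file can simply be
reopened by path and offset on read; whether these hooks are actually invoked depends on
the serializer in use (plain Java serialization calls them, Kryo's default field
serializer does not):

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.RandomAccessFile;
import java.io.Serializable;

// Sketch: persist the file name and offset instead of the (non-serializable)
// RandomAccessFile handle, and reopen the file after deserialization.
public class MultiStorePacketStateSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    public String fileName;
    public long fileOffset;
    public transient RandomAccessFile currentFile;

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();  // writes fileName and fileOffset, skips the transient handle
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        if (fileName != null) {
            currentFile = new RandomAccessFile(fileName, "rw");
            currentFile.seek(fileOffset);  // restore the write position
        }
    }
}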

Best regards,
Alex






On Tue, Oct 12, 2021 at 15:07, JING ZHANG  wrote:

> Hi Alex,
> Since you use `FileSystemStateBackend`, I think currentFile became
> nullified once in a while is not caused by period checkpoint.
>
> Because if job is running without failover or restore from checkpoint,
> read/write value state on `FileSystemStateBackend` does not cause
> serialization and deserialization at all. I have already simplify your
> coding and verify this point. If you are interested, you can find this
> simplified code in the attachment of the email.
>
> There are some possible reasons come to my mind, I hope this helps.
> 1. Does job restore from checkpoint/savepoint? This may caused by failover
> or user trigger stop-with-savepoint.
> 2. If job does not restore from checkpoint or savepoint.
>  2.1 After read the MultiStorePacketState from ValueState, is there
> somewhere in your program to update the currentFile field to null again?
> Because the state stored in heap,  it may be changed if program updates its
> value somewhere.
>  2.2 When the currentFile became null, is there any possible that
> current key never appear before? that is it's the first time that the
> current key appears, so get state would return default value(a new
> MultiStorePacketState instance with null currentFile)
>
> Best,
> JING ZHANG
>
Yun Tang  wrote on Tue, Oct 12, 2021 at 4:41 PM:
>
>> Hi Alex,
>>
>> Since you use customized MultiStorePacketState class as the value state
>> type, it should use kryo serializer [1] to serialize your class via
>> accessing RocksDB state or checkpoint via FileSystemStateBackend, and I
>> don't know whether Kryo would serialize your transient field.
>> If you're not familiar with Flink's serialization stack, I think you
>> could check behaviors below:
>>
>>1. Without any checkpoint restore, use FileSystemStateBackend to see
>>whether the transient field could be read as expected, the answer should 
>> be
>>yes.
>>2. After restoring from checkpoint, check whether could read the
>>transient field back if using FileSystemStateBackend.
>>
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class
>>
>> Best
>> Yun Tang
>>
>>
>> --
>> *From:* Alex Drobinsky 
>> *Sent:* Monday, October 11, 2021 22:37
>> *To:* JING ZHANG 
>> *Cc:* User-Flink 
>> *Subject:* Re: Reset of transient variables in state to default values.
>>
>> It would be difficult to provide even a semblance of the complete product
>> , however I could try to provide enough details to reproduce the problem.
>> Standard source would do:
>>
>> DataStream stream = env.addSource(
>> new FlinkKafkaConsumer<>(topic, new 
>> AbstractDeserializationSchema() {
>> @Override
>> public byte[] deserialize(byte[] bytes) throws IOException {
>> return bytes;
>> }
>> }, properties)).name(topic);
>>
>>
>> The operator body something like:
>>
>> public class MultiStorePacketFunction extends KeyedProcessFunction> SplitterToMultiStore, ClassifierOutput> implements Serializable {
>>private transient ValueState state;
>>
>>@Override
>>public void processElement(SplitterToMultiStore packet, Context ctx, 
>> Collector out) throws Exception {
>>   if (packet.hasPackets()) {
>>  storedPackets.inc(packet.getPackets().getPacketsCount());
>>   }
>>
>>   MultiStorePacketState so = state.value();
>>   if (process(packet, out, so, ctx)) {
>>  state.update(null);
>>  state.clear();
>>   } else {
>>  state.update(so);
>>   }
>>}
>>
>> public String 

Re: Reset of transient variables in state to default values.

2021-10-12 Thread JING ZHANG
Hi Alex,
Since you use `FileSystemStateBackend`, I think currentFile becoming
nullified once in a while is not caused by the periodic checkpoint.

If the job is running without failover or restore from a checkpoint,
reading/writing value state on `FileSystemStateBackend` does not cause any
serialization and deserialization at all. I have already simplified your
code and verified this point. If you are interested, you can find the
simplified code in the attachment of this email.

Some possible reasons come to my mind; I hope this helps.
1. Does the job restore from a checkpoint/savepoint? This may be caused by failover
or by the user triggering stop-with-savepoint.
2. If the job does not restore from a checkpoint or savepoint:
 2.1 After reading the MultiStorePacketState from ValueState, is there
somewhere in your program that updates the currentFile field to null again?
Because the state is stored on the heap, it may be changed if the program updates its
value somewhere.
 2.2 When currentFile became null, is it possible that the
current key never appeared before? That is, it is the first time the
current key appears, so getting the state would return the default value (a new
MultiStorePacketState instance with a null currentFile).

Best,
JING ZHANG

Yun Tang  wrote on Tue, Oct 12, 2021 at 4:41 PM:

> Hi Alex,
>
> Since you use customized MultiStorePacketState class as the value state
> type, it should use kryo serializer [1] to serialize your class via
> accessing RocksDB state or checkpoint via FileSystemStateBackend, and I
> don't know whether Kryo would serialize your transient field.
> If you're not familiar with Flink's serialization stack, I think you could
> check behaviors below:
>
>1. Without any checkpoint restore, use FileSystemStateBackend to see
>whether the transient field could be read as expected, the answer should be
>yes.
>2. After restoring from checkpoint, check whether could read the
>transient field back if using FileSystemStateBackend.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class
>
> Best
> Yun Tang
>
>
> --
> *From:* Alex Drobinsky 
> *Sent:* Monday, October 11, 2021 22:37
> *To:* JING ZHANG 
> *Cc:* User-Flink 
> *Subject:* Re: Reset of transient variables in state to default values.
>
> It would be difficult to provide even a semblance of the complete product
> , however I could try to provide enough details to reproduce the problem.
> Standard source would do:
>
> DataStream stream = env.addSource(
> new FlinkKafkaConsumer<>(topic, new 
> AbstractDeserializationSchema() {
> @Override
> public byte[] deserialize(byte[] bytes) throws IOException {
> return bytes;
> }
> }, properties)).name(topic);
>
>
> The operator body something like:
>
> public class MultiStorePacketFunction extends KeyedProcessFunction SplitterToMultiStore, ClassifierOutput> implements Serializable {
>private transient ValueState state;
>
>@Override
>public void processElement(SplitterToMultiStore packet, Context ctx, 
> Collector out) throws Exception {
>   if (packet.hasPackets()) {
>  storedPackets.inc(packet.getPackets().getPacketsCount());
>   }
>
>   MultiStorePacketState so = state.value();
>   if (process(packet, out, so, ctx)) {
>  state.update(null);
>  state.clear();
>   } else {
>  state.update(so);
>   }
>}
>
> public String generateNextFilename(String sessionKey, int partNumber) {
>   String path = DirectoryService.getInstance().bookDirectory();
>   return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
>}
>
>private void storeContent(Collector collector, 
> MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>   assert (packets != null);
>   assert (packets.hasPackets());
>
>   if ( state.currentFile == null) {
>  openFile(collector, state, packets);
>   }
>
>   Utils.continueWriteToPcap(state.currentFile, 
> packets.getPackets().getPacketsList());
>   state.fileOffset = state.currentFile.length();
>
>   tryToCloseFile(collector, state);
>}
>
>static public String extractExportedFileName(String fileName) {
>   String path[] = fileName.split("/");
>   return path[path.length - 2] + "/" + path[path.length - 1];
>}
>
>private void openFile(Collector collector, 
> MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
>   state.fileIsOpened = true;
>   state.fileName = generateNextFilename(state.sessionKey, 
> state.partNumber);
>   state.exportedFileName = extractExportedFileName(state.fileName);
>
> // -> Here RandomAccessFile created
>   state.currentFile = Utils.startWriteToPcap(state.fileName, 
> packets.getPackets().getPacketsList());
>   state.fileOffset = state.currentFile.length();
>   

Re: Inconsistent parallelism in web UI when using reactive mode

2021-10-12 Thread 陳昌倬
On Tue, Oct 12, 2021 at 10:41:24AM +0200, Chesnay Schepler wrote:
> This is a known and documented limitation of the AdaptiveScheduler.
> There is no concrete date yet for when it will be fixed.

Thanks for the information.


-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B




Re: how to view doc of flink-1.10 in Chinese

2021-10-12 Thread Chesnay Schepler

https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/

Do note that certain pages haven't been translated.

On 09/10/2021 05:08, 杨浩 wrote:

Our company uses release 1.10; can we view the Chinese docs?

English doc: https://ci.apache.org/projects/flink/flink-docs-release-1.10/
Chinese doc (only the latest version is viewable): https://flink.apache.org/zh/flink-architecture.html





Re: Reset of transient variables in state to default values.

2021-10-12 Thread Yun Tang
Hi Alex,

Since you use the customized MultiStorePacketState class as the value state type,
Flink should use the Kryo serializer [1] to serialize your class when accessing RocksDB
state or when checkpointing via the FileSystemStateBackend, and I don't know whether Kryo
would serialize your transient field.
If you're not familiar with Flink's serialization stack, I think you could
check the behaviors below:

  1.  Without any checkpoint restore, use the FileSystemStateBackend to see whether
the transient field can be read as expected; the answer should be yes.
  2.  After restoring from a checkpoint, check whether the transient
field can still be read back when using the FileSystemStateBackend.


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/serialization/types_serialization/#flinks-typeinformation-class

Best
Yun Tang



From: Alex Drobinsky 
Sent: Monday, October 11, 2021 22:37
To: JING ZHANG 
Cc: User-Flink 
Subject: Re: Reset of transient variables in state to default values.

It would be difficult to provide even a semblance of the complete product;
however, I could try to provide enough details to reproduce the problem.
A standard source would do:

DataStream<byte[]> stream = env.addSource(
        new FlinkKafkaConsumer<>(topic, new AbstractDeserializationSchema<byte[]>() {
            @Override
            public byte[] deserialize(byte[] bytes) throws IOException {
                return bytes;
            }
        }, properties)).name(topic);

The operator body is something like:


public class MultiStorePacketFunction extends KeyedProcessFunction implements Serializable {
   private transient ValueState<MultiStorePacketState> state;

   @Override
   public void processElement(SplitterToMultiStore packet, Context ctx, 
Collector out) throws Exception {
  if (packet.hasPackets()) {
 storedPackets.inc(packet.getPackets().getPacketsCount());
  }

  MultiStorePacketState so = state.value();
  if (process(packet, out, so, ctx)) {
 state.update(null);
 state.clear();
  } else {
 state.update(so);
  }
   }

public String generateNextFilename(String sessionKey, int partNumber) {
  String path = DirectoryService.getInstance().bookDirectory();
  return path + File.separator + sessionKey + "-" + partNumber + ".pcap";
   }

   private void storeContent(Collector collector, 
MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
  assert (packets != null);
  assert (packets.hasPackets());

  if ( state.currentFile == null) {
 openFile(collector, state, packets);
  }

  Utils.continueWriteToPcap(state.currentFile, 
packets.getPackets().getPacketsList());
  state.fileOffset = state.currentFile.length();

  tryToCloseFile(collector, state);
   }

   static public String extractExportedFileName(String fileName) {
  String path[] = fileName.split("/");
  return path[path.length - 2] + "/" + path[path.length - 1];
   }

   private void openFile(Collector collector, 
MultiStorePacketState state, SplitterToMultiStore packets) throws Exception {
  state.fileIsOpened = true;
  state.fileName = generateNextFilename(state.sessionKey, state.partNumber);
  state.exportedFileName = extractExportedFileName(state.fileName);

// -> Here RandomAccessFile created
  state.currentFile = Utils.startWriteToPcap(state.fileName, 
packets.getPackets().getPacketsList());
  state.fileOffset = state.currentFile.length();
  state.partNumber++;
   }

   private void tryToCloseFile(Collector collector, 
MultiStorePacketState state) throws IOException {
  if (state.currentFile.length() < 
StorePacketConfigurationParameters.partSizeLimit) {
 return;
  }
  closeFile(collector, state);
   }

   private void closeFile(Collector collector, 
MultiStorePacketState state) throws IOException {
  state.currentFile.close();
  state.currentFile = null;
  state.fileIsOpened = false;
  ClassifierOutput.Builder outputBuilder = ClassifierOutput.newBuilder();
  outputBuilder.getUsualBuilder().setFileName(state.exportedFileName);
  outputBuilder.setSessionType(SessionType.Multi);
  outputBuilder.setSessionKey(state.sessionKey);
  var classifierOutput = outputBuilder.build();
  state.sessionMetadata.add(classifierOutput);
  collector.collect(classifierOutput);
   }

public boolean process(SplitterToMultiStore packet, 
Collector collector, MultiStorePacketState so, Context 
context) throws Exception {

  // First message
  if (packet.hasClassificationResult()) {
 sendClassificationResult(packet, collector, so);
 return false;
  }

  // Last message
  if (packet.hasSessionClosure()) {
 if (so.isCoverageIncorrect) {
return true;
 }
 handleSessionClosure(packet, collector, so, context);
 return true;
  }

  if (so.isCoverageIncorrect) {
 return false;
  }
  

Re: Inconsistent parallelism in web UI when using reactive mode

2021-10-12 Thread Chesnay Schepler
This is a known and documented limitation of the AdaptiveScheduler. There is
no concrete date yet for when it will be fixed.


On 12/10/2021 05:08, ChangZhuo Chen (陳昌倬) wrote:

Hi,

We found that the parallelism shown in the web UI is inconsistent when using
reactive mode. As in the attachment, on the overview page all parallelism
values are 1, which is not correct. When clicking an operator for detailed
information, the parallelism shown there is the correct one.

Is it possible to fix this inconsistency so that it does not confuse
engineers when deploying Flink applications?




关于slot分配

2021-10-12 Thread yidan zhao
I used to spread slots across multiple TMs, and recently tried the default
behavior that prefers filling a single TM first.
But I noticed a problem: each of my TMs has 10 slots, and a job with
parallelism 40 actually occupied 5 TMs as 10+10+10+2+8. What is going on here?

I would prefer either a fully even spread (by setting the spread-out option)
or filling one TM at a time.
The former: to balance load across machines.
The latter: to fully isolate jobs from each other; my job parallelism is
always set to a multiple of the per-TM slot count (10).


Re: Checkpoint size increasing even i enable increasemental checkpoint

2021-10-12 Thread Yun Tang
Hi Lei,

A RocksDB state backend checkpoint is composed of RocksDB's own files
(unmodified, compressed SST-format files), and incremental checkpointing means
Flink does not upload files that were already uploaded before. As you can see,
incremental checkpoints depend heavily on RocksDB's own mechanism for removing
useless files, which is triggered by internal compaction. You should not worry
too much about the checkpointed data size while your job keeps consuming more
and more records; moreover, the increase is actually quite small (from 1.32 GB
to 1.34 GB).
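
For reference, a minimal sketch of the incremental setup on Flink 1.11 (the
checkpoint URI is a placeholder and the flink-statebackend-rocksdb dependency is
assumed to be on the classpath):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder location; an OSS/S3/HDFS URI works the same way.
        String checkpointDir = "oss://my-bucket/flink/checkpoints";

        // 'true' enables incremental checkpoints: only RocksDB SST files that were
        // not uploaded by a previous checkpoint are uploaded again.
        env.setStateBackend(new RocksDBStateBackend(checkpointDir, true));

        // The checkpointed size can still grow until RocksDB compaction removes
        // obsolete SST files.
        env.enableCheckpointing(60_000L);
    }
}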

Best
Yun Tang




From: Lei Wang 
Sent: Monday, October 11, 2021 16:16
To: user 
Subject: Checkpoint size increasing even i enable increasemental checkpoint


[image.png]

The checkpointed data size keeps getting bigger and bigger, and the node CPU is
very high while the job is checkpointing. But I have enabled incremental
checkpointing: env.setStateBackend(new RocksDBStateBackend(checkpointDir, true));

I am using flink-1.11.2 and aliyun oss as checkpoint storage.


Any insight on this?

Thanks,

Lei



Re: Impossible to get pending file names/paths on checkpoint?

2021-10-12 Thread Fabian Paul
Hi Preston,

I just noticed I forgot to cc the user mailing list on my first reply. I have a
few thoughts about the design you are describing.


> In the meantime I have a nasty hack in place that has unblocked me for now in 
> getting the target file off the LocalRecoverable/HadoopFsRecoverable:
> 
> InProgressFileWriter.PendingFileRecoverable recoverable = 
> committable.getPendingFile(); 
> RecoverableWriter.CommitRecoverable commitRecoverable = 
> ((OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable) 
> recoverable).getCommitRecoverable();
> 
> Method m = commitRecoverable.getClass().getMethod("targetFile");
> m.setAccessible(true);
> File targetFile = (File) m.invoke(commitRecoverable);
> I think a good place to start would be to introduce getTargetFile, and 
> getTempFile methods on the CommitRecoverable interface, though I haven't 
> fully studied the impact of this approach on other implementations of that 
> interface.

I might be missing context here, or lack knowledge of how Azure Data Lake works,
but why do you need access to the target and/or temp file locations? Your
scenario sounds very similar to any other distributed file system.


> A note on implementing our part-file scoped Encoder: The current Encoder 
> pattern in 1.14 assumes that the same encoder will work for all files, for 
> all time. We had to make numerous small changes to the File Sink to break 
> this pattern, and allow for an Encoder instance per part file. My current 
> solution uses a custom BucketID object with both Path, and EventType 
> properties. In our BucketWriter.openNew method we can use the 
> BucketId.EventType to lookup the Protobuf descriptor we need, create a new 
> Encoder and pass it to our RowWisePartWriter. We had to reimplement/absorb a 
> significant amount of the File Sink code to accomplish this as the File Sink 
> implementation assumes a String for BucketID and there are many roadblocks 
> put in place to prevent extending FileSink functionality.

This is an interesting point. I guess we did not think about such a use case when
developing the sink. Maybe we can approach the problem differently.
I am thinking about adding a context to the `Encoder#encode` method where
metadata (new bucket, filename, bucket name) is accessible. Does this help in
your case?
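
To make the idea concrete, a purely hypothetical sketch of what such a
context-aware encoder could look like (none of these names exist in Flink today;
they only illustrate the proposal):

import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;

// Hypothetical shape only -- not an existing Flink interface.
public interface ContextAwareEncoder<IN> extends Serializable {

    void encode(IN element, OutputStream stream, Context context) throws IOException;

    // Metadata that would let an encoder pick e.g. a Protobuf descriptor per part file.
    interface Context {
        String bucketId();
        String partFileName();
        boolean isNewPartFile();
    }
}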

> A perfect example of these roadblocks is the FileWriterBucketFactory 
> interface. It looks like we could provide our own implementation of this 
> interface, but the return type of it's methods (FileWriterBucket) have 
> default (package protected) visibility and so we can neither provide our own 
> implementation, nor sub-class the return types to add our own logic. Another 
> example is the OutputStreamBasedPartFileWriter which wraps a default (package 
> protected) visibility abstract class (OutputStreamBasedBucketWriter). I ran 
> into numerous issues like these.

In general, all classes annotated with @Internal are not meant to be used
outside of Flink, but I agree it sometimes becomes necessary. If more and more
people need to reimplement big parts of the FileSink, we have to incorporate
that feedback and make it extensible.

> 
> A note on implementing our Azure Data Explorer sink: Currently we're looking 
> to add code in a custom Committer to do this. However, since I can't grok a 
> way to make the file commit + ADX ingest command atomic we need to (re)ingest 
> all pending files since there's no way to track what's already been 
> committed+ingested. For now we're hoping to make ADX (re)ingestion idempotent 
> using ingest-by tags, but we may have to use something else (maybe memcache) 
> to track what's already been ingested if ingest-by tagging doesn't scale. A 
> post-file-commit pipeline like you've mentioned would work for us provided 
> that we could still get exactly-once guarantees on the operators of that 
> post-file-commit pipeline. If there's a better way to solve this part of our 
> use case in 1.14 I'd love to hear it :)

Unfortunately I have limited knowledge about ADX. I briefly looked it up and it
seems to have two ingestion modes, batch and streaming. Which of the two are you
currently using?

Usually the idempotency is guaranteed by the underlying
RecoverableFsDataOutputStream; if the current implementations do not suffice, I
can imagine adding one for Azure.

The post-file-commit pipeline might just be a normal DataStream where users can
consume the committables after they have been committed, so the exactly-once
guarantee needs to be implemented by the user.

Best,
Fabian



Re: Snapshot method for custom keyed state checkpointing ?

2021-10-12 Thread Nicolaus Weidner
Hi Marc,

thanks for clarifying, I had misunderstood some parts.
Unfortunately, I don't think there is a way to update keyed state (for
multiple keys even) outside of a keyed context.

I will ask if someone else has an idea, but allow me to ask one
counter-question first: Did you actually run tests to verify that using the
custom state solution is more efficient than using Flink's keyed state
regularly (in the end, you would even have to include the state
synchronization in the performance test)? Efficient stateful stream
processing is one of the key features of Flink, and you are essentially
trying to override a specific piece of it with custom logic.
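
For the comparison, a minimal sketch of the "regular" variant (Control, Event and
ProcessedEvent are your types from the earlier mail; the body is only
illustrative), where all per-key data lives in Flink keyed state and is
checkpointed by Flink itself:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.Collector;

public class RegularKeyedStateFunction
        extends RichCoFlatMapFunction<Control, Event, ProcessedEvent> {

    private transient ValueState<Control> controlState;

    @Override
    public void open(Configuration parameters) {
        controlState = getRuntimeContext().getState(
                new ValueStateDescriptor<>("controlState", Control.class));
    }

    @Override
    public void flatMap1(Control control, Collector<ProcessedEvent> out) throws Exception {
        controlState.update(control);
    }

    @Override
    public void flatMap2(Event event, Collector<ProcessedEvent> out) throws Exception {
        // No separate custom-state snapshot to synchronize: the keyed state above
        // is included in every Flink checkpoint automatically.
        ProcessedEvent result = transform(controlState.value(), event);
        out.collect(result);
    }

    private ProcessedEvent transform(Control control, Event event) {
        return null; // placeholder for the actual per-event transformation
    }
}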

Best regards,
Nico

On Wed, Oct 6, 2021 at 5:50 PM Marc LEGER  wrote:

> Hello Nicolaus,
>
> Thank you for your quick feedback, sorry if I am not clear enough.
> Actually in the documented example, the state which is updated in the
> snapshotState method is an operator state and not a keyed state:
>
> public void initializeState(FunctionInitializationContext context) throws Exception {
>   [...]
>   countPerPartition = context.getOperatorStateStore().getOperatorState(
>       new ListStateDescriptor<>("perPartitionCount", Long.class));
>   [...]
> }
>
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>   [...]
>   countPerPartition.add(localCount);
> }
>
>
> It seems that the method is then only called once per operator parallel
> task and not once per key.
> On my side I have two keyed states with the same key (e.g., userId) in a
> CoFlatMapFunction:
>
> // Control state partitioned by userId
> private ValueState controlState;
>
> // Data state partitioned by userId, coming from the ser/deserialization
> // of a custom system having a partitioned state
> private ValueState dataState;
>
> and I would like to do something like that to update dataState in a keyed
> context for every key and every checkpoint:
>
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>   dataState.update(customSystem.getSnapshot(context.getKey()));  // Not a keyed context here!
> }
>
> instead of saving dataState in the flatMap2 function for every received
> event:
>
> public void flatMap1(Control control, Collector out) {
>    controlState.update(control);
> }
>
> public void flatMap2(Event event, Collector out) {
>   // Perform some event transformations based on controlState
>   ProcessedEvent result = customSystem.process(controlState.value(), event);
>
>   // Save internal custom system state after processing:
>   // can be costly if the event throughput is high
>   dataState.update(customSystem.getSnapshot(controlState.value().getUserId()));
>
>   // Output the processed event
>   out.collect(result);
> }
>
>
> So basically, I want to be able to synchronize the partitioned state of my
> custom system with the checkpoints done by Flink.
>
>
> Best Regards,
> Marc
>
> Le mer. 6 oct. 2021 à 12:11, Nicolaus Weidner <
> nicolaus.weid...@ververica.com> a écrit :
>
>> Hi Marc,
>>
>> I think you can just use keyed state in a
>> CheckpointedFunction. FunctionInitializationContext gives you access to
>> both keyed state and operator state (your stream needs to be keyed, of
>> course). So you could just update your local custom state on regular
>> invocations and update keyed state on calls to snapshotState.
>> Check out the example in [1] where both types of state are used.
>>
>> Does that help? Not sure if I understood the problem correctly.
>>
>> Best regards,
>> Nico
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java#L74-L110
>>
>> On Tue, Oct 5, 2021 at 3:28 PM Marc LEGER  wrote:
>>
>>> Hello,
>>>
>>> Is there any method available in a RichFunction that is called by Flink
>>> with a keyed context each time a checkpoint is triggered, please?
>>>
>>> It seems that the CheckpointedFunction interface provides such a feature
>>> (snapshotState method) but only in case of operator state and it is called
>>> in a non-keyed context.
>>>
>>> Indeed, I am implementing a CoFlatMapFunction with:
>>> - a keyed state (state1) for a "control" stream (stream1) which is not
>>> often updated,
>>> - a keyed state (state2) for a "data" stream (stream2) with a high
>>> throughput and relying on a custom solution for internal state snapshot
>>> with some potential performance impact.
>>>
>>> Consequently, I don't want to trigger a state2 update for every event
>>> received in stream2 for efficiency reasons but rather update state2 based
>>> on checkpoints triggered by Flink.
>>>
>>> Best Regards,
>>> Marc
>>>
>>>


Re: Custom Sink Object attribute issue

2021-10-12 Thread Arvid Heise
Hi Jigar,

I'm moving your user question to the user ML.

The best place to initialize transient fields is in

 private void readObject(java.io.ObjectInputStream in)
 throws IOException, ClassNotFoundException;

as described in [1]:

Remember that transient fields will be initialized to their default values.
> You can provide a readObject method that restores transient fields to
> acceptable values, or alternatively, lazily initialize those fields first
> time they are used.

[1] Effective Java, Item 87 : Consider using a custom serialized form;
https://ahdak.github.io/blog/effective-java-part-11/
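
Applied to your factory, a minimal sketch of that pattern (assuming the RDF4J
Repository/SPARQLRepository from your snippet, and that the connection parameters
are kept as serializable fields so the client can be rebuilt on the task
managers; the open()/first-invoke approach Till mentioned works just as well):

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.sparql.SPARQLRepository;

public class NeptuneClientFactory implements Serializable {

    private static final long serialVersionUID = 1L;

    // Non-transient fields survive serialization and allow rebuilding the client.
    private final String protocol;
    private final String host;
    private final String port;

    private transient Repository repository;

    public NeptuneClientFactory(String protocol, String host, String port) {
        this.protocol = protocol;
        this.host = host;
        this.port = port;
        this.repository = createNeptuneClient(protocol, host, port);
    }

    public Repository getNeptuneClient() {
        return repository;
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();                                       // restores protocol/host/port
        this.repository = createNeptuneClient(protocol, host, port);  // rebuilds the transient field
    }

    private static Repository createNeptuneClient(String protocol, String host, String port) {
        String sparqlEndpoint = String.format("%s://%s:%s/sparql", protocol, host, port);
        SPARQLRepository repo = new SPARQLRepository(sparqlEndpoint);
        repo.init();
        return repo;
    }
}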


On Mon, Oct 11, 2021 at 10:56 AM Till Rohrmann  wrote:

> Hi Jigar,
>
> in order to run the Sink function on the Flink cluster, it will be
> serialized. Since you marked the repository as transient, it won't be
> shipped to the cluster. So if Repository is Serializable, you can ship it
> to the cluster. If not, then you need to reconstruct the Repository on the
> cluster (e.g. on the first invoke call or the open call on the
> RichSinkFunction).
>
> Cheers,
> Till
>
> On Mon, Oct 11, 2021 at 10:12 AM Jigar Gajjar 
> wrote:
>
> > Hello Devs,
> >
> >
> > Here is my custom sink code.
> >
> > `
> >
> > public class FlinkNeptuneSink<IN> extends RichSinkFunction<IN> {
> > static HttpClient client = HttpClient.newHttpClient();
> > private static final long serialVersionUID = 1L;
> > NeptuneClientFactory neptuneClientFactory;
> > JsonLDWriteContext jsonLDWriteContext;
> > String baseURI;
> > Map contextJsonMap;
> > String namespaceURI;
> >
> > public FlinkNeptuneSink(String protocol, String host, String port,
> > String baseURI, Map contextJsonMap, String namespaceURI) {
> > neptuneClientFactory = new NeptuneClientFactory(protocol, host,
> > port);
> >
> > this.baseURI = baseURI;
> > this.contextJsonMap = contextJsonMap;
> > this.namespaceURI = namespaceURI;
> > }
> >
> > @Override
> > public void invoke(IN value, Context context) throws IOException {
> > //neptuneClientFactory.getNeptuneClient()   (repository  attribute in
> > neptuneClientFactory   is null)
> > try (RepositoryConnection conn =
> > neptuneClientFactory.getNeptuneClient().getConnection())  {
> > }
> >
> > }
> > }
> >
> > public class NeptuneClientFactory implements Serializable {
> > private transient Repository repository;
> >
> > public NeptuneClientFactory(String protocol, String host, String
> > port) {
> > this.repository = createNeptuneClient(protocol, host, port);
> > }
> >
> > public static Repository createNeptuneClient(String protocol, String
> > host, String port) {
> > String sparqlEndpoint = String.format("%s://%s:%s/sparql",
> > protocol, host, port);
> > Repository repo = new SPARQLRepository(sparqlEndpoint);
> > repo.init();
> > return repo;
> > }
> >
> > public Repository getNeptuneClient() {
> > return repository;
> > }
> > }
> >
> >
> > filtredNonEmptyP5.addSink(new FlinkNeptuneSink<>("https", "neptunehost",
> > "8182", "https://localhost/entity", contextJsonMap,
> > "https://localhost/namespaces/default"));
> >
> > `
> >
> > When the invoke method is called, the repository attribute in
> > neptuneClientFactory is null. Not sure why; its other attributes are set
> > properly.
> >
> > Is Flink initializing sink attributes from somewhere else?
> > When I debug, neptuneClientFactory is initialized properly while the sink is
> > created, but by the time invoke is called the repository is blank.
> >
> > Please help.
> >
> > --
> > Thanks
> > Jigar Gajjar
> >
>


Re: flink-1.14 kafkasource watermark

2021-10-12 Thread kcz
globalWindow trigger times.

public class PathMonitorJob {
    private static final String PATH = "path";
    private static double THRESHOLD;

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        THRESHOLD = parameterTool.getDouble("threshold", 1000d);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource

Re: Flink 1.11 loses track of event timestamps and watermarks after process function

2021-10-12 Thread Arvid Heise
Hi Ahmad,

From your description, I'd look in a different direction: Could it be that
your Sink/Async IO is not processing data (fast enough)?
Since you have a bounded watermark strategy, you'd need to see 10s of data
being processed before the first watermark is emitted.
To test that, can you please simply remove AsyncIO+Sink from your job and
check for print statements?

On Tue, Oct 12, 2021 at 3:23 AM Ahmad Alkilani  wrote:

> Flink 1.11
> I have a simple Flink application that reads from Kafka, uses event
> timestamps, assigns timestamps and watermarks and then key's by a field and
> uses a KeyedProcessFunciton.
>
> The keyed process function outputs events from within the `processElement`
> method using `out.collect`. No timers are used to collect or output any
> elements (or do anything for that matter).
>
> I also have a simple print statement that shows event time and waterMark
> within the process function.
>
> if (waterMark <= 0)
>   println(
> s"""
>|eventTimestamp:$eventTimestamp
>|waterMark: $waterMark
>|""".stripMargin)
>
>
> If the process function simply does nothing with the incoming records,
> i.e., does not emit any records/data as a result of an input element, then
> you'll see the Water Mark start with -Max Long and then progress just fine
> as expected. If I add `out.collect()` then the watermark stops
> progressing and the above print statement prints for every record.
>
> The environment has
> `setStreamTimeCharacteristic(TimeCharacteristic.EventTime)` set.
>
> The source start out something like this:
>
> someKafkaSource.flatMap(_.someTransofmrationToEventType.filter(_.something != 
> 0))
>   .assignTimestampsAndWatermarks(WatermarkStrategy
> .forBoundedOutOfOrderness[Event](Duration.ofSeconds(10))
> .withIdleness(Duration.ofSeconds(30))
> .withTimestampAssigner(new SerializableTimestampAssigner[Event] {
>   override def extractTimestamp(element: Event, recordTimestamp: Long): 
> Long = {
> if (element.eventTimeUTCMillis > 0) element.eventTimeUTCMillis else 
> recordTimestamp
>   }
> })
>
> The sink is a custom Rich Sink implementation:
>  resultStream.addSink(new CustomSink()}
>
> I recall seeing a thread somewhere indicating this could be a Flink bug
> but I can't seem to track it down again.
> Happy to provide more information. For what it's worth, the
> KeyedProcessFunction used to be a GlobalWindow with a custom Trigger and
> Evictor but has since been replaced in an attempt to solve the watermark
> issue with no success so far.
>
> Do I have to use assignTimestampAndWatermarks again after the
> KeyedProcessFunction?
>
> Full job flow for completeness:
>
> Kafka -> Flink Kafka source -> flatMap (map & filter) ->
> assignTimestampsAndWaterMarks -> map Function -> Key By -> Keyed Process
> Function -> Async IO -> Custom Sink
>
> Much obliged.
>