Re: Is it possible to join a hive stream with a hive stream?

2021-10-26 Thread yidan zhao
A question: Hive tables do not support watermarks. Is that also related to the fact that window TVFs do not support batch?
If we currently want to do per-window aggregation on a Hive table, is it true that window TVFs cannot be used, and is that also
because Hive tables do not support time attributes (event time + watermark)?

Leonard Xu wrote on Mon, Feb 1, 2021 at 2:24 PM:

> Not yet; you can follow this issue [1].
>
> Best,
> Leonard
> [1] https://issues.apache.org/jira/browse/FLINK-21183
>
> > On Feb 1, 2021, at 13:29, macdoor wrote:
> >
> > Is it supported in the current 1.13-snapshot? Can I try it out?
> >
> >
> >
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/
>
>


Re: s3 access denied with flink-s3-fs-presto

2021-10-26 Thread Parag Somani
Hello,

I have successfully been able to store data on an S3 bucket. I used to have a
similar issue earlier. What you need to confirm:
1. The S3 bucket is created with RW access (irrespective of whether it is MinIO or
AWS S3)
2. The "flink/opt/flink-s3-fs-presto-1.14.0.jar" jar is copied to the plugin
directory "flink/plugins/s3-fs-presto" (see the sketch after the configuration below)
3. Add the following configuration (in the configuration file or programmatically,
either way):

state.checkpoints.dir: 
state.backend.fs.checkpointdir: 
s3.path-style: true
s3.path.style.access: true
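
A minimal shell sketch of step 2 (the paths assume the standard Flink 1.14.0 distribution layout):

# copy the bundled Presto S3 filesystem jar into its own plugin directory
mkdir -p $FLINK_HOME/plugins/s3-fs-presto
cp $FLINK_HOME/opt/flink-s3-fs-presto-1.14.0.jar $FLINK_HOME/plugins/s3-fs-presto/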

On Wed, Oct 27, 2021 at 2:47 AM Vamshi G  wrote:

> s3a with the hadoop s3 filesystem works fine for us with sts assume role
> credentials and with kms.
> Below is what our hadoop s3a configs look like. Since the endpoint is
> globally whitelisted, we don't explicitly mention the endpoint.
>
> fs.s3a.aws.credentials.provider: 
> org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
> fs.s3a.assumed.role.credentials.provider: 
> com.amazonaws.auth.profile.ProfileCredentialsProvider
> fs.s3a.assumed.role.arn: arn:aws:iam:::role/
> fs.s3a.server-side-encryption-algorithm: SSE-KMS
> fs.s3a.server-side-encryption.key: 
> arn:aws:kms:::key/
>
>
> However, for checkpointing we definitely want to use presto s3, and just
> could not make it work. FINE logging on presto-hive is not helping either,
> as the lib uses airlift logger.
> Also, based on the code here
> https://github.com/prestodb/presto/blob/2aeedb944fc8b47bfe1cad78732d6dd2308ee9ad/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java#L821,
> PrestoS3FileSystem does switch to iam role credentials if one is provided.
>
> Anyone successful using the s3 presto filesystem in flink v1.13.0?
>
>
> Thanks,
> Vamshi
>
>
> On Mon, Aug 16, 2021 at 3:59 AM David Morávek  wrote:
>
>> Hi Vamshi,
>>
>> From your configuration I'm guessing that you're using Amazon S3 (not any
>> implementation such as Minio).
>>
>> Two comments:
>> - *s3.endpoint* should not contain the bucket (this is included in your s3
>> path, e.g. *s3:///*)
>> - "*s3.path.style.access*: true" is only correct for 3rd party
>> implementations such as Minio / Swift, which have the bucket defined in the url
>> path instead of the subdomain
>>
>> You can find some information about connecting to s3 in Flink docs [1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/
>> 
>>
>> Best,
>> D.
>>
>>
>> On Tue, Aug 10, 2021 at 2:37 AM Vamshi G 
>> wrote:
>>
>>> We are using Flink version 1.13.0 on Kubernetes.
>>> For checkpointing we have configured fs.s3 flink-s3-fs-presto.
>>> We have enabled sse on our buckets with kms cmk.
>>>
>>> flink-conf.yaml is configured as below.
>>> s3.entropy.key: _entropy_
>>> s3.entropy.length: 4
>>> s3.path.style.access: true
>>> s3.ssl.enabled: true
>>> s3.sse.enabled: true
>>> s3.sse.type: KMS
>>> s3.sse.kms-key-id: 
>>> s3.iam-role: 
>>> s3.endpoint: .s3-us-west-2.amazonaws.com
>>> 
>>> s3.credentials-provider:
>>> com.amazonaws.auth.profile.ProfileCredentialsProvider
>>>
>>> However, PUT operations on the bucket are resulting in an access denied
>>> error. The access policies for the role have been checked and work fine when
>>> tested with the CLI.
>>> Also, can't get to see debug logs from presto s3 lib, is there a way to
>>> enable logger for presto airlift logging?
>>>
>>> Any inputs on above issue?
>>>
>>>

-- 
Regards,
Parag Surajmal Somani.


Re: Application mode - Custom Flink docker image with Python user code

2021-10-26 Thread Dian Fu
Hi Sumeet,

PyFlink does not yet provide special support for handling dependencies in
Application mode. This means the dependencies can be handled the same way as in
the other deployment modes. That said, it is true that the dependencies could be
handled differently in application mode. For Python user code, I guess you could
add it to the PYTHONPATH environment variable? Then it's not necessary to specify
it again in the job.
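
For illustration, a minimal sketch of that idea in the custom image (the path /opt/flink/usrcode is an arbitrary assumption, not an official convention):

# in the Dockerfile of the custom PyFlink image
COPY my_job/ /opt/flink/usrcode/
ENV PYTHONPATH=/opt/flink/usrcode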

Regards,
Dian

On Tue, Oct 26, 2021 at 3:23 PM Sumeet Malhotra 
wrote:

> Hi,
>
> I'm currently submitting my Python user code from my local machine to a
> Flink cluster running in Session mode on Kubernetes. For this, I have a
> custom Flink image with Python as per this reference [1].
>
> Now, I'd like to move to using the Application mode with Native
> Kubernetes, where the user code is embedded within the container image. For
> Java, the process is described here [2]. This basically requires the user
> application jar to be copied over to $FLINK_HOME/usrlib directory. I
> couldn't find any documentation on where the Python user code should live. Can
> it be packaged at any location and invoked?
>
> Thanks,
> Sumeet
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode
>


Re: Application mode - Custom Flink docker image with Python user code

2021-10-26 Thread Shuiqiang Chen
Hi Sumeet,

Actually, running pyflink jobs in application mode on kubernetes has been
supported since release 1.13.

To build a docker image with PyFlink installed, please refer to Enabling
Python[1]. In order to run the python code in application mode, you also
need to COPY the code files into the container image when building it. Then
you can submit the job by executing the following command:

$ ./bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id={your-custom-cluster-id} \
-Dkubernetes.container.image={your-custom-image-name} \
-pyfs {the python file path within the container image} \
-pym {the-entry-module-name} \
[the-entry-main-args]  (optional)
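
For reference, the COPY step mentioned above might look like this in the Dockerfile (the base image name, file name and target path are illustrative assumptions, not from this thread):

# a custom image that already has PyFlink installed, per [1]
FROM my-flink-python:1.13
# place the user code somewhere inside the image and point -pyfs at that path
COPY my_job.py /opt/flink/usrlib/my_job.py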

Hope this helps!

Best,
Shuiqiang



[1]
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python

Sumeet Malhotra  于2021年10月26日周二 下午3:23写道:

> Hi,
>
> I'm currently submitting my Python user code from my local machine to a
> Flink cluster running in Session mode on Kubernetes. For this, I have a
> custom Flink image with Python as per this reference [1].
>
> Now, I'd like to move to using the Application mode with Native
> Kubernetes, where the user code is embedded within the container image. For
> Java, the process is described here [2]. This basically requires the user
> application jar to be copied over to $FLINK_HOME/usrlib directory. I
> couldn't find any documentation on where the Python user code should live. Can
> it be packaged at any location and invoked?
>
> Thanks,
> Sumeet
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode
>


Re: FlinkKafkaProducer deprecated in 1.14 but pyflink binding still present?

2021-10-26 Thread Dian Fu
Hi Francis,

Yes, you are right. It has not been updated in PyFlink yet, as
KafkaSource/KafkaSink are still not supported in PyFlink. Hopefully we can
add that support in 1.15 and then deprecate/remove the legacy
interfaces.

Regards,
Dian

On Tue, Oct 26, 2021 at 12:53 PM Francis Conroy <
francis.con...@switchdin.com> wrote:

> Looks like this got deprecated in 1.14 in favour of KafkaSink/KafkaSource
> but the python binding didn't get updated? Can someone confirm this?
>
> Francis Conroy
>
> This email and any attachments are proprietary and confidential and are
> intended solely for the use of the individual to whom it is addressed. Any
> views or opinions expressed are solely those of the author and do not
> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
> received this email in error, please let us know immediately by reply email
> and delete it from your system. You may not use, disseminate, distribute or
> copy this message nor disclose its contents to anyone.
> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
> Australia
>


Re: flink-yarn per-job mode

2021-10-26 Thread Shuiqiang Chen
Hello,

The uploaded images cannot be loaded. This situation means YARN is unable to launch the TaskManagers; please check whether the YARN resources are sufficient.

王健 <13166339...@163.com> wrote on Tue, Oct 26, 2021 at 7:50 PM:

> Hello:
>   I deployed Flink on YARN in per-job mode and it fails when running. Could you please take a look at what the cause might be? Many thanks.
>
>  1. Run command: /usr/local/flink-1.13.2/bin/flink run -t yarn-per-job -c
> com.worktrans.flink.wj.ods.FlinkCDC01 /usr/local/flink-1.13.2/flink_x.jar
>  The submission succeeds, as shown in the screenshot:
>
>  2. YARN screenshot
>
>
> 3. Flink screenshot:
>   Symptom: the numbers of task slots and TaskManagers are both 0, and it keeps requesting them
>
>
>  4. Final result: the error is as follows
> 2021-10-25 16:17:49
> java.util.concurrent.CompletionException:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Slot request bulk is not fulfillable! Could not allocate the required slot
> within slot request timeout
> at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
> at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
> at
> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
> at
> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at
> org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:222)
> at
> org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:164)
> at
> org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Slot request bulk is not fulfillable! Could not allocate the required slot
> within slot request timeout
> at
> org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
> ... 26 more
> Caused by: java.util.concurrent.TimeoutException: Timeout has occurred:
> 30 ms
> ... 27 more


Re: How to refresh topics to ingest with KafkaSource?

2021-10-26 Thread Mason Chen
Hi all,

I have a similar requirement to Preston. I created
https://issues.apache.org/jira/browse/FLINK-24660 to track this effort.

Best,
Mason

> On Oct 18, 2021, at 1:59 AM, Arvid Heise  wrote:
> 
> Hi Preston,
> 
> if you still need to set KafkaSubscriber explicitly, could you please create
> a feature request for that? For now, you probably have to resort to
> reflection hacks and build against the non-public KafkaSubscriber.
> 
> On Fri, Oct 15, 2021 at 4:03 AM Prasanna kumar wrote:
> Yes you are right.
> 
> We tested recently to find that the flink jobs do not pick up the new topics 
> that got created with the same pattern provided to flink kafka consumer.  The 
> topics are set only during the start of the jobs. 
> 
> Prasanna.
> 
> On Fri, 15 Oct 2021, 05:44 Preston Price wrote:
> Okay so topic discovery is possible with topic patterns, and maybe topic 
> lists. However I don't believe it's possible to change the configured topic 
> list, or topic pattern after the source is created.
> 
> On Thu, Oct 14, 2021, 3:52 PM Denis Nutiu wrote:
> There is a setting for dynamic topic discovery 
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/kafka/#topic-and-partition-discovery
>  
> 
> Best,
> 
> Denis
> 
> 
> On Fri, Oct 15, 2021 at 12:48 AM Denis Nutiu wrote:
> Hi,
> 
> In my experience with the librdkafka client and the Go wrapper, the 
> topic-pattern subscribe is reactive. The Flink Kafka connector might behave 
> similarly. 
> 
> Best,
> Denis
> 
> On Fri, Oct 15, 2021 at 12:34 AM Preston Price wrote:
> No, the topic-pattern won't work for my case. Topics that I should subscribe 
> to can be enabled/disabled based on settings I read from another system, so 
> there's no way to craft a single regular expression that would fit the state 
> of all potential topics. Additionally the documentation you linked seems to 
> suggest that the regular expression is evaluated only once "when the job 
> starts running". My understanding is it would not pick up new topics that 
> match the pattern after the job starts.
> 
> 
> On Wed, Oct 13, 2021 at 8:51 PM Caizhi Weng wrote:
> Hi!
> 
> I suppose you want to read from different topics every now and then? Does the 
> topic-pattern option [1] in Table API Kafka connector meet your needs?
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/docs/connectors/table/kafka/#topic-pattern
>  
> 
> Preston Price <nacro...@gmail.com> wrote on Thu, Oct 14, 2021 at 1:34 AM:
> The KafkaSource, and KafkaSourceBuilder appear to prevent users from 
> providing their own KafkaSubscriber. Am I overlooking something?
> 
> In my case I have an external system that controls which topics we should be 
> ingesting, and it can change over time. I need to add, and remove topics as 
> we refresh configuration from this external system without having to stop and 
> start our Flink job. Initially it appeared I could accomplish this by 
> providing my own implementation of the `KafkaSubscriber` interface, which 
> would be invoked periodically as configured by the 
> `partition.discovery.interval.ms ` 
> property. However there is no way to provide my implementation to the 
> KafkaSource since the constructor for KafkaSource is package protected, and 
> the KafkaSourceBuilder does not supply a way to provide the `KafkaSubscriber`.
> 
> How can I accomplish a period refresh of the topics to ingest?
> 
> Thanks
> 
> 
> 
> 
> -- 
> Regards,
> Denis Nutiu
> 
> 
> -- 
> Regards,
> Denis Nutiu
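
For reference, the pattern-based discovery discussed above (which does not cover the dynamic add/remove case Preston describes) is configured on the new KafkaSource roughly as follows; a sketch assuming the Flink 1.14 APIs, with illustrative broker/topic/group values:

import java.util.regex.Pattern;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source =
    KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopicPattern(Pattern.compile("events-.*"))
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        // re-evaluate the pattern periodically so newly created matching topics are picked up
        .setProperty("partition.discovery.interval.ms", "60000")
        .build();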



FlinkKafkaConsumer -> KafkaSource State Migration

2021-10-26 Thread Mason Chen
Hi all,

I read these instructions for migrating to the KafkaSource:
https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
.

Do we need to employ any uid/allowNonRestoredState tricks if our Flink job
is also stateful outside of the source? Or what is the mechanism that
resolves the topic/partition/offsets in the stateful upgrade? Will
restoring from FlinkKafkaConsumer cause an exception due to incompatibility
of the union state with the current one (what is it again)?

Best,
Mason


Re: s3 access denied with flink-s3-fs-presto

2021-10-26 Thread Vamshi G
s3a with the hadoop s3 filesystem works fine for us with sts assume role
credentials and with kms.
Below is what our hadoop s3a configs look like. Since the endpoint is
globally whitelisted, we don't explicitly mention the endpoint.

fs.s3a.aws.credentials.provider:
org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider
fs.s3a.assumed.role.credentials.provider:
com.amazonaws.auth.profile.ProfileCredentialsProvider
fs.s3a.assumed.role.arn: arn:aws:iam:::role/
fs.s3a.server-side-encryption-algorithm: SSE-KMS
fs.s3a.server-side-encryption.key:
arn:aws:kms:::key/


However, for checkpointing we definitely want to use presto s3, and just
could not make it work. FINE logging on presto-hive is not helping either,
as the lib uses airlift logger.
Also, based on the code here
https://github.com/prestodb/presto/blob/2aeedb944fc8b47bfe1cad78732d6dd2308ee9ad/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java#L821,
PrestoS3FileSystem does switch to iam role credentials if one is provided.

Anyone successful using the s3 presto filesystem in flink v1.13.0?


Thanks,
Vamshi


On Mon, Aug 16, 2021 at 3:59 AM David Morávek  wrote:

> Hi Vamshi,
>
> From your configuration I'm guessing that you're using Amazon S3 (not any
> implementation such as Minio).
>
> Two comments:
> - *s3.endpoint* should not contain the bucket (this is included in your s3
> path, e.g. *s3:///*)
> - "*s3.path.style.access*: true" is only correct for 3rd party
> implementations such as Minio / Swift, which have the bucket defined in the url
> path instead of the subdomain
>
> You can find some information about connecting to s3 in Flink docs [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/filesystems/s3/
> 
>
> Best,
> D.
>
>
> On Tue, Aug 10, 2021 at 2:37 AM Vamshi G  wrote:
>
>> We are using Flink version 1.13.0 on Kubernetes.
>> For checkpointing we have configured fs.s3 flink-s3-fs-presto.
>> We have enabled sse on our buckets with kms cmk.
>>
>> flink-conf.yaml is configured as below.
>> s3.entropy.key: _entropy_
>> s3.entropy.length: 4
>> s3.path.style.access: true
>> s3.ssl.enabled: true
>> s3.sse.enabled: true
>> s3.sse.type: KMS
>> s3.sse.kms-key-id: 
>> s3.iam-role: 
>> s3.endpoint: .s3-us-west-2.amazonaws.com
>> 
>> s3.credentials-provider:
>> com.amazonaws.auth.profile.ProfileCredentialsProvider
>>
>> However, PUT operations on the bucket are resulting in an access denied
>> error. The access policies for the role have been checked and work fine when
>> tested with the CLI.
>> Also, can't get to see debug logs from presto s3 lib, is there a way to
>> enable logger for presto airlift logging?
>>
>> Any inputs on above issue?
>>
>>


Re: Flink handle both kafka source and db source

2021-10-26 Thread Rafi Aroch
Hi,

Take a look at the new 1.14 feature called Hybrid Source:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/
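
For illustration, the builder pattern from that page looks roughly like this; a sketch assuming Flink 1.14 APIs with placeholder paths and topics (note that the JDBC case in the question would need a bounded FLIP-27-style source, which the JDBC connector does not currently provide, so a file source stands in for the historical data here):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.core.fs.Path;

// bounded source: ends once all files have been read
FileSource<String> historical =
    FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("s3://bucket/history/")).build();
// unbounded source that takes over afterwards
KafkaSource<String> live =
    KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("events")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
// the hybrid source switches from the bounded source to Kafka once the former is exhausted
HybridSource<String> source = HybridSource.builder(historical).addSource(live).build();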

Rafi


On Tue, Oct 26, 2021 at 7:46 PM Qihua Yang  wrote:

> Hi,
>
> My flink app has two data sources. One is from a Kafka topic, one is from
> a database using the JDBC connector. Flink scans the full database table.
> Which mode should we use? batch mode or streaming mode?
> How do we know the database table is fully scanned? Will Flink throw any
> signal to show it is done?
>
>


Re: SplitEnumeratorContext callAsync() cleanup

2021-10-26 Thread Mason Chen
Hi Fabian,

Unfortunately, I don't have the log since I was just testing it out on my
local setup. I can try to reproduce it later in the week.

Best,
Mason

On Mon, Oct 25, 2021 at 8:09 AM Fabian Paul 
wrote:

> Hi Mason,
>
> Thanks for opening the ticket. Can you also share the log with us when the
> KafkaEnumerator closed before the async call finished?
>
> Best,
> Fabian


Re: Flink support for Kafka versions

2021-10-26 Thread Prasanna kumar
Hi ,

We are using Kafka broker version 2.4.1.1.
The kafka-client 2.4.1.1 jar, which is part of the Flink Kafka connector, was
recently flagged with a high-severity security issue.
So we excluded that dependency and overrode it with the kafka-client 2.8.1
jar, and it works fine with the 2.4.1.1 broker (since it is backward
compatible).
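
For reference, such an override is typically done along these lines in the Maven POM (coordinates and versions are illustrative for a Flink 1.12.x / Scala 2.11 build, not taken from the original message):

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.12.7</version>
  <exclusions>
    <!-- drop the bundled kafka-clients version -->
    <exclusion>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.1</version>
</dependency>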

1) If we tried to connect to broker with 2.8.1 version (with kafka client
2.8.1 jar override ) would it work or would it throw errors because of
Scala 2.11?
2) https://issues.apache.org/jira/browse/FLINK-20845 is marked as done but
the fix version is marked as 1.15.0.. Wouldn't this change be available for
1.12.x ?
3) https://issues.apache.org/jira/browse/FLINK-14105 is marked as done but
the fix version is marked as 1.14.0.. Wouldn't this change be available for
1.12.x ?

Thanks,
Prasanna.

On Wed, Apr 21, 2021 at 11:03 PM Arvid Heise  wrote:

> I'm wondering if we could shade the Scala 2.13 dependencies inside the Kafka
> connector? Then we would be independent of the rather big FLINK-20845.
>
> On Tue, Apr 20, 2021 at 5:54 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Prasanna,
>>
>> It looks like the Kafka 2.5.0 connector upgrade is tied to dropping
>> support for Scala 2.11. The best place to track that would be the ticket
>> for Scala 2.13 support, FLINK-13414 [1], and its subtask FLINK-20845 [2].
>>
>> I have listed FLINK-20845 as a blocker for FLINK-19168 for better
>> visibility.
>>
>> Best,
>> Austin
>>
>> [1]: https://issues.apache.org/jira/browse/FLINK-13414
>> [2]: https://issues.apache.org/jira/browse/FLINK-20845
>>
>> On Tue, Apr 20, 2021 at 9:08 AM Prasanna kumar <
>> prasannakumarram...@gmail.com> wrote:
>>
>>> Hi Flinksters,
>>>
>>> We are researching about if we could use the latest version of kafka
>>> (2.6.1 or 2.7.0)
>>>
>>> Since we are using Flink as a processor , we came across this
>>> https://issues.apache.org/jira/browse/FLINK-19168.
>>>
>>> It says that it does not support version 2.5.0 and beyond.
>>>
>>> That was created 8 months back , just checking if there is any effort on
>>> that front.
>>>
>>> Thanks,
>>> Prasanna
>>>
>>


Re: Flink JDBC connect with secret

2021-10-26 Thread Qihua Yang
Hi Jing,

> Thank you for your suggestion. I will check whether the SSL parameters in the URL work.

Thanks,
Qihua
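
For reference, with the PostgreSQL JDBC driver the SSL material can usually be passed as URL parameters, which fits the url-based option JING ZHANG mentions; the host, database and file paths below are illustrative, and that driver expects the client key in PKCS#8 (.pk8) format:

jdbc:postgresql://db-host:5432/mydb?ssl=true&sslmode=verify-full&sslrootcert=/certs/server-ca.crt&sslcert=/certs/client.crt&sslkey=/certs/client.key.pk8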


On Sat, Oct 23, 2021 at 8:37 PM JING ZHANG  wrote:

> Hi Qihua,
> I checked the user documentation of several database vendors (postgres, oracle,
> solidDB, SQL Server) [1][2][3][4][5] and studied how to use a JDBC driver with
> SSL to connect to these databases.
> Most database vendors support two ways:
> 1. Option 1: Use the connection url
> 2. Option 2: Define it in the Properties passed to `DriverManager.getConnection`
>
> The url is exposed to users in the JDBC SQL connector currently, while properties
> parameters are not exposed yet.
> Would you please check first whether defining the SSL parameters in the url works?
> If not, we will look for another solution.
>
> [1] https://jdbc.postgresql.org/documentation/head/connect.html
> [2]
> https://www.oracle.com/technetwork/topics/wp-oracle-jdbc-thin-ssl-130128.pdf
> [3]
> https://support.unicomsi.com/manuals/soliddb/100/index.html#page/Administrator_Guide/6_Managing_network.07.13.html
> [4]
> https://docs.microsoft.com/en-us/sql/connect/jdbc/connecting-with-ssl-encryption?view=sql-server-ver15
> [5]
> https://www.ibm.com/docs/ar/informix-servers/14.10?topic=options-connecting-jdbc-applications-ssl
>
> Best,
> JING ZHANG
>
>
> Qihua Yang  于2021年10月23日周六 下午1:11写道:
>
>> Hi,
>>
>> We plan to use JDBC SQL connector to read/write database. I saw JDBC
>> connector use username and password. Is it possible to use secret(*.crt) to
>> access database. I didn't find guideline how to use it. How to config jdbc
>> with secret?
>>
>> val jdbc: JdbcConnectionOptions = 
>> JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
>> .withUrl(url)
>> .withDriverName("org.postgresql.Driver")
>> .withUsername(userName)
>> .withPassword(password)
>> .build()
>>
>> Thanks,
>> Qihua
>>
>


RE: Huge backpressure when using AggregateFunction with Session Window

2021-10-26 Thread Schwalbe Matthias
Hi Ori, … answering from remote …


  *   If I am not completely mistaken, Scala Vector is immutable, creating a copy
whenever you append, but
  *   This is not the main problem: the vectors collected so far get
deserialized with every incoming event (from state storage) and afterwards
serialized back into state storage
  *   This won't matter so much if you only collect 2 or 3 events into a
session window, but with maybe 1000 such events it does (you didn't share your
numbers)
  *   For the ProcessFunction implementation you could use a Vector builder and
then assign the result.
  *   Regarding the "without touching the previously stored event" question,
in more detail (I was in a rush):
 *   Windowing with ProcessFunction collects every event assigned to a
session window into a list state … iterating/aggregating over the collected
events only once when the window is triggered (i.e. the session is finished);
see the sketch right after this list
 *   While collecting the events into the list state it add()-s the new
event to the list state
 *   For RocksDB this involves only serializing the single added event and
appending the binary representation to the list state of the respective (key,
session window key (namespace in Flink speak)), i.e.
 *   The previously stored events for the session window are not touched
when a new event is added
  *   Next question: the overhead can easily be the cause of such backpressure,
depending on the numbers:
 *   Serialized size of your accumulator, proportional to the number of
aggregated events
 *   Size and entropy, frequency of your key space -> cache hits vs. cache
misses in RocksDB
  *   Of course there could be additional sources of backpressure
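
A minimal sketch (in Java, not from the original job) of the ProcessFunction/window variant described above; Event, Out, the session key and the 30-minute gap are illustrative assumptions:

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

stream
    .keyBy(Event::getSessionKey)
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .process(new ProcessWindowFunction<Event, Out, String, TimeWindow>() {
        @Override
        public void process(String key, Context ctx, Iterable<Event> events, Collector<Out> out) {
            // the collected events are read from list state only once, when the session window fires
            out.collect(Out.fromEvents(key, events));   // Out.fromEvents is a hypothetical helper
        }
    });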

I hope this helps, … I’ll be back next week

Thias

From: Ori Popowski 
Sent: Thursday, 21 October 2021 15:32
To: Schwalbe Matthias 
Cc: user 
Subject: Re: Huge backpressure when using AggregateFunction with Session Window


Thanks for taking the time to answer this.

  *   You're correct that the SimpleAggregator is not used in the job setup. I 
didn't copy the correct piece of code.
  *   I understand the overhead involved. But I do not agree with the O(n^2) 
complexity. Are you implying that Vector append is O(n) by itself?
  *   I understand your points regarding ProcessFunction except for the 
"without touching the previously stored event". Also with AggregateFunction + 
concatenation I don't touch the elements other than the new element. I forgot 
to mention by the way, that the issue reproduces also with Lists which should 
be much faster for appends and concats.
Could overhead by itself account for the backpressure?
From this job the only conclusion is that Flink just cannot do aggregating
operations which collect values, only simple operations which produce scalar
values (like sum/avg). It seems weird to me that Flink would be so limited in
such a way.



On Wed, Oct 20, 2021 at 7:03 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
Hi Ori,

Just a couple of comments (some code is missing for a concise explanation):

  *   SimpleAggregator is not used in the job setup below (assuming another job 
setup)
  *   SimpleAggregator is called for each event that goes into a specific 
session window, however

 *   The scala vectors will ever grow with the number of events that end up 
in a single window, hence
 *   Your BigO complexity will be O(n^2), n: number of events in window (or 
worse)
 *   For each event the accumulator is retrieved from window state and 
stored to window state (and serialized, if on RocksDB Backend)

  *   On the other hand when you use a process function

 *   Flink keeps a list state of events belonging to the session window, and
 *   Only when the window is triggered (on session gap timeout) all events 
are retrieved from window state and processed
 *   On RocksDbBackend the new events added to the window are appended to 
the existing window state key without touching the previously stored events, 
hence
 *   Serialization is only done once per incoming event, and
 *   BigO complexity is around O(n)

… much simplified

When I started with similar questions I spent quite some time in the debugger, 
breaking into the windowing functions and going up the call stack, in order to 
understand how Flink works … time well spent


I hope this helps …

I won’t be able to follow up for the next 1 ½ weeks, unless you try to meet me 
on FlinkForward conference …

Thias

From: Ori Popowski <ori@gmail.com>
Sent: Wednesday, 20 October 2021 16:17
To: user <user@flink.apache.org>
Subject: Huge backpressure when using AggregateFunction with Session Window

I have a simple Flink application with a simple keyBy, a SessionWindow, and I 
use an AggregateFunction to incrementally aggregate a result, and write to a 
Sink.

Some of the requirements involve accumulating lists of fields from the events 
(for example, all URLs), so not all the values in the end should 

Re: Troubleshooting checkpoint timeout

2021-10-26 Thread Piotr Nowojski
I'm glad that I could help :)

Piotrek

On Mon, 25 Oct 2021 at 16:04, Alexis Sarda-Espinosa <
alexis.sarda-espin...@microfocus.com> wrote:

> Oh, I got it. I should’ve made the connection earlier after you said “Once
> an operator decides to send/broadcast a checkpoint barrier downstream, it
> just broadcasts it to all output channels”.
>
>
>
> I’ll see what I can do about upgrading the Flink version and do some more
> tests with unaligned checkpoints. Thanks again for all the info.
>
>
>
> Regards,
>
> Alexis.
>
>
>
> *From:* Piotr Nowojski 
> *Sent:* Monday, 25 October 2021 15:51
> *To:* Alexis Sarda-Espinosa 
> *Cc:* Parag Somani ; Caizhi Weng <
> tsreape...@gmail.com>; Flink ML 
> *Subject:* Re: Troubleshooting checkpoint timeout
>
>
>
> Hi Alexis,
>
>
>
> >  Should I understand these metrics as a property of an operator and not
> of each subtask (at least for aligned checkpoints)? Then “first” and “last”
> would make sense to me: first/last across all subtasks/channels for a given
> operator.
>
>
>
> Those are properties of a subtask. Subtasks are a collection of chained
> parallel instances of operators. If you have a simple job like
> `source.keyBy(...).window(...).process(...)`, with parallelism of 10, you
> will have two tasks. Each task will have 10 subtasks. Each subtask will
> have only a single element operator chain, with a single operator (either
> source operator for the source task/subtasks, or window/process function
> for the second task). If you add a sink to your job
> `source.keyBy(...).window(...).process(...).addSink(...)`, this sink will
> be chained with the window/process operator. You will still end up with two
> tasks:
>
>
>
> 1. Source
> 2. Window -> Sink
>
>
>
> again, each will have 10 subtasks, with parallel instances of the
> respective operators.
>
>
>
> So if you look at the "alignment duration" of a subtask from "2. Window ->
> Sink" task, that will be the difference between receiving a first
> checkpoint barrier from any of the "1. Source" subtasks and the last
> checkpoint barrier from those "1. Source" subtasks.
>
>
>
> > Naturally, for unaligned checkpoints, alignment duration isn’t
> applicable, but what about Start Delay? I imagine that might indeed be a
> property of the subtask and not the operator.
>
> As per the docs that I've already linked [1]
>
>
> Alignment Duration: The time between processing the first and the last
> checkpoint barrier. For aligned checkpoints, during the alignment, the
> channels that have already received checkpoint barriers are blocked from
> processing more data.
>
>
>
> This number is also defined the same way for the unaligned checkpoints.
> Even with unaligned checkpoints a subtask needs to wait for receiving all
> of the checkpoint barriers before completing the checkpoint. However, as
> subtask can broadcast the checkpoint barrier downstream immediately upon
> receiving the first checkpoint barrier AND those checkpoint barriers are
> able to overtake in-flight data, the propagation happens very very quickly
> for the most part. Hence alignment duration and start delay in this case
> should be very small, unless you have deeper problems like long GC pauses.
>
> > If I’m understanding the aligned checkpoint mechanism correctly, after
> the first failure the job restarts and tries to read, let’s say, the last 5
> minutes of data. Then it fails again because the checkpoint times out and,
> after restarting, would it try to read, for example, 15 minutes of data? If
> there was no backpressure in the source, it could be that the new
> checkpoint barriers created after the first restart are behind more data
> than before it restarted, no?
>
>
>
> I'm not sure if I understand. But yes. It's a valid scenario that:
>
> 1. timestamp t1, checkpoint 42 completes
> 2. failure happens at timestamp t1 + 10 minutes.
> 3. timestamp t2, job is recovered to checkpoint 42.
>
> 4. timestamp t2 + 5 minutes, checkpoint 43 is triggered.
>
>
>
> Between 1. and 2., your job could have processed more records than between
> 3. and 4.
>
>
>
> Piotrek
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/monitoring/checkpoint_monitoring/
>
>
>
> On Mon, 25 Oct 2021 at 15:02, Alexis Sarda-Espinosa <
> alexis.sarda-espin...@microfocus.com> wrote:
>
> Hi again,
>
>
>
> Thanks a lot for taking the time to clarify this. I think that the main
> thing that is confusing me is that the UI shows Alignment Duration and
> other checkpoint metrics for each subtask, and the resources you’ve sent
> always discuss a single barrier per subtask channel. Should I understand
> these metrics as a property of an operator and not of each subtask (at
> least for aligned checkpoints)? Then “first” and “last” would make sense to
> me: first/last across all subtasks/channels for a given operator.
>
>
>
> Naturally, for unaligned checkpoints, alignment duration isn’t applicable,
> but what about Start Delay? I imagine that might indeed be a property of
> the 

flink-yarn per-job mode

2021-10-26 Thread 王健
Hello:
  I deployed Flink on YARN in per-job mode and it fails when running. Could you please take a look at what the cause might be? Many thanks.


 1. Run command: /usr/local/flink-1.13.2/bin/flink run -t yarn-per-job -c
com.worktrans.flink.wj.ods.FlinkCDC01 /usr/local/flink-1.13.2/flink_x.jar
 The submission succeeds, as shown in the screenshot:

 2. YARN screenshot


3. Flink screenshot:
  Symptom: the numbers of task slots and TaskManagers are both 0, and it keeps requesting them



 4. Final result: the error is as follows
2021-10-25 16:17:49
java.util.concurrent.CompletionException: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Slot request bulk is not fulfillable! Could not allocate the required slot 
within slot request timeout
at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:222)
at 
org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:164)
at 
org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86)
at 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66)
at 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Slot request bulk is not fulfillable! Could not allocate the required slot 
within slot request timeout
at 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:86)
... 26 more
Caused by: java.util.concurrent.TimeoutException: Timeout has occurred: 30 
ms
... 27 more


Re: Re: Flink jobs hit an internal anomaly after running for 20 days

2021-10-26 Thread mayifan
Thank you very much for the reply:

Looking at the jobs, there are three in total. The two problematic jobs each use 1 to 2 MapStates, with TTLs of either 1 day or 3 days.

The job that runs normally uses 4 MapStates and 4 ListStates, with TTLs of 60-120 minutes.

After the anomaly occurs, restarting the problematic job from a checkpoint brings it back to normal.


> -- Original message --
> From: "Caizhi Weng"
> Sent: 2021-10-26 18:45:44
> To: "flink Chinese mailing list"
> Cc:
> Subject: Re: Flink jobs hit an internal anomaly after running for 20 days
>
> Hi!
>
> This sounds very much related to state TTL. Which state-expiration-related parameters have you configured? Is there any state with a 20-day TTL?
>
> mayifan wrote on Tue, Oct 26, 2021 at 4:43 PM:
>
> > Hi!
> >
> > May I ask everyone a question.
> >
> > There are three Flink jobs running in yarn-per-job mode on a Flink 1.11.2 cluster. All of them use RocksDB as the state backend, write data into RocksDB incrementally, and have a state TTL configured.
> >
> > The logic of each job is roughly a self-join against historical data via state, or a two-stream join. Each input record produces an equal amount, half, or a multiple of output records downstream; when a record cannot be joined via state, the job produces no downstream output.
> >
> > The strange thing is that two of the three jobs show an anomaly: every time, once a job has been running until its 20th working day, the downstream output drops sharply right on schedule, with the output volume tens of times smaller than the input, and there are no error logs in the job at that point.
> >
> >
> > Question: we currently suspect a cluster configuration or RocksDB state issue, but have no clues for troubleshooting. How could this happen, and how should we investigate it?






RE: Using POJOs with the table API

2021-10-26 Thread Alexis Sarda-Espinosa
Hello,

I've found a ticket that talks about very high-level improvements to the Table 
API [1]. Are there any more concrete pointers for migration from DataSet to 
Table API? Will it be possible at all to use POJOs with the Table API?

[1] https://issues.apache.org/jira/browse/FLINK-20787

Regards,
Alexis.

From: Alexis Sarda-Espinosa 
Sent: Thursday, 5 August 2021 15:49
To: user@flink.apache.org
Subject: Using POJOs with the table API

Hi everyone,

I had been using the DataSet API until now, but since that's been deprecated, I 
started looking into the Table API. In my DataSet job I have a lot of POJOs, 
all of which are even annotated with @TypeInfo and provide the corresponding 
factories. The Table API documentation talks about POJOs here: 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/types/#user-defined-data-types

I started with a single UDF to try it out (an AggregateFunction), but I have 
encountered several issues.

My accumulator class is a (serializable) POJO, but initially there were 
validation exceptions because the specified class is abstract. I added this to 
the class and got over that:

@FunctionHint(
accumulator = DataTypeHint("RAW", bridgedTo = MyAccumulator.class)
)

Then there were exceptions about the output type. Since it's also a POJO, I 
thought this would help:

@FunctionHint(
accumulator = DataTypeHint("RAW", bridgedTo = MyAccumulator::class),
output = DataTypeHint("RAW", bridgedTo =  MyDTO.class)
)

But no luck: org.apache.flink.table.api.TableException: Unsupported conversion 
from data type 'RAW('com.MyDTO', '...')' (conversion class: com.MyDTO) to type 
information. Only data types that originated from type information fully 
support a reverse conversion.

I figured I would try something simpler and first return a List from my 
AggregateFunction. But how do I define that in a DataTypeHint? I'm not sure if 
that's documented, but I looked through LogicalTypeParser and used:

output = DataTypeHint("ARRAY")

But that throws an exception (see attachment): Table program cannot be 
compiled. This is a bug. Please file an issue.

I changed the List to String[] and that finally worked.

Even getting a simple test running was difficult. I simply could not get 
TableEnvironment#fromValues to work with POJOs as input, and I tried many 
combinations of DataTypes.of(MyPojo.class)

At this point I still don't know how to return complex data structures 
encapsulated in POJOs from my UDF. Am I missing something very essential?

Regards,
Alexis.



Re: The data skew problem after flink keyBy

2021-10-26 Thread yuankuo.xia


filter??filter??




------ Original message ------
From: "user-zh"



Re: Problem writing to MySQL with Flink

2021-10-26 Thread Caizhi Weng
Hi!

Flink 1.11's support for jdbc in streaming jobs is indeed incomplete; nothing special is done when the streaming job takes a checkpoint. If you need the jdbc
sink in a streaming job, it is recommended to upgrade to the more recent 1.13 or 1.14.

zya wrote on Tue, Oct 26, 2021 at 4:56 PM:

> Hello, thanks for the reply.
> When the job takes a checkpoint, how is the batch of data buffered in memory flushed to mysql?
>
>
> I am using Flink SQL on version 1.11.2. I found that writing data out goes directly through the methods of
> BufferReduceStatementExecutor, and taking a checkpoint does not trigger a flush to the database; the class GenericJdbcSinkFunction does not seem to be used.
> So if something like a power failure happens, will that part of the data be lost?
>
>
>
>
> -- Original message --
> From: "user-zh"
> <tsreape...@gmail.com>
> Sent: Tuesday, October 26, 2021, 10:31 AM
> To: "flink Chinese mailing list"
> Subject: Re: Problem writing to MySQL with Flink
>
>
>
> Hi!
>
> "When the job takes a checkpoint, how is the batch of data buffered in memory flushed to mysql?"
>
>
> JdbcDynamicTableSink does not contain the concrete sink function
> implementation; that lives in GenericJdbcSinkFunction, whose snapshotState method is the snapshot implementation. The batching
> behaviour differs slightly between jdbc databases and between SQL statements; see JdbcBatchStatementExecutor and its subclasses for details.
>
> "Writing to mysql only reaches a few hundred QPS and the backpressure is severe"
>
>
> The jdbc connector has several WITH options that control flushing. For example, sink.buffer-flush.max-rows
> controls after how many buffered records a flush happens. Its default value is 100, so jobs with higher traffic need to increase it accordingly. See [1] for the other related options.
>
> "The operator computes a sum; at peak about 1000 records/s, but a checkpoint takes several minutes to complete"
>
>
> There are many reasons a checkpoint can be blocked. From the description in this mail, the most likely one is that sink backpressure also backpressures the upstream
> checkpoints. After ruling that out, you can also check whether the checkpoint size is too large and whether the GC time of the corresponding nodes is too long. This has to be analysed together with the concrete SQL.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#sink-buffer-flush-max-rows
>
> a <806040...@qq.com.invalid wrote on Tue, Oct 26, 2021 at 9:49 AM:
>
>
> Hello everyone. When writing to mysql from Flink, I found that the sink uses the JdbcDynamicTableSink class, but this class does not implement the checkpoint-related interfaces. I'd like to ask: 1. when the job takes a checkpoint, how is the batch of data buffered in memory flushed to mysql?
>
>
> 2. My job only reaches a few hundred QPS writing to mysql and the backpressure is severe. The operator computes a sum; at peak it is about 1000 records/s, but a checkpoint takes several minutes to complete. Is there any way to troubleshoot this?
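
For reference, the flush-related options mentioned above go into the WITH clause of the JDBC sink table; the schema and values below are illustrative, not taken from this thread:

CREATE TABLE mysql_sink (
  id  BIGINT,
  cnt BIGINT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://host:3306/db',
  'table-name' = 'target_table',
  'username' = '...',
  'password' = '...',
  -- flush after this many buffered rows, or after this interval, whichever comes first
  'sink.buffer-flush.max-rows' = '1000',
  'sink.buffer-flush.interval' = '1s',
  'sink.max-retries' = '3'
);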


Re: Flink jobs hit an internal anomaly after running for 20 days

2021-10-26 Thread Caizhi Weng
Hi!

This sounds very much related to state TTL. Which state-expiration-related parameters have you configured? Is there any state with a 20-day TTL?

mayifan wrote on Tue, Oct 26, 2021 at 4:43 PM:

> Hi!
>
> May I ask everyone a question.
>
>
> There are three Flink jobs running in yarn-per-job mode on a Flink 1.11.2 cluster. All of them use RocksDB as the state backend, write data into RocksDB incrementally, and have a state TTL configured.
>
>
> The logic of each job is roughly a self-join against historical data via state, or a two-stream join. Each input record produces an equal amount, half, or a multiple of output records downstream; when a record cannot be joined via state, the job produces no downstream output.
>
>
> The strange thing is that two of the three jobs show an anomaly: every time, once a job has been running until its 20th working day, the downstream output drops sharply right on schedule, with the output volume tens of times smaller than the input, and there are no error logs in the job at that point.
>
>
>
>
> Question: we currently suspect a cluster configuration or RocksDB state issue, but have no clues for troubleshooting. How could this happen, and how should we investigate it?


Re: The data skew problem after flink keyBy

2021-10-26 Thread Caizhi Weng
Hi!

Flink SQL already has many built-in ways to mitigate skew, for example local-global aggregation. See [1]; if you have to use the streaming API,
you can borrow that idea for your own optimization.

[1]
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/dev/table/tuning/#local-global-%e8%81%9a%e5%90%88
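
For reference, enabling the local-global aggregation described in [1] on the SQL side is roughly the following configuration; a sketch with illustrative values that follows the linked tuning page, where tEnv is assumed to be the TableEnvironment:

TableConfig conf = tEnv.getConfig();
// local-global aggregation requires mini-batch to be enabled
conf.getConfiguration().setString("table.exec.mini-batch.enabled", "true");
conf.getConfiguration().setString("table.exec.mini-batch.allow-latency", "5 s");
conf.getConfiguration().setString("table.exec.mini-batch.size", "5000");
conf.getConfiguration().setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");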

xiazhl wrote on Tue, Oct 26, 2021 at 2:31 PM:

> hello everyone!
> I'd like to ask for help with a data skew problem caused by keyBy.
>
>
> Background: we use the Flink DataStream API to process and extract data and write the results to physical storage.
> Processing amplifies the data volume by roughly 10x.
>
> Since there is a lot of duplicated data, we use Flink state to deduplicate exactly by id, and before deduplication we keyBy the id to partition the data.
>
>
> Problem: keyBy currently produces data skew, with a high:low ratio of roughly 3:1.
> Does anyone have a good way to handle this?


Re: Not cleanup Kubernetes Configmaps after execution success

2021-10-26 Thread Roman Khachatryan
Thanks for sharing this.
The sequence of events in the log seems strange to me:

2021-10-17 03:05:55,801 INFO
org.apache.flink.runtime.jobmaster.JobMaster [] -
Close ResourceManager connection c1092812cfb2853a5576ffd78e346189:
Stopping JobMaster for job 'rt-match_12.4.5_8d48b21a'
().
2021-10-17 03:05:59,382 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Starting KubernetesApplicationClusterEntrypoint (Version: 1.14.0,
Scala: 2.12, Rev:460b386, Date:2021-09-22T08:39:40+02:00)
2021-10-17 03:06:00,251 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2021-10-17 03:06:04,355 ERROR
io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector []
- Exception occurred while acquiring lock 'ConfigMapLock: flink-ns -
match-70958037-f414-4925-9d60-19e90d12abc0-restserver-leader
(ef5c2463-2d66-4dce-a023-4b8a50d7acff)'
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException:
Unable to create ConfigMapLock
Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
Operation: [create]  for kind: [ConfigMap]  with name:
[match-70958037-f414-4925-9d60-19e90d12abc0-restserver-leader]  in
namespace: [flink-ns]  failed.
Caused by: java.io.InterruptedIOException

It looks like KubernetesApplicationClusterEntrypoint is re-started in
the middle of shutdown and, as a result, the resources it (re)creates
aren't cleaned up.

Could you please also share Kubernetes logs and resource definitions
to validate the above assumption?

Regards,
Roman

On Mon, Oct 25, 2021 at 6:15 AM Hua Wei Chen  wrote:
>
> Hi all,
>
> We have Flink jobs running in batch mode and get the job status via
> JobListener.onJobExecuted() [1].
>
> Based on the thread [2], we expected the ConfigMaps to be cleaned up after
> successful execution.
>
> But we found that some ConfigMaps are not cleaned up after the job succeeds. On the
> other hand, the ConfigMap contents and the labels are removed.
>
> Here is one of the Configmaps.
>
> ```
> apiVersion: v1
> kind: ConfigMap
> metadata:
>   name: match-6370b6ab-de17-4c93-940e-0ce06d05a7b8-resourcemanager-leader
>   namespace: app-flink
>   selfLink: >-
> 
> /api/v1/namespaces/app-flink/configmaps/match-6370b6ab-de17-4c93-940e-0ce06d05a7b8-resourcemanager-leader
>   uid: 80c79c87-d6e2-4641-b13f-338c3d3154b0
>   resourceVersion: '578806788'
>   creationTimestamp: '2021-10-21T17:06:48Z'
>   annotations:
> control-plane.alpha.kubernetes.io/leader: >-
>   
> {"holderIdentity":"3da40a4a-0346-49e5-8d18-b04a68239bf3","leaseDuration":15.0,"acquireTime":"2021-10-21T17:06:48.092264Z","renewTime":"2021-10-21T17:06:48.092264Z","leaderTransitions":0}
>   managedFields:
> - manager: okhttp
>   operation: Update
>   apiVersion: v1
>   time: '2021-10-21T17:06:48Z'
>   fieldsType: FieldsV1
>   fieldsV1:
> 'f:metadata':
>   'f:annotations':
> .: {}
> 'f:control-plane.alpha.kubernetes.io/leader': {}
> data: {}
> ```
>
>
> Our Flink apps run on ver. 1.14.0.
> Thanks!
>
> BR,
> Oscar
>
>
> Reference:
> [1] JobListener (Flink : 1.15-SNAPSHOT API) (apache.org)
> [2] 
> https://lists.apache.org/list.html?user@flink.apache.org:lte=1M:High%20availability%20data%20clean%20up%20
>


The data skew problem after flink keyBy

2021-10-26 Thread yuankuo.xia
hello everyone!
I'd like to ask for help with a data skew problem caused by keyBy.

Background: we use the Flink DataStream API to process and extract data and write the results to physical storage. Processing amplifies the data volume by roughly 10x.
Since there is a lot of duplicated data, we use Flink state to deduplicate exactly by id, and before deduplication we keyBy the id to partition the data.

Problem: keyBy currently produces data skew, with a high:low ratio of roughly 3:1. Does anyone have a good way to handle this?

Re: Flink has no operator-level data volume metrics

2021-10-26 Thread yuankuo.xia
There are metrics in the web UI.




------ Original message ------
From: "user-zh"



Re: Async Performance

2021-10-26 Thread Arvid Heise
Hi Sanket,

if you have a queue of 1000, then 1000 will be used in AsyncIO. Memory
doesn't matter. What you need to double-check is if your async library can
handle that many elements in parallel.

The AsyncHttpClient should have a thread pool that effectively will put an
upper limit on how many elements are processed in parallel. You usually
want to set the size of the thread pool to the queue size. You also need to
ensure that the thread pool is not shared across clients in the same task
manager or increase the thread pool accordingly. So if all 8 instances run
on the same task managers, you need 8000 parallel threads. Since you have 6
AsyncIO, you need a total of 48k threads.

Note that I'd probably start with fewer threads and queue size first. You
may get the desired results much sooner if you keep thread pool and queue
size in sync (I'm assuming you currently have 50 threads which is the
default afaik and 20x queue size)

Another thing to look at is if you use an ordered or unordered queue.
Ordered queue will always decrease performance if the settings are not in
sync.
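
For reference, the capacity and ordering are the last two arguments of the AsyncDataStream call; a sketch (not from the original job) assuming an upstream DataStream named input, an illustrative output type Enriched, and an AsyncFunction named enrichFn backed by AsyncHttpClient:

import java.util.concurrent.TimeUnit;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<Enriched> enriched =
    AsyncDataStream.unorderedWait(
        input,                 // upstream DataStream<In>
        enrichFn,              // AsyncFunction<In, Enriched>
        30, TimeUnit.SECONDS,  // timeout per element
        1000);                 // capacity: max in-flight elements per parallel subtask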

On Tue, Oct 26, 2021 at 7:06 AM Sanket Agrawal 
wrote:

> Hello Everyone,
>
>
>
> I am using a series of 6 Async Operators in my application. Each operator
> is using AsyncHttpClient to make some external API call. Each Async
> Operator makes only one call to external API for a message. Capacity is set
> t 1000 for each parallelism. Approximately, we are getting response in
> 1000ms with correction of 20ms. We see that, at parallelism of 8 for async,
> only 5-8 messages are inside single parallelism of an async operator. I am
> trying to figure out what decides how many messages will go inside the
> Async Operator.
>
>
>
> *Topology*: Source – Process – Async(parallelism 8) – Async(parallelism
> 8) – Async(parallelism 8) – Async(parallelism 8) – Async(parallelism 8) –
> Async(parallelism 8) – process – sink.
>
> *Resources*: 1vcore, 4GB ram, running on AWS Kinesis Analytics.
>
>
>
> *Metrics: *CPU utilization* 22%, *Memory utilization*: 50-58%.*
>
>
>
> It will be helpful if I can get some articles on how Flink decides on how
> many messages will enter into an Async Operator when the capacity is
> specified as 1000 but available memory is only 4GB.
>
>
>
> Thanks,
>
> Sanket Agrawal
>
>
>


Re: Problem writing to MySQL with Flink

2021-10-26 Thread zya
Hello, thanks for the reply.
When the job takes a checkpoint, how is the batch of data buffered in memory flushed to mysql?


I am using Flink SQL on version 1.11.2. I found that writing data out goes directly through the methods of
BufferReduceStatementExecutor, and taking a checkpoint does not trigger a flush to the database; the class GenericJdbcSinkFunction does not seem to be used.
So if something like a power failure happens, will that part of the data be lost?




------ Original message ------
From: "user-zh"

https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#sink-buffer-flush-max-rows

a <806040...@qq.com.invalid wrote on Tue, Oct 26, 2021 at 9:49 AM:

> Hello everyone. When writing to mysql from Flink, I found that the sink uses the JdbcDynamicTableSink class, but this class does not implement the checkpoint-related interfaces. I'd like to ask: 1. when the job takes a checkpoint, how is the batch of data buffered in memory flushed to mysql?
>
> 2. My job only reaches a few hundred QPS writing to mysql and the backpressure is severe. The operator computes a sum; at peak it is about 1000 records/s, but a checkpoint takes several minutes to complete. Is there any way to troubleshoot this?

Flink jobs hit an internal anomaly after running for 20 days

2021-10-26 Thread mayifan
Hi!

May I ask everyone a question.


There are three Flink jobs running in yarn-per-job mode on a Flink 1.11.2 cluster. All of them use RocksDB as the state backend, write data into RocksDB incrementally, and have a state TTL configured.


The logic of each job is roughly a self-join against historical data via state, or a two-stream join. Each input record produces an equal amount, half, or a multiple of output records downstream; when a record cannot be joined via state, the job produces no downstream output.


The strange thing is that two of the three jobs show an anomaly: every time, once a job has been running until its 20th working day, the downstream output drops sharply right on schedule, with the output volume tens of times smaller than the input, and there are no error logs in the job at that point.




Question: we currently suspect a cluster configuration or RocksDB state issue, but have no clues for troubleshooting. How could this happen, and how should we investigate it?

Re: Flink has no operator-level data volume metrics

2021-10-26 Thread Ada Luna
The metrics in the Web UI are just Flink's normal built-in metrics, and they are all at the Task level.

xiazhl wrote on Tue, Oct 26, 2021 at 2:31 PM:
>
> There are metrics in the web UI.
>
>
>
> -- Original message --
> From: "user-zh"
>
> Sent: Tuesday, October 26, 2021, 1:55 PM
> To: "user-zh"
> Subject: Flink has no operator-level data volume metrics
>
>
>
> Flink only shows the inflow/outflow data volume at the Task level, not at the Operator level. Is this for performance reasons? Will a switch be added in the future to show operator-level numbers, to make debugging easier?
> Flink只能看到Task级别的流入流出数据量,而没有Operator级别的。这个是出于性能考量吗?未来会加入一个开关,可以看到Operator级别的,方便debug吗?


Re: Fwd: How to deserialize Avro enum type in Flink SQL?

2021-10-26 Thread Peter Schrott
Hi people,

I found a workaround for that issue - which works at least for my use case.

The main idea was customizing
"org.apache.flink.formats.avro.registry.confluent.RegistryAvroFormatFactory"
such that the expected Avro schema is not derived from the CREATE TABLE SQL
statement but instead passed in as a parameter. This results in matching schemas
(actual and expected) when the Avro data is deserialized under the hood by the Avro library.

This gives me the freedom to create a source table exactly matching the records
in Kafka. In my case the CREATE TABLE SQL statement is generated from the Avro
schema, so it is actually done the other way round compared to the current
implementation.

Please compare: https://github.com/peterschrott/flinkDeserAvroEnum

Maybe this helps someone.

Best, Peter


On 2021/10/12 16:18:30 Dongwon Kim wrote:
> Hi community,
> 
> Can I get advice on this question?
> 
> Another user just sent me an email asking whether I found a solution or a
> workaround for this question, but I'm still stuck there.
> 
> Any suggestions?
> 
> Thanks in advance,
> 
> Dongwon
> 
> -- Forwarded message -
> From: Dongwon Kim 
> Date: Mon, Aug 9, 2021 at 7:26 PM
> Subject: How to deserialize Avro enum type in Flink SQL?
> To: user 
> 
> 
> Hi community,
> 
> I have a Kafka topic where the schema of its values is defined by the
> "MyRecord" record in the following Avro IDL and registered to the Confluent
> Schema Registry.
> 
> > @namespace("my.type.avro")
> > protocol MyProtocol {
> >   enum MyEnumType {
> > TypeVal1, TypeVal2
> >   }
> >   record MyEntry {
> > MyEnumType type;
> >   }
> >   record MyRecord {
> > array<MyEntry> entries;
> >   }
> > }
> 
> 
> To read from the topic, I've defined the following DDL:
> 
> > CREATE TABLE my_table
> 
> (
> > `entries` ARRAY<ROW<
> > *`type` ??? (This is the main question)*
> > >>
> > ) WITH (
> > 'connector' = 'kafka',
> > 'topic' = 'my-topic',
> > 'properties.bootstrap.servers' = '...:9092',
> > 'scan.startup.mode' = 'latest-offset',
> > 'value.format' = 'avro-confluent',
> > 'value.avro-confluent.schema-registry.url' = 'http://...:8081'
> >
> )
> 
> 
> And I run the following query :
> 
> > SELECT * FROM my_table
> 
> 
> Now I got the following messages in Flink-1.13.1 when I use *STRING* for
> the type:
> 
> > *Caused by: java.io.IOException: Failed to deserialize Avro record.*
> >   at
> > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106)
> >   at
> > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46)
> >   at
> > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> >   at
> > org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
> >   at
> > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
> >   at
> > org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
> >   at
> > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
> >   at
> > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> >   at
> > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
> >   at
> > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
> > *Caused by: org.apache.avro.AvroTypeException: Found
> > my.type.avro.MyEnumType, expecting union*
> >   at
> > org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
> >   at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
> >   at
> > org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:275)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readArray(GenericDatumReader.java:298)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:183)
> >   at
> > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
> >   at
> > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
> >   at
> > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
> >   at
> > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
> >   at
> > 

Application mode - Custom Flink docker image with Python user code

2021-10-26 Thread Sumeet Malhotra
Hi,

I'm currently submitting my Python user code from my local machine to a
Flink cluster running in Session mode on Kubernetes. For this, I have a
custom Flink image with Python as per this reference [1].

Now, I'd like to move to using the Application mode with Native Kubernetes,
where the user code is embedded within the container image. For Java, the
process is described here [2]. This basically requires the user application
jar to be copied over to $FLINK_HOME/usrlib directory. I couldn't find any
documentation on where should Python user code live? Can it be packaged at
any location and invoked?

Thanks,
Sumeet


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/standalone/docker/#enabling-python
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode


Re: Flink has no operator-level data volume metrics

2021-10-26 Thread xiazhl
There are metrics in the web UI.




------ Original message ------
From: "user-zh"



The data skew problem after flink keyBy

2021-10-26 Thread xiazhl
hello everyone!
I'd like to ask for help with a data skew problem caused by keyBy.

Background: we use the Flink DataStream API to process and extract data and write the results to physical storage. Processing amplifies the data volume by roughly 10x.
Since there is a lot of duplicated data, we use Flink state to deduplicate exactly by id, and before deduplication we keyBy the id to partition the data.

Problem: keyBy currently produces data skew, with a high:low ratio of roughly 3:1. Does anyone have a good way to handle this?

Re: Re: Flink SQL 1.12 bulk data import: how to speed it up

2021-10-26 Thread WuKong
Hi:
 It is the simplest possible setup: define a source table and a sink table with the same schema, e.g. insert into tableB select
* from tableA ; if I run it with a parallelism of 8, 7 of the parallel subtasks are in the Finished state and only one imports the data serially. The schema looks like this, for example:


CREATE TABLE tableA  (
columnOne STRING,
columnTwo BIGINT,
PRIMARY KEY (`columnTwo`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://x/?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'table-name',
  'username' = 'xxx',
  'password' = 'xxx',
  'driver' = 'com.mysql.jdbc.Driver'
);


CREATE TABLE tableB  (
columnOne STRING,
columnTwo BIGINT,
PRIMARY KEY (`columnTwo`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://x/?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'table-name',
  'username' = 'xxx',
  'password' = 'xxx',
  'driver' = 'com.mysql.jdbc.Driver'
);
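
For reference, the JDBC table source only reads in parallel when the partitioned-scan options are set; without them the scan runs in a single subtask regardless of the job parallelism. A sketch of those options on the source table, with illustrative values not taken from the original message:

CREATE TABLE tableA (
  columnOne STRING,
  columnTwo BIGINT,
  PRIMARY KEY (`columnTwo`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://x/?useUnicode=true&characterEncoding=UTF-8',
  'table-name' = 'table-name',
  'username' = 'xxx',
  'password' = 'xxx',
  -- split the scan into parallel range queries on a numeric column
  'scan.partition.column' = 'columnTwo',
  'scan.partition.num' = '8',
  'scan.partition.lower-bound' = '1',
  'scan.partition.upper-bound' = '100000000'
);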



---
Best,
WuKong
 
From: Caizhi Weng
Sent: 2021-10-26 12:43
To: flink Chinese mailing list
Subject: Re: Flink SQL 1.12 bulk data import: how to speed it up
Hi!
 
"No matter how much I increase the parallelism through Flink SQL, the import runs with a single parallelism"
 
 
How did you increase the parallelism? Are the nodes other than the source also single-parallelism, or can their parallelism be increased successfully? Could you share your SQL to make this easier to answer?
 
WuKong wrote on Tue, Oct 26, 2021 at 11:36 AM:
 
> Hi:
> I currently have a scenario where I need to import historical data using Flink SQL; for example, the source is a MySQL table and the sink
> is also a MySQL table. No matter how much I increase the parallelism through Flink SQL, the import runs with a single parallelism and is very slow. Is there anything that needs to be configured? Or is there another solution
> for bulk data import of billions of rows based on SQL?
>
>
>
> ---
> Best,
> WuKong
>