Re: Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-20 Thread Ananth Gundabattula
Thanks for the response Guowei.


  *   I tried telnet to the jobmanager host:port and I get “127.0.0.1:8086: 
nodename nor servname provided, or not known”, which suggests that the network 
access is fine?
  *   I resubmitted the word count example and it ran fine to completion.

For the pulsar script, I have also tried localhost and the local LAN IPs as the 
jobmanager host configuration in conf/flink-conf.yaml, and all of them end with 
the same result. I have also tried this with Pulsar 2.8.0 and it did have issues 
with the “Shared” subscription type (I get a “transactions not enabled” error in 
spite of enabling transactions in the 2.8.0 broker). When I change the 
subscription type to “Exclusive”, it exhibits the same behavior as the Pulsar 
2.9.1 version, i.e. the job submission fails (with both Pulsar 2.8.0 and 2.9.1).
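
For reference, this is roughly where the subscription type comes in, shown as a 
minimal Java sketch of the DataStream Pulsar source builder (the URLs, topic and 
subscription names below are placeholders; the PyFlink script sets the 
equivalent options):

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.pulsar.client.api.SubscriptionType;

public class PulsarSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        PulsarSource<String> source =
                PulsarSource.builder()
                        .setServiceUrl("pulsar://localhost:6650")   // placeholder service URL
                        .setAdminUrl("http://localhost:8080")       // placeholder admin URL
                        .setTopics("test-topic")
                        .setStartCursor(StartCursor.earliest())
                        .setDeserializationSchema(
                                PulsarDeserializationSchema.flinkSchema(new SimpleStringSchema()))
                        .setSubscriptionName("test-subscription")
                        // "Exclusive" here; with "Shared" I get the "transactions not enabled"
                        // error mentioned above.
                        .setSubscriptionType(SubscriptionType.Exclusive)
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "pulsar-source").print();
        env.execute("pulsar-source-sketch");
    }
}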

Regards,
Ananth

From: Guowei Ma 
Date: Monday, 21 February 2022 at 4:57 pm
To: Ananth Gundabattula 
Cc: user@flink.apache.org 
Subject: Re: Pulsar connector 2.9.1 failing job submission in standalone.
Hi, Ananth

I don't see any error logs on the server side, so it's hard to tell what the 
specific problem is. From the current log, there are two things to try first:

1. From the client's log, it is a 5-minute timeout, so you can telnet 
127.0.0.1:8086 to see if there is a problem with the local network.
2. From the log on the server side, there is no job submission at all. You can 
try to submit the wordcount example again when submitting the pulsar example 
fails, so as to rule out whether the session cluster is inherently problematic.

Best,
Guowei


On Mon, Feb 21, 2022 at 9:48 AM Ananth Gundabattula 
<agundabatt...@darwinium.com> wrote:
Hello All,

I have a Pyflink script that needs to read from Pulsar and process the data.

I have done the following to implement a prototype.


  1.  Since I need a PyFlink way to connect to Pulsar, I checked out the code 
from the master branch as advised in a different thread. (The PyFlink Pulsar 
connector seems to be slated for the 1.15 release.)
  2.  I built the Flink source.
  3.  I am using the following location as FLINK_HOME under the source: 
flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
  4.  The python pyflink wheels have been appropriately installed in the right 
python conda environment.
  5.  I copied the flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the 
$FLINK_HOME/lib folder.
  6.  I started the standalone cluster by running bin/start-cluster.sh
  7.  I submit my test script by using bin/flink run --python …
  8.  If I am launching the word_count example in the Flink documentation, 
everything runs fine and it completes successfully.
  9.  However, if the script involves the Pulsar connector, the logs show that 
the Flink client codebase is not able to submit the job to the JobManager.
  10. It ultimately dies with a Channel Idle exception. (See this in DEBUG mode 
of the logs). I am attaching the logs for reference.

I am trying this on OS X. Please note that the classic word_count script works 
fine without any issues and I see the job submission failures on the client 
only when the pulsar source connector is in the script. I have also added the 
logs for the standalone session job manager. I am also attaching the script for 
reference.

Could you please advise what I can do to resolve the issue. (I will raise a 
JIRA issue if someone thinks it is a bug.)

Regards,
Ananth



Re: Apache Flink - Continuously streaming data using jdbc connector

2022-02-20 Thread Guowei Ma
Hi,

You can try flink's cdc connector [1] to see if it meets your needs.

[1] https://github.com/ververica/flink-cdc-connectors
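
For illustration, here is a minimal sketch of declaring a CDC-backed table with 
the Table API (the connector options follow the flink-cdc-connectors README; the 
hostname, credentials, database and table names are placeholders, and the 
matching flink-sql-connector-mysql-cdc jar is assumed to be on the classpath):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CdcSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // A changelog source: instead of stopping after a one-off scan, it keeps
        // emitting inserts/updates/deletes from the database's binlog.
        tEnv.executeSql(
                "CREATE TABLE orders_cdc ("
                        + "  id INT,"
                        + "  customerid INT,"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mysql-cdc',"
                        + "  'hostname' = 'localhost',"
                        + "  'port' = '3306',"
                        + "  'username' = 'flink',"
                        + "  'password' = 'secret',"
                        + "  'database-name' = 'shop',"
                        + "  'table-name' = 'orders'"
                        + ")");

        // This query keeps running and prints changes as they arrive.
        tEnv.executeSql("SELECT * FROM orders_cdc").print();
    }
}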

Best,
Guowei


On Mon, Feb 21, 2022 at 6:23 AM M Singh  wrote:

> Hi Folks:
>
> I am trying to monitor a jdbc source and continuously stream data in an
> application using the jdbc connector.  However, the application stops after
> reading the data in the table.
>
> I've checked the docs (
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/)
> and it looks like there is a streaming sink but the sources are scan and
> lookup only.  I've also checked the connector settings but could not find
> any flag for continuous monitoring.
>
> Can you please let me know if there is a setting in the connector, or advice
> on how to make the jdbc connector source stream data continuously?
>
> Thanks for your help.
>
> Mans
>


Re: Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-20 Thread Guowei Ma
Hi, Ananth

I don't see any error logs on the server side, so it's hard to tell what
the specific problem is. From the current log, there are two things to try
first:

1. From the client's log, it is a 5-minute timeout, so you can telnet
127.0.0.1:8086 to see if there is a problem with the local network
2. From the log on the server side, there is no job submission at all. You
can try to submit the wordcount example again when submitting the pulsar
example fails, so as to rule out whether the session cluster is inherently
problematic.

Best,
Guowei


On Mon, Feb 21, 2022 at 9:48 AM Ananth Gundabattula <
agundabatt...@darwinium.com> wrote:

> Hello All,
>
>
>
> I have a Pyflink script that needs to read from Pulsar and process the
> data.
>
>
>
> I have done the following to implement a prototype.
>
>
>
>    1. Since I need a PyFlink way to connect to Pulsar, I checked out the
>    code from the master branch as advised in a different thread. (The PyFlink
>    Pulsar connector seems to be slated for the 1.15 release.)
>2. I built the Flink source.
>3. I am using the following location as FLINK_HOME under the source:
>flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
>4. The python pyflink wheels have been appropriately installed in the
>right python conda environment.
>5. I copied the flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the
>$FLINK_HOME/lib folder.
>6. I started the standalone cluster by running bin/start-cluster.sh
>    7. I submit my test script by using bin/flink run --python …
>    8. If I am launching the word_count example in the Flink documentation,
>    everything runs fine and it completes successfully.
>    9. However, if the script involves the Pulsar connector, the logs show
>    that the Flink client codebase is not able to submit the job to the
>    JobManager.
>10. It ultimately dies with a Channel Idle exception. (See this in
>DEBUG mode of the logs). I am attaching the logs for reference.
>
>
>
> I am trying this on OS X. Please note that the classic word_count script
> works fine without any issues and I see the job submission failures on the
> client only when the pulsar source connector is in the script. I have also
> added the logs for the standalone session job manager. I am also attaching
> the script for reference.
>
>
>
> Could you please advise what I can do to resolve the issue. (I will raise a
> JIRA issue if someone thinks it is a bug.)
>
>
>
> Regards,
>
> Ananth
>
>
>


Re: No effect from --allowNonRestoredState or "execution.savepoint.ignore-unclaimed-state" in K8S application mode

2022-02-20 Thread Yang Wang
By design, we should support arbitrary config keys via the CLI when using
generic CLI mode.

Have you also specified "--fromSavepoint" along with
"--allowNonRestoredState" when submitting a Flink job via "flink
run-application"?

From the current code base, it seems that the CLI options (e.g.
--fromSavepoint, --allowNonRestoredState) have higher priority than the Flink
config options, and this causes the savepoint-related config options to be
overwritten wrongly. Refer to the implementation [1].

[1].
https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java#L181
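
For illustration, a simplified sketch of that effect (this is not the actual CLI
code path; it only reuses the public SavepointRestoreSettings helper to show how
the CLI-derived settings, applied last, can overwrite the value coming from
flink-conf.yaml or -D):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;

public class PrecedenceSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Value coming from flink-conf.yaml or -D on the command line.
        config.setBoolean("execution.savepoint.ignore-unclaimed-state", true);

        // What the CLI derives when --fromSavepoint / --allowNonRestoredState are not picked up.
        SavepointRestoreSettings fromCli = SavepointRestoreSettings.none();

        // The CLI-derived settings are written into the configuration afterwards,
        // so the flag flips back to false.
        SavepointRestoreSettings.toConfiguration(fromCli, config);
        System.out.println(config.getBoolean("execution.savepoint.ignore-unclaimed-state", false));
    }
}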


Best,
Yang

Andrey Bulgakov  于2022年2月19日周六 08:30写道:

> Hi Austin,
>
> Thanks for the reply! Yeah, the docs aren't super explicit about this.
>
> But for what it's worth, I'm setting a few options unrelated to kubernetes
> this way and they all have effect:
> -Dstate.checkpoints.num-retained=100 \
>
> -Dfs.s3a.aws.credentials.provider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider
> \
> -Dio.tmp.dirs=/data/flink-local-data \
> -Dqueryable-state.enable=true \
>
> The only one i'm having problems with is
> "execution.savepoint.ignore-unclaimed-state".
>
> On Fri, Feb 18, 2022 at 3:42 PM Austin Cawley-Edwards <
> austin.caw...@gmail.com> wrote:
>
>> Hi Andrey,
>>
>> It's unclear to me from the docs[1] if the flink native-kubernetes
>> integration supports setting arbitrary config keys via the CLI. I'm cc'ing
>> Yang Wang, who has worked a lot in this area and can hopefully help us out.
>>
>> Best,
>> Austin
>>
>> [1]:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/#configuring-flink-on-kubernetes
>>
>> On Fri, Feb 18, 2022 at 5:14 PM Andrey Bulgakov  wrote:
>>
>>> Hey all,
>>>
>>> I'm working on migrating our Flink job away from Hadoop session mode to
>>> K8S application mode.
>>> It's been going great so far but I'm hitting a wall with this seemingly
>>> simple thing.
>>>
>>> In the first phase of the migration I want to remove some operators
>>> (their state can be discarded) and focus on getting the primary pipeline
>>> running first.
>>> For that I have to start the cluster from a savepoint with the
>>> "allowNonRestoredState" parameter turned on.
>>>
>>> The problem is that I can't set it in any way that I'm aware of. I tried
>>> 4 ways separately and simultaneously:
>>>
>>> 1) Adding --allowNonRestoredState to flink run-application
>>> -t kubernetes-application
>>> 2) Adding -Dexecution.savepoint.ignore-unclaimed-state=true to flink
>>> run-application -t kubernetes-application
>>> 3) Adding "execution.savepoint.ignore-unclaimed-state: true" to my local
>>> flink-conf.yaml where I'm running flink run-application
>>> 4) Overriding it in the application code:
>>> val sigh = new Configuration()
>>> sigh.setBoolean(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
>>> true)
>>> env.configure(sigh)
>>>
>>> Every time the resulting pod ends up with "false" value for this setting
>>> in its configmap:
>>> $ kc describe cm/flink-config-flink-test | grep ignore
>>> execution.savepoint.ignore-unclaimed-state: false
>>>
>>> And I get the exception:
>>> java.lang.IllegalStateException: Failed to rollback to
>>> checkpoint/savepoint . Cannot map checkpoint/savepoint state for
>>> operator 68895e9129981bfc6d96d1dad715298e to the new program, because the
>>> operator is not available in the new program. If you want to allow to skip
>>> this, you can set the --allowNonRestoredState option on the CLI.
>>>
>>> It seems like something overrides it to false and it never has any
>>> effect.
>>>
>>> Can this be a bug or am I doing something wrong?
>>>
>>> For context, the savepoint is produced by Flink 1.8.2 and the version
>>> I'm trying to run on K8S is 1.14.3.
>>>
>>> --
>>> With regards,
>>> Andrey Bulgakov
>>>
>>>
>
> --
> With regards,
> Andrey Bulgakov
>


Re: CVE-2021-44228 - Log4j2 vulnerability

2022-02-20 Thread Francis Conroy
The release notification email came out a few days ago.



On Mon, 21 Feb 2022 at 14:18, Surendra Lalwani 
wrote:

> Hi Team,
>
> Any updates on Flink 1.13.6 version release?
>
> Regards,
> Surendra Lalwani
>
>
> On Fri, Feb 4, 2022 at 1:23 PM Martijn Visser 
> wrote:
>
>> Hi Surendra,
>>
>> You can follow the discussion on this topic in the Dev mailing list [1].
>> I would expect it in the next couple of weeks.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1] https://lists.apache.org/thread/n417406j125n080vopljgfflc45yygh4
>>
>> On Fri, 4 Feb 2022 at 08:49, Surendra Lalwani 
>> wrote:
>>
>>> Hi Team,
>>>
>>> Any ETA on Flink version 1.13.6 release.
>>>
>>> Thanks and Regards ,
>>> Surendra Lalwani
>>>
>>>
>>> On Sun, Jan 9, 2022 at 3:50 PM David Morávek  wrote:
>>>
 Flink community officially only supports current and previous minor
 versions [1] (1.13, 1.14) with bug fixes. Personally I wouldn’t expect
 there will be another patch release for 1.12.

 If you really need an extra release for the unsupported version, the
 most straightforward approach would be manually building the Flink
 distribution from sources [2] with the patches you need.

 [1]
 https://flink.apache.org/downloads.html#update-policy-for-old-releases
 [2]

 https://github.com/apache/flink/tree/release-1.12#building-apache-flink-from-source

 D.

 On Sun 9. 1. 2022 at 10:10, V N, Suchithra (Nokia - IN/Bangalore) <
 suchithra@nokia.com> wrote:

> Hi David,
>
>
>
> As per the below comments, Flink 1.14.3 is in preparation and this
> hasn't started yet for Flink 1.13.6. Flink 1.12.8 release will be
> planned after this? If there is no current plan, could you please let us
> know what will be the regular release timing for 1.12.8 version.
>
>
>
> Regards,
>
> Suchithra
>
>
>
> *From:* David Morávek 
> *Sent:* Sunday, January 9, 2022 12:11 AM
> *To:* V N, Suchithra (Nokia - IN/Bangalore) 
> *Cc:* Chesnay Schepler ; Martijn Visser <
> mart...@ververica.com>; Michael Guterl ; Parag
> Somani ; patrick.eif...@sony.com; Richard
> Deurwaarder ; User ;
> subharaj.ma...@gmail.com; swamy.haj...@gmail.com
> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>
>
>
> Hi Suchithra,
>
>
>
> there is currently no plan on doing another 1.12 release
>
>
>
> D.
>
>
>
> On Sat 8. 1. 2022 at 18:02, V N, Suchithra (Nokia - IN/Bangalore) <
> suchithra@nokia.com> wrote:
>
> Hi,
>
>
>
> When can we expect the flink 1.12 releases with log4j 2.17.1?
>
>
>
> Thanks,
>
> Suchithra
>
>
>
> *From:* Martijn Visser 
> *Sent:* Thursday, January 6, 2022 7:45 PM
> *To:* patrick.eif...@sony.com
> *Cc:* David Morávek ; swamy.haj...@gmail.com;
> subharaj.ma...@gmail.com; V N, Suchithra (Nokia - IN/Bangalore) <
> suchithra@nokia.com>; Chesnay Schepler ; User
> ; Michael Guterl ; Richard
> Deurwaarder ; Parag Somani 
> *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability
>
>
>
> Hi all,
>
>
>
> The ticket for upgrading Log4J to 2.17.0 is
> https://issues.apache.org/jira/browse/FLINK-25375. There's also the
> update to Log4j 2.17.1 which is tracked under
> https://issues.apache.org/jira/browse/FLINK-25472
>
>
>
> As you can see, both have a fix version set to 1.14.3 and 1.13.6.
> These versions haven't been released yet. Flink 1.14.3 is in preparation,
> this hasn't started yet for Flink 1.13.6.
>
>
>
> Best regards,
>
>
>
> Martijn
>
>
>
> On Thu, 6 Jan 2022 at 15:05,  wrote:
>
> Hi,
>
>
>
> just to be sure: Which Flink Releases for 1.14 and 1.13 have the
> upgraded log4j version 2.17.0?
>
> Are those already deployed to docker?
>
>
>
> Many Thanks in Advance.
>
>
>
> Kind Regards,
>
>
>
> Patrick
>
> --
>
> Patrick Eifler
>
>
>
> Senior Software Engineer (BI)
>
> Cloud Gaming Engineering & Infrastructure
> Sony Interactive Entertainment LLC
>
> Wilhelmstraße 118, 10963 Berlin
>
>
> Germany
>
> E: patrick.eif...@sony.com
>
>
>
> *From: *David Morávek 
> *Date: *Wednesday, 29. December 2021 at 09:35
> *To: *narasimha 
> *Cc: *Debraj Manna , Martijn Visser <
> mart...@ververica.com>, V N, Suchithra (Nokia - IN/Bangalore) <
> suchithra@nokia.com>, Chesnay Schepler , user
> , Michael Guterl , Richard
> Deurwaarder , Parag Somani 
> *Subject: *Re: CVE-2021-44228 - Log4j2 vulnerability
>
> Please follow the above mentioned ML thread for more details. Please
> note that this is a REGULAR release that is not 

Re: CVE-2021-44228 - Log4j2 vulnerability

2022-02-20 Thread Surendra Lalwani
Hi Team,

Any updates on Flink 1.13.6 version release?

Regards,
Surendra Lalwani


On Fri, Feb 4, 2022 at 1:23 PM Martijn Visser  wrote:

> Hi Surendra,
>
> You can follow the discussion on this topic in the Dev mailing list [1]. I
> would expect it in the next couple of weeks.
>
> Best regards,
>
> Martijn
>
> [1] https://lists.apache.org/thread/n417406j125n080vopljgfflc45yygh4
>
> On Fri, 4 Feb 2022 at 08:49, Surendra Lalwani 
> wrote:
>
>> Hi Team,
>>
>> Any ETA on Flink version 1.13.6 release.
>>
>> Thanks and Regards ,
>> Surendra Lalwani
>>
>>
>> On Sun, Jan 9, 2022 at 3:50 PM David Morávek  wrote:
>>
>>> Flink community officially only supports current and previous minor
>>> versions [1] (1.13, 1.14) with bug fixes. Personally I wouldn’t expect
>>> there will be another patch release for 1.12.
>>>
>>> If you really need an extra release for the unsupported version, the
>>> most straightforward approach would be manually building the Flink
>>> distribution from sources [2] with the patches you need.
>>>
>>> [1]
>>> https://flink.apache.org/downloads.html#update-policy-for-old-releases
>>> [2]
>>>
>>> https://github.com/apache/flink/tree/release-1.12#building-apache-flink-from-source
>>>
>>> D.
>>>
>>> On Sun 9. 1. 2022 at 10:10, V N, Suchithra (Nokia - IN/Bangalore) <
>>> suchithra@nokia.com> wrote:
>>>
 Hi David,



 As per the below comments, Flink 1.14.3 is in preparation and this
 hasn't started yet for Flink 1.13.6. Flink 1.12.8 release will be
 planned after this? If there is no current plan, could you please let us
 know what will be the regular release timing for 1.12.8 version.



 Regards,

 Suchithra



 *From:* David Morávek 
 *Sent:* Sunday, January 9, 2022 12:11 AM
 *To:* V N, Suchithra (Nokia - IN/Bangalore) 
 *Cc:* Chesnay Schepler ; Martijn Visser <
 mart...@ververica.com>; Michael Guterl ; Parag
 Somani ; patrick.eif...@sony.com; Richard
 Deurwaarder ; User ;
 subharaj.ma...@gmail.com; swamy.haj...@gmail.com
 *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability



 Hi Suchithra,



 there is currently no plan on doing another 1.12 release



 D.



 On Sat 8. 1. 2022 at 18:02, V N, Suchithra (Nokia - IN/Bangalore) <
 suchithra@nokia.com> wrote:

 Hi,



 When can we expect the flink 1.12 releases with log4j 2.17.1?



 Thanks,

 Suchithra



 *From:* Martijn Visser 
 *Sent:* Thursday, January 6, 2022 7:45 PM
 *To:* patrick.eif...@sony.com
 *Cc:* David Morávek ; swamy.haj...@gmail.com;
 subharaj.ma...@gmail.com; V N, Suchithra (Nokia - IN/Bangalore) <
 suchithra@nokia.com>; Chesnay Schepler ; User <
 user@flink.apache.org>; Michael Guterl ; Richard
 Deurwaarder ; Parag Somani 
 *Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability



 Hi all,



 The ticket for upgrading Log4J to 2.17.0 is
 https://issues.apache.org/jira/browse/FLINK-25375. There's also the
 update to Log4j 2.17.1 which is tracked under
 https://issues.apache.org/jira/browse/FLINK-25472



 As you can see, both have a fix version set to 1.14.3 and 1.13.6. These
 versions haven't been released yet. Flink 1.14.3 is in preparation, this
 hasn't started yet for Flink 1.13.6.



 Best regards,



 Martijn



 On Thu, 6 Jan 2022 at 15:05,  wrote:

 Hi,



 just to be sure: Which Flink Releases for 1.14 and 1.13 have the
 upgraded log4j version 2.17.0?

 Are those already deployed to docker?



 Many Thanks in Advance.



 Kind Regards,



 Patrick

 --

 Patrick Eifler



 Senior Software Engineer (BI)

 Cloud Gaming Engineering & Infrastructure
 Sony Interactive Entertainment LLC

 Wilhelmstraße 118, 10963 Berlin


 Germany

 E: patrick.eif...@sony.com



 *From: *David Morávek 
 *Date: *Wednesday, 29. December 2021 at 09:35
 *To: *narasimha 
 *Cc: *Debraj Manna , Martijn Visser <
 mart...@ververica.com>, V N, Suchithra (Nokia - IN/Bangalore) <
 suchithra@nokia.com>, Chesnay Schepler , user <
 user@flink.apache.org>, Michael Guterl , Richard
 Deurwaarder , Parag Somani 
 *Subject: *Re: CVE-2021-44228 - Log4j2 vulnerability

 Please follow the above mentioned ML thread for more details. Please
 note that this is a REGULAR release that is not motivated by the log4j CVE,
 so the stability of the release is the more important factor than having it
 out as soon as possible.



 D.



 On Mon, Dec 27, 2021 at 6:33 AM narasimha 
 wrote:

 Hi folks,



 When 

Re: On "batch"-processing of elements changing the batch's event timestamp and breaking downstream window aggregation

2022-02-20 Thread yidan zhao
To add: as it stands, appending a WatermarkStrategy afterwards does solve the problem, but it
introduces another delay (maxOutOfOrderness). Alternatively, since the upstream input is always
window output, the ts is necessarily window.maxTs, so a downstream maxOutOfOrderness of 1s is
enough to guarantee no out-of-orderness; however, the window's output frequency then delays the
downstream watermark by a whole window length. So the best option is probably still a custom
operator that fixes the ts without affecting watermark propagation.
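
For reference, a minimal sketch of that assignTimestamps approach (assuming an
enriched record type that still carries its original event time), placed right
after the Redis-lookup operator so that downstream windows see per-element
timestamps again:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class ReassignAfterBatch {
    /** Placeholder for the record emitted after the Redis lookup / batch split. */
    public static class Enriched {
        public long eventTime; // original event time carried through the batch/unbatch step
        public String payload;
    }

    public static DataStream<Enriched> fixTimestamps(DataStream<Enriched> afterRedis) {
        // A 1s bound is enough here because, as noted above, the upstream elements are
        // window outputs and therefore already almost ordered by window max timestamp.
        return afterRedis.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Enriched>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((e, previousTs) -> e.eventTime));
    }
}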

yidan zhao wrote on Sat, Feb 19, 2022 at 11:49:

> Background
>
> Some operators need to query Redis, and for performance the Redis queries have to be batched
> through the Redis pipeline mechanism. So for an input stream A, I add a BatchGenerator operator B
> that groups the input into batches, then an operator C queries Redis and emits the results,
> splitting each batch back into individual elements that go on to operator D. Operator D may be a
> window aggregation, ...
>
> Problem
>
> Operator D receives elements with no timestamp, or with a wrong timestamp, which makes the job
> fail or assigns elements to the wrong windows.
>
> Analysis and attempted fixes
>
>
> The missing timestamps are because BatchGenerator, to implement a timeout (so a batch does not
> wait forever when input is sparse), uses Flink's processing-time timers (note that only
> KeyedStream supports timers), and the onTimer output erases the event time.
> This part is relatively easy to fix: switch to event-time timers.
> To avoid the KeyedStream restriction one can also stay on DataStream and implement the timeout
> with java.util.Timer(), and so on.
> Suppose stream A is itself the output of some window. For two elements a and b emitted by A with
> event times ts1 and ts2 (a window emits with the window's max timestamp as the event time), where
> a and b belong to different windows: a and b enter the batchGenerator operator and are batched
> together, and the emitted batch element's event time is ts2 (assuming batchSize=2). Downstream
> window computation then goes wrong, because b's ts is still ts2, but a's ts has changed from ts1
> to ts2, so a is assigned to the wrong window.
>
> After this analysis I reworked BatchGenerator to use a set of buffers instead of a single one,
> organized in a Map keyed by window_ts (a value that uniquely identifies the window, e.g. for a
> 5-minute window the timestamp formatted as MMddHHmm), so that elements with different window_ts
> never enter the same batch and their ts cannot get mixed up. That looked solved, but was not,
> completely. Reason 1: in the timeout case the timer alone cannot emit the batch and needs another
> incoming element to trigger it, which reintroduces the ts confusion. Reason 2: once the buffers
> are keyed by ts, suppose ts1 only ever receives a single element; to avoid ts1 never seeing
> another element, the output trigger cannot distinguish by ts, i.e. even though the ts1 buffer has
> its own timeout timer, its output must also be triggerable by elements of other ts, otherwise
> elements are never emitted and memory leaks. That puts everything back at square one.
>
> So, after several iterations, this turned out to be futile. There seems to be no fundamental
> solution (the approaches discussed below aside). Without a timeout there are no timeout-induced
> problems, but even ignoring the timeout mechanism, the batching itself still has no way out.
>
>
> Other idea 1: since processFunc's collector is actually a TimestampedCollector, cast the
> collector to TimestampedCollector inside the function and set the timestamp to a reasonable value
> (when the Redis query completes and the batch is split and emitted)?
> Idea 2: thinking idea 1 through, it still comes down to correcting the timestamp, so it would be
> simpler to just add an assignTimestamp after the Redis operator to reset the ts and regenerate
> the watermarks.
>
>
> So, does anyone have other ideas? Also, does Flink currently not support resetting only the ts
> without resetting the watermark? I did once implement a variant that resets only the watermark,
> mainly by adding an offset in processWatermark.
>


On the different handling of timestamps under eventTime and processTime, and across the various APIs

2022-02-20 Thread yidan zhao
As the subject says: for many of the current implementations it is not clear to me why they are
designed this way. For example, for windows, whether under event time or processing time, the
timestamp on emit is always window.maxTs.

For ProcessFunction, since timers are not supported, output only happens in processElement and
uses the input element's ts. For KeyedProcessFunction, processElement behaves the same;
onEventTime also uses the input element's ts, but onProcessingTime erases the element's ts. Why
is that?


Re: Flink not triggering checkpoints

2022-02-20 Thread Tony Wei
Hi,

Have you considered upgrading to 1.14? Flink 1.14 supports FLIP-147, which lets Flink still
trigger checkpoints while some tasks are in the FINISHED state [1, 2].

[1]
https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
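
For reference, a minimal sketch of turning this on after upgrading (in 1.14 the
FLIP-147 behavior is opt-in; the option name follows the 1.14 announcement in
[1] and can equally be set in flink-conf.yaml):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Flip147Sketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Allow checkpoints to continue after the bounded (CSV) source tasks finish.
        conf.setBoolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled", true);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.enableCheckpointing(60_000L); // checkpoint every 60s
        // ... build the CSV + Kafka join pipeline here, then env.execute(...)
    }
}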

best regards,

RS wrote on Fri, Feb 18, 2022 at 5:27 PM:

> 1. The image is broken and cannot be seen; please use text where possible, or an image host.
> 2. Did you configure checkpointing when starting the job?
>
>
>
>
>
>
>
>
>
>
> On 2022-02-17 11:40:04, "董少杰" wrote:
>
>
> Flink creates one table by reading a CSV file and another by consuming Kafka data; the two
> tables are joined and written to HDFS (Hudi). The task reading the CSV data is already in the
> FINISHED state, so checkpoints can no longer be triggered. Is there any way to get checkpoints
> to trigger normally?
> Flink version 1.12.2.
> Thanks!
>
>
>
>
>
>
> 董少杰
> eric21...@163.com


Pulsar connector 2.9.1 failing job submission in standalone.

2022-02-20 Thread Ananth Gundabattula
Hello All,

I have a Pyflink script that needs to read from Pulsar and process the data.

I have done the following to implement a prototype.


  1.  Since I need a PyFlink way to connect to Pulsar, I checked out the code 
from the master branch as advised in a different thread. (The PyFlink Pulsar 
connector seems to be slated for the 1.15 release.)
  2.  I built the Flink source.
  3.  I am using the following location as FLINK_HOME under the source: 
flink-dist/target/flink-1.15-SNAPSHOT-bin/flink-1.15-SNAPSHOT
  4.  The python pyflink wheels have been appropriately installed in the right 
python conda environment.
  5.  I copied the flink-sql-connector-pulsar-1.15-SNAPSHOT.jar into the 
$FLINK_HOME/lib folder.
  6.  I started the standalone cluster by running bin/start-cluster.sh
  7.  I submit my test script by using bin/flink run --python …
  8.  If I am launching the word_count example in the Flink documentation, 
everything runs fine and it completes successfully.
  9.  However, if the script involves the Pulsar connector, the logs show that 
the Flink client codebase is not able to submit the job to the JobManager.
  10. It ultimately dies with a Channel Idle exception. (See this in DEBUG mode 
of the logs). I am attaching the logs for reference.

I am trying this on OS X. Please note that the classic word_count script works 
fine without any issues and I see the job submission failures on the client 
only when the pulsar source connector is in the script. I have also added the 
logs for the standalone session job manager. I am also attaching the script for 
reference.

Could you please advise what I can do to resolve the issue. (I will raise a 
JIRA issue if someone thinks it is a bug.)

Regards,
Ananth



flink-ananth-client-Ananths-MacBook-Pro.local.log.gz
Description: flink-ananth-client-Ananths-MacBook-Pro.local.log.gz


flink-ananth-standalonesession-0-Ananths-MacBook-Pro.local.log.gz
Description:  flink-ananth-standalonesession-0-Ananths-MacBook-Pro.local.log.gz


test_pulsar.py.gz
Description: test_pulsar.py.gz


Apache Flink - Continuously streaming data using jdbc connector

2022-02-20 Thread M Singh
Hi Folks:
I am trying to monitor a jdbc source and continuously stream data in an 
application using the jdbc connector. However, the application stops after 
reading the data in the table.
I've checked the docs 
(https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/)
 and it looks like there is a streaming sink but the sources are scan and 
lookup only.  I've also checked the connector settings but could not find any 
flag for continuous monitoring.
Can you please let me know if there is a setting in the connector, or advise 
how to make the jdbc connector source stream data continuously?
Thanks for your help.
Mans

Re: How to flatten ARRAY in Table API

2022-02-20 Thread Matthias Broecheler
Thank you, David, that's what I had been looking for!

On Sun, Feb 20, 2022 at 3:52 AM David Anderson  wrote:

> Matthias,
>
> You can use a CROSS JOIN UNNEST, as mentioned very briefly in the docs [1].
>
> Something like this should work:
>
> SELECT
>   id, customerid, productid, quantity, ...
> FROM
>   orders
> CROSS JOIN UNNEST(entries) AS items (productid, quantity, unit_price,
> discount);
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#array-expansion
>
> Regards,
> David
>
> On Sun, Feb 20, 2022 at 2:25 AM Matthias Broecheler 
> wrote:
>
>> Hey Flinksters,
>>
>> I'm reading a nested JSON object into a table and would like to access
>> the nested rows inside an array. Is there a way to flatten them so that I
>> get a table with the nested rows?
>>
>> So far, I've only been able to figure out how to access a specific
>> element inside the array using the "at" method but I'm trying to flatten
>> the nested rows into a table and the arrays can have variable length. Below
>> is a code snippet of what I have thus far but notice how I'm only accessing
>> the first element in each array.
>>
>> How do you do this in Flink? Apologies if this is obvious - I wasn't able
>> to find an example or documentation and would appreciate any help.
>>
>> Thank you,
>> Matthias
>>
>> ---
>>
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>>
>>
>> TableDescriptor jsonTable = TableDescriptor.forConnector("filesystem")
>> .schema(Schema.newBuilder()
>> .column("id", DataTypes.INT())
>> .column("customerid", DataTypes.INT())
>> .column("time", DataTypes.INT())
>> .column("entries", DataTypes.ARRAY(DataTypes.ROW(
>> DataTypes.FIELD("productid", DataTypes.INT()),
>> DataTypes.FIELD("quantity", DataTypes.INT()),
>> DataTypes.FIELD("unit_price", 
>> DataTypes.DECIMAL(9,3)),
>> DataTypes.FIELD("discount", DataTypes.DECIMAL(9,3))
>> )))
>> .build())
>> .option("path", C360Test.RETAIL_DATA_DIR.toAbsolutePath() + 
>> "/orders.json")
>> .format("json")
>> .build();
>>
>>
>>
>> tEnv.createTable("Orders",jsonTable);
>> Table orders = tEnv.from("Orders");
>>
>> Table flattenEntries = 
>> orders.select($("entries").at(1).get("quantity").sum().as("totalquant"));
>>
>>


Flink Statefun and Feature computation

2022-02-20 Thread Federico D'Ambrosio
Hello everyone,

It's been quite a while since I wrote to the Flink ML, because in my
current job the need for a stateful stream processing system never actually
arose, until now.

Since the last version I actually tried was Flink 1.9, well before Stateful
Functions, I had a few questions about some of the latest features.

1. What are the use cases for which Flink Stateful Functions were thought of?
As far as I understand from the documentation, they are basically processors
that can be separated from a "main" Flink streaming job (and can be integrated
with it), but I fail to grasp how they should differ from a REST endpoint
implemented using any other framework.
2. How is the storage for these functions configured? I see that the
storage for the state is accessed via a Context object, so I think it is
configured by a Flink cluster configuration?
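
To make question 2 concrete, this is roughly the pattern I see in the SDK
examples (the type name and the "event_count" value below are just placeholders
of mine): the function declares ValueSpecs in its StatefulFunctionSpec, and the
runtime persists them in Flink state and hands them back through the Context.

import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.ValueSpec;
import org.apache.flink.statefun.sdk.java.message.Message;

public class FeatureFn implements StatefulFunction {
    static final TypeName TYPE = TypeName.typeNameFromString("example/feature-fn");

    // Declared state: persisted by the StateFun runtime per function address (type + id).
    static final ValueSpec<Long> EVENT_COUNT = ValueSpec.named("event_count").withLongType();

    static final StatefulFunctionSpec SPEC =
            StatefulFunctionSpec.builder(TYPE)
                    .withValueSpecs(EVENT_COUNT)
                    .withSupplier(FeatureFn::new)
                    .build();

    @Override
    public CompletableFuture<Void> apply(Context context, Message message) {
        // Address-scoped storage: reads and writes go through the Context.
        long count = context.storage().get(EVENT_COUNT).orElse(0L);
        context.storage().set(EVENT_COUNT, count + 1L);
        return context.done();
    }
}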

I would like, then, to elaborate on my use case: we have some 20 CDC topics
(1 topic per table) on Kafka. Upon the data streamed on these topics, we
need to compute many features to be used by a ML model. Many of these
features need to be computed by joining multiple topics and/or need the
whole history of the field. So, I was wondering if Stateful Functions could
be a good approach to this problem, where a feature could be "packaged" in
a single stateful function to be "triggered" by the arrival of any new
message on the topic configured as its ingress.

So, basically, I'm wondering if they could fit the use case, or we're
better off with a custom flink job.

Thank you for your time,
-- 
Federico D'Ambrosio


Re: How to flatten ARRAY in Table API

2022-02-20 Thread David Anderson
Matthias,

You can use a CROSS JOIN UNNEST, as mentioned very briefly in the docs [1].

Something like this should work:

SELECT
  id, customerid, productid, quantity, ...
FROM
  orders
CROSS JOIN UNNEST(entries) AS items (productid, quantity, unit_price,
discount);

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/joins/#array-expansion

Regards,
David

On Sun, Feb 20, 2022 at 2:25 AM Matthias Broecheler 
wrote:

> Hey Flinksters,
>
> I'm reading a nested JSON object into a table and would like to access the
> nested rows inside an array. Is there a way to flatten them so that I get a
> table with the nested rows?
>
> So far, I've only been able to figure out how to access a specific element
> inside the array using the "at" method but I'm trying to flatten the nested
> rows into a table and the arrays can have variable length. Below is a code
> snippet of what I have thus far but notice how I'm only accessing the first
> element in each array.
>
> How do you do this in Flink? Apologies if this is obvious - I wasn't able
> to find an example or documentation and would appreciate any help.
>
> Thank you,
> Matthias
>
> ---
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>
> TableDescriptor jsonTable = TableDescriptor.forConnector("filesystem")
> .schema(Schema.newBuilder()
> .column("id", DataTypes.INT())
> .column("customerid", DataTypes.INT())
> .column("time", DataTypes.INT())
> .column("entries", DataTypes.ARRAY(DataTypes.ROW(
> DataTypes.FIELD("productid", DataTypes.INT()),
> DataTypes.FIELD("quantity", DataTypes.INT()),
> DataTypes.FIELD("unit_price", DataTypes.DECIMAL(9,3)),
> DataTypes.FIELD("discount", DataTypes.DECIMAL(9,3))
> )))
> .build())
> .option("path", C360Test.RETAIL_DATA_DIR.toAbsolutePath() + 
> "/orders.json")
> .format("json")
> .build();
>
>
>
> tEnv.createTable("Orders",jsonTable);
> Table orders = tEnv.from("Orders");
>
> Table flattenEntries = 
> orders.select($("entries").at(1).get("quantity").sum().as("totalquant"));
>
>


Job Manager High Availability using Kubernetes HA Services is not working with custom S3 encrypted client

2022-02-20 Thread Almog Rozencwajg
Hi,

We are using Flink version 1.12.2 running on Kubernetes using the native 
Kubernetes integration.
We are trying to use the job manager HA described in the official Flink 
documentation on Kubernetes HA Services.
For high-availability.storageDir we are using S3.
We implemented our own S3 client for the Hadoop plugin in order to use S3 
client-side encryption.
The job manager fails when Hadoop tries to upload the recovery metadata to S3 
using the multipart upload API.
Hadoop uploads the data in parts by calling the upload-part method and fails 
with:
'com.amazonaws.SdkClientException: Invalid part size: part sizes for encrypted 
multipart uploads must be multiples of the cipher block size (16) with the 
exception of the last part'.
When using S3 client-side encryption with the multipart upload API there is a 
limitation on the part size.
Is there a way to control, via configuration, the part size of the recovery 
data uploaded by Flink HA?

Thanks,
Almog



Re: Fwd: How to get memory specific metrics for tasknodes

2022-02-20 Thread Sigalit Eliazov
Hello. A related metrics question: I am looking for a way to expose Flink
metrics via OpenTelemetry to the GCP Cloud Monitoring dashboard.
Does anyone have experience with that?

Thanks

On Fri, Feb 18, 2022 at 21:55, Chesnay Schepler <
ches...@apache.org> wrote:

> As I said, this is not possible. In the custom reporter you can filter out
> all the metrics you do not want.
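
For reference, a rough sketch of such a wrapping reporter (the identifier-based
filter and the registration are assumptions; only the MetricReporter/Scheduled
interfaces and the bundled StatsDReporter are taken as given):

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;
import org.apache.flink.metrics.statsd.StatsDReporter;

public class FilteringStatsDReporter implements MetricReporter, Scheduled {
    // Delegate to the bundled StatsD reporter; only selected metrics get registered with it.
    private final StatsDReporter delegate = new StatsDReporter();

    private static boolean wanted(String metricName, MetricGroup group) {
        String fullName = group.getMetricIdentifier(metricName);
        return fullName.contains("Status.JVM.Memory.Heap.Used")
                || fullName.contains("Status.JVM.Memory.Heap.Committed")
                || fullName.contains("Status.JVM.Memory.NonHeap.Max");
    }

    @Override
    public void open(MetricConfig config) {
        delegate.open(config); // picks up host/port exactly like the plain StatsD reporter
    }

    @Override
    public void close() {
        delegate.close();
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        if (wanted(metricName, group)) {
            delegate.notifyOfAddedMetric(metric, metricName, group);
        }
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        delegate.notifyOfRemovedMetric(metric, metricName, group);
    }

    @Override
    public void report() {
        delegate.report(); // forwards only the metrics that were let through above
    }
}

It would presumably be registered via metrics.reporter.stsd.class pointing at
this class, with its jar placed in lib/.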
>
> On 18/02/2022 19:44, Diwakar Jha wrote:
>
> Thank you. I understand that filtering metrics is not possible but I can
> configure it for only the memory metrics listed here?:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#memory
>
> Currently, it's publishing all the container/operator metrics which is
> what I want to avoid. Thanks!
>
>
> On Wed, Feb 16, 2022 at 11:35 PM Chesnay Schepler 
> wrote:
>
>> It is currently not possible to select metrics.
>>
>> What you can do however is create a custom reporter that wraps the StatsD
>> reporter which does this filtering.
>>
>> On 16/02/2022 17:41, Diwakar Jha wrote:
>>
>>
>> Hello,
>>
>> Could someone please help! I'm trying  to publish only these three
>> metrics per tasknode
>> Status.JVM.Memory.Heap.Used
>> Status.JVM.Memory.Heap.Committed
>> Status.JVM.Memory.NonHeap.Max
>>
>> But, with my current setting I see all Flink metrics getting published.
>> Please let me know if I need to provide any other information.
>>
>> Thank you!
>>
>>
>> -- Forwarded message -
>> From: Diwakar Jha 
>> Date: Tue, Feb 15, 2022 at 1:31 PM
>> Subject: How to get memory specific metrics for tasknodes
>> To: user 
>>
>>
>> Hello,
>>
>> I'm running Flink 1.11 on AWS EMR using the Yarn application. I'm trying
>> to access memory metrics(Heap.Max, Heap.Used) per tasknode in CloudWatch. I
>> have 50 tasknodes and it creates Millions of metrics(including per
>> operator) though I need only a few metrics per tasknode (Heap.Max,
>> Heap.Used). It is way too much than my current cloudwatch limit and I also
>> don't need so many metrics.
>> Could someone please help me how to get only the tasknode memory specific
>> metrics ?
>> I'm referring to this doc :
>> https://nightlies.apache.org/flink/flink-docs-release-1.7/monitoring/metrics.html#memory
>>
>> I used the following approach to enable Flink metrics.
>> 1. Enable Flink Metrics
>> copy /opt/flink-metrics-statsd-x.x.jar into the /lib folder of your
>> Flink distribution
>> 2.  Add StatsD metric reporter in Flink-conf to send to CloudWatch
>> Agent's StatsD interface
>> metrics.reporters: stsd
>> metrics.reporter.stsd.factory.class:
>> org.apache.flink.metrics.statsd.StatsDReporterFactory
>> metrics.reporter.stsd.host: localhost
>> metrics.reporter.stsd.port: 8125
>> 3. Setup tasknode scope
>> metrics.scope.tm: taskmanager
>> 4. setup Cloudwatch agent to publish the metrics
>> "metrics":{
>>   "namespace": "CustomeNamespace/FlinkMemoryMetrics",
>>   "metrics_collected":{
>>  "statsd":{
>> "service_address":":8125",
>> "metrics_collection_interval":60,
>> "metrics_aggregation_interval":300
>>  }
>>   }
>>   },
>>
>> Thanks!
>>
>>
>>
>>
>