[state.checkpoints.num-retained] The default value does not take effect

2021-12-16 Thread chenqizhu
Hi,

   The configuration is valid only when I specify -Dstate.checkpoints.num-retained=n
on the command line interface.
   If I do not specify this configuration, the default value does not take
effect. Is this a bug?


   My Flink version: flink-1.13.3

Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-16 Thread Arvid Heise
Wouldn't it be better to ask the Iceberg maintainers to support dynamic
schema change?

On Fri, Dec 17, 2021 at 3:03 AM Dong Lin  wrote:

> Hi Ayush,
>
> Your use case should be supported. Unfortunately, we don't have a good way to
> support this in Flink 1.14.
>
> I am going to propose a FLIP to fix it in Flink 1.15.
>
> Thanks,
> Dong
>
>
> On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan 
> wrote:
>
>> My use case is that as soon as the Avro message version changes, I want
>> to reload the job graph so that I can update the downstream Iceberg table.
>>
>> Iceberg FlinkSink takes the table schema at job start and it cannot be
>> updated at runtime. So I want to trigger a graceful shutdown and restart
>> the job.
>>
>> Can I reload the job graph to achieve that?
>>
>>
>>
>> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise  wrote:
>>
>>> Hi Ayush,
>>>
>>> DeserializationSchema.isEndOfStream was only ever supported by Kafka.
>>> For the new Kafka source, the recommended way is to use the bounded mode,
>>> like this:
>>>
>>> KafkaSource source =
>>> KafkaSource.builder()
>>> ...
>>> .setStartingOffsets(OffsetsInitializer.earliest())
>>> .setBounded(OffsetsInitializer.latest())
>>> .build();
>>>
>>> You can implement your own OffsetsInitializer or use a provided one.
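[Editorial aside, not part of the original reply: a minimal, self-contained sketch of the bounded-read setup described above. The broker address, topic, and stop timestamp are placeholders, and only provided initializers from the 1.14 connector are used.]

```
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BoundedKafkaReadSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("broker:9092")           // placeholder
                        .setTopics("input-topic")                     // placeholder
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        // Stop once the offsets that existed for this timestamp are reached;
                        // OffsetsInitializer.latest() would instead bound at the offsets seen at startup.
                        .setBounded(OffsetsInitializer.timestamp(1639612800000L)) // placeholder timestamp
                        .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "bounded-kafka")
                .print();
        env.execute("bounded read sketch");
    }
}
```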
>>>
>>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan  wrote:
>>>
 There is no way to end the kafka stream from the deserializer.

 When would you want to end the stream? Could you explain why you need
 to end the kafka stream without using the offset?

Ayush Chauhan wrote on Wed, Dec 8, 2021 at 15:29:

>
> https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69
>
>
>
> On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger 
> wrote:
>
>> Hi Ayush,
>>
>> I couldn't find the documentation you've mentioned. Can you send me a
>> link to it?
>>
>> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
>> ayush.chau...@zomato.com> wrote:
>>
>>> Hi,
>>>
>>> Can you please let me know the alternatives to isEndOfStream(), as
>>> according to the docs this method will no longer be used to determine the
>>> end of the stream.
>>>
>>>
>>
>
>

>>
>>
>


Re: [ANNOUNCE] Apache Flink 1.14.2 / 1.13.5 / 1.12.7 / 1.11.6 released

2021-12-16 Thread Leonard Xu
I guess this is related to publishers everywhere updating their artifacts 
in response to the log4shell vulnerability [1].

All we can do and need to do is wait. ☕️

Best,
Leonard
[1] https://issues.sonatype.org/browse/OSSRH-76300 




> On Dec 17, 2021, at 2:21 PM, Jingsong Li wrote:
> 
> Not found in 
> https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-java/
> 
> I guess too many projects are publishing new versions, so Maven Central
> repository synchronization is slower than usual.
> 
> Best,
> Jingsong
> 
> On Fri, Dec 17, 2021 at 2:00 PM casel.chen  wrote:
>> 
>> I can NOT find the Flink 1.13.5 jars in the Maven Central repository. Have you
>> uploaded them there already? Thanks!
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> At 2021-12-17 01:26:19, "Chesnay Schepler"  wrote:
>>> The Apache Flink community has released emergency bugfix versions of
>>> Apache Flink for the 1.11, 1.12, 1.13 and 1.14 series.
>>> 
>>> These releases include a version upgrade for Log4j to address
>>> [CVE-2021-44228](https://nvd.nist.gov/vuln/detail/CVE-2021-44228) and
>>> [CVE-2021-45046](https://nvd.nist.gov/vuln/detail/CVE-2021-45046).
>>> 
>>> We highly recommend all users to upgrade to the respective patch release.
>>> 
>>> The releases are available for download at:
>>> https://flink.apache.org/downloads.html
>>> 
>>> Please check out the release blog post for further details:
>>> https://flink.apache.org/news/2021/12/16/log4j-patch-releases.html
>>> 
>>> 
>>> Regards,
>>> Chesnay
>> 
>> 
>> 
>> 
> 
> 
> 
> -- 
> Best, Jingsong Lee



Re: [ANNOUNCE] Apache Flink 1.14.2 / 1.13.5 / 1.12.7 / 1.11.6 released

2021-12-16 Thread Jingsong Li
Not found in 
https://repo1.maven.org/maven2/org/apache/flink/flink-table-api-java/

I guess too many projects are publishing new versions, so Maven Central
repository synchronization is slower than usual.

Best,
Jingsong

On Fri, Dec 17, 2021 at 2:00 PM casel.chen  wrote:
>
> I can NOT find the Flink 1.13.5 jars in the Maven Central repository. Have you
> uploaded them there already? Thanks!
>
>
>
>
>
>
>
> At 2021-12-17 01:26:19, "Chesnay Schepler"  wrote:
> >The Apache Flink community has released emergency bugfix versions of
> >Apache Flink for the 1.11, 1.12, 1.13 and 1.14 series.
> >
> >These releases include a version upgrade for Log4j to address
> >[CVE-2021-44228](https://nvd.nist.gov/vuln/detail/CVE-2021-44228) and
> >[CVE-2021-45046](https://nvd.nist.gov/vuln/detail/CVE-2021-45046).
> >
> >We highly recommend all users to upgrade to the respective patch release.
> >
> >The releases are available for download at:
> >https://flink.apache.org/downloads.html
> >
> >Please check out the release blog post for further details:
> >https://flink.apache.org/news/2021/12/16/log4j-patch-releases.html
> >
> >
> >Regards,
> >Chesnay
>
>
>
>



-- 
Best, Jingsong Lee


The emergency bugfix Flink jars cannot be found in the Maven Central repository

2021-12-16 Thread casel.chen
For example, Flink 1.13.5: have these jars been uploaded to the Maven Central repository? I cannot see them, and my build failed because of this.

Re:[ANNOUNCE] Apache Flink 1.14.2 / 1.13.5 / 1.12.7 / 1.11.6 released

2021-12-16 Thread casel.chen
I can NOT find the Flink 1.13.5 jars in the Maven Central repository. Have you 
uploaded them there already? Thanks!

At 2021-12-17 01:26:19, "Chesnay Schepler"  wrote:
>The Apache Flink community has released emergency bugfix versions of 
>Apache Flink for the 1.11, 1.12, 1.13 and 1.14 series.
>
>These releases include a version upgrade for Log4j to address 
>[CVE-2021-44228](https://nvd.nist.gov/vuln/detail/CVE-2021-44228) and 
>[CVE-2021-45046](https://nvd.nist.gov/vuln/detail/CVE-2021-45046).
>
>We highly recommend all users to upgrade to the respective patch release.
>
>The releases are available for download at:
>https://flink.apache.org/downloads.html
>
>Please check out the release blog post for further details:
>https://flink.apache.org/news/2021/12/16/log4j-patch-releases.html
>
>
>Regards,
>Chesnay


Re: Flink fails to load class from configured classpath using PipelineOptions

2021-12-16 Thread Yang Wang
The config option "pipeline.jars" is used to specify the user jar, which
contains the main class.
I think what you need is "pipeline.classpaths".

/**
 * A list of URLs that are added to the classpath of each user code classloader of the program.
 * Paths must specify a protocol (e.g. file://) and be accessible on all nodes.
 */
public static final ConfigOption<List<String>> CLASSPATHS =
        key("pipeline.classpaths")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        "A semicolon-separated list of the classpaths to package with the job jars to be sent to"
                                + " the cluster. These have to be valid URLs.");


Best,
Yang

Pouria Pirzadeh wrote on Fri, Dec 17, 2021 at 03:43:

> I am developing a Java application which uses UDFs on Flink 1.14.
> It uses the PipelineOptions.JARS config to dynamically add jar files containing
> UDF classes to the user classpath in the main method. However, the application
> fails to load the UDF class from the configured jar files at job launch time
> and crashes with a ClassNotFoundException.
>
> Is PipelineOptions.JARS the correct option to add files to classpath on
> Job manager and all task managers?
>
> Sample code snippet:
>
> final Configuration configuration = new Configuration();
>
> configuration.set(PipelineOptions.JARS,Collections.singletonList("file:///path/to/udf.jar"));
> StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
> ...
> Class udfClass = Class.forName("demo.MyUDF", ...);
> tableEnv.createTemporarySystemFunction("MyUDF", udfClass);
> ...
>
> Error stack trace:
> Exception in thread "main" java.lang.ClassNotFoundException: demo.MyUDF
> at
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582)
> at
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
> at
> java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1886)
> at
> java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1772)
> at
> java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
> at
> java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)
> at
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
> at
> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:692)
> at
> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:714)
> at
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:130)
> at
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:116)
> at
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:73)
> at
> org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:81)
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:825)
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503)
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
> at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:437)
> at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:432)
> at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:356)
> ...
>


Re: unexpected result of interval join when using sql

2021-12-16 Thread Caizhi Weng
Hi!

Thanks for raising this issue. This is unfortunately a bug. I've created a
JIRA ticket [1] and you can track the progress of this issue there.

[1] https://issues.apache.org/jira/browse/FLINK-25357
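[Editorial sketch, not from the thread: for reference, the canonical event-time interval-join condition that the planner recognizes looks like the following. The orders/shipments tables and their columns are hypothetical, and both are assumed to have watermarked event-time attributes.]

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IntervalJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // `orders` and `shipments` are hypothetical tables registered elsewhere,
        // each with a watermarked event-time column (order_time / ship_time).
        tEnv.executeSql(
                "SELECT o.id, o.order_time, s.ship_time "
                        + "FROM orders o, shipments s "
                        + "WHERE o.id = s.order_id "
                        + "  AND s.ship_time BETWEEN o.order_time "
                        + "      AND o.order_time + INTERVAL '2' HOUR");
    }
}
```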

Schwalbe Matthias wrote on Thu, Dec 16, 2021 at 14:51:

> Probably an oversight … did you actually mean to publish your password?
> Better change it as soon as possible …
>
>
>
> Thias
>
>
>
>
>
> *From:* cy 
> *Sent:* Thursday, December 16, 2021 06:55
> *To:* user@flink.apache.org
> *Subject:* unexpected result of interval join when using sql
>
>
>
> Hi
>
> Flink 1.14.0 Scala 2.12
>
>
>
> I'm using flink sql interval join ability, here is my table schema and sql
>
>
>
> create table `queue_3_ads_ccops_perf_o_ebs_volume_capacity` (
>   `dtEventTime` timestamp(3),
>   `dtEventTimeStamp` bigint,
>   `sourceid` string,
>   `cluster_name` string,
>   `poolname` string,
>   `storage_poolname` string,
>   `usage` decimal(10, 4),
>   `provisioned_size` decimal(10, 4),
>   `startat` timestamp(3),
>   `endat` timestamp(3),
>   `vrespool_id` int,
>   `uuid` string,
>   `version` string,
>   `localTime` timestamp(3),
>   `cluster_id` int,
>   `extend1` string,
>   `extend2` string,
>   `extend3` string,
>   `mon_ip` string,
>   `bussiness_ip` string,
>   `datasource` string,
>   `thedate` int,
>   `name` string,
>   `used_size` int,
>   watermark for `startat` as `startat` - interval '60' minutes
> ) with (
>   'connector' = 'kafka',
>   'topic' = 'queue_3_ads_ccops_perf_o_ebs_volume_capacity',
>   'format' = 'json',
>   'scan.startup.mode' = 'earliest-offset',
>   'properties.bootstrap.servers' = '10.172.234.67:9092,10.172.234.68:9092,10.172.234.69:9092',
>   'properties.group.id' = 'layer-vdisk',
>   'properties.security.protocol' = 'SASL_PLAINTEXT',
>   'properties.sasl.mechanism' = 'SCRAM-SHA-512',
>   'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username="bkdata_admin" password="D41J48Cz3iwW7k6fFogX1A";'
> );
>
>
>
> SELECT
>   source.sourceid AS sourceid,
>   cast(source.startat AS timestamp) AS source_startat,
>   cast(target.startat AS timestamp) AS target_startat,
>   source.used_size AS source_used_size,
>   target.used_size AS target_used_size,
>   source.usage AS source_usage,
>   target.usage AS target_usage
> FROM queue_3_ads_ccops_perf_o_ebs_volume_capacity source,
>      queue_3_ads_ccops_perf_o_ebs_volume_capacity target
> WHERE source.sourceid = target.sourceid
> AND source.sourceid in (
>   'volume-9dfed0d9-28b2-418a-9215-ce762ef80920',
>   'volume-9ece34f1-f4bb-475a-8e64-a2e37711b4fc',
>   'volume-9f0ec4cc-5cc4-49a8-b715-a91a25df3793',
>   'volume-9f38e0b3-2324-4505-a8ad-9b1ccb72181f',
>   'volume-9f3ec256-10fb-4d8b-a8cb-8498324cf309'
> )
> AND source.startat >= FLOOR(target.startat TO HOUR) + INTERVAL '1' HOUR
> AND source.startat < FLOOR(target.startat TO HOUR) + INTERVAL '2' HOUR;
>
>
>
> and the result (the screenshot is not preserved in the archive).
>
> I'm confused about the first row, where source_startat and target_startat do
> not match the time condition.
>
> I also tried to execute the SQL below.
>
>
>
> SELECT TO_TIMESTAMP('2021-12-13 14:05:06') >=
> FLOOR(TO_TIMESTAMP('2021-12-13 12:05:08') TO HOUR) + INTERVAL '1' HOUR AND
> TO_TIMESTAMP('2021-12-13 14:05:06') < FLOOR(TO_TIMESTAMP('2021-12-13
> 12:05:08') TO HOUR) + INTERVAL '2' HOUR;
>
>
>
> The result is false, which is correct.
>
>
>
> So is there anything wrong with the Flink SQL interval join?
>
>
>
> Need your help, thank you.
>
>
>
>
>
>
>
>
> This message is intended only for the named recipient and may contain
> confidential or privileged information. As the confidentiality of email
> communication cannot be guaranteed, we do not accept any responsibility for
> the confidentiality and the intactness of this message. If you have
> received it in error, please advise the sender by return e-mail and delete
> this message and any attachments. Any unauthorised use or dissemination of
> this information is strictly prohibited.
>


[Using AsyncAppender with log4j.properties]

2021-12-16 Thread lpengdr...@163.com
Hi:

Flink uses a log4j.properties configuration, and the .properties format does not seem to support configuring an AsyncAppender. Does that mean AsyncAppender cannot be used for Flink logging? Is there another way to work around this?



lpengdr...@163.com


Re: Alternatives of KafkaDeserializationSchema.isEndOfStream()

2021-12-16 Thread Dong Lin
Hi Ayush,

Your use case should be supported. Unfortunately, we don't have a good way to
support this in Flink 1.14.

I am going to propose a FLIP to fix it in Flink 1.15.

Thanks,
Dong


On Thu, Dec 9, 2021 at 7:11 PM Ayush Chauhan 
wrote:

> My use case is that as soon as the Avro message version changes, I want
> to reload the job graph so that I can update the downstream Iceberg table.
>
> Iceberg FlinkSink takes the table schema at job start and it cannot be
> updated at runtime. So I want to trigger a graceful shutdown and restart
> the job.
>
> Can I reload the job graph to achieve that?
>
>
>
> On Wed, Dec 8, 2021 at 8:11 PM Arvid Heise  wrote:
>
>> Hi Ayush,
>>
>> DeserializationSchema.isEndOfStream was only ever supported by Kafka. For
>> the new Kafka source, the recommended way is to use the bounded mode, like this:
>>
>> KafkaSource source =
>> KafkaSource.builder()
>> ...
>> .setStartingOffsets(OffsetsInitializer.earliest())
>> .setBounded(OffsetsInitializer.latest())
>> .build();
>>
>> You can implement your own OffsetsInitializer or use a provided one.
>>
>> On Wed, Dec 8, 2021 at 9:19 AM Hang Ruan  wrote:
>>
>>> There is no way to end the kafka stream from the deserializer.
>>>
>>> When would you want to end the stream? Could you explain why you need to
>>> end the kafka stream without using the offset?
>>>
Ayush Chauhan wrote on Wed, Dec 8, 2021 at 15:29:
>>>

 https://github.com/apache/flink/blob/release-1.13.1/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L69



 On Wed, Dec 8, 2021 at 12:40 PM Robert Metzger 
 wrote:

> Hi Ayush,
>
> I couldn't find the documentation you've mentioned. Can you send me a
> link to it?
>
> On Tue, Dec 7, 2021 at 10:40 AM Ayush Chauhan <
> ayush.chau...@zomato.com> wrote:
>
>> Hi,
>>
>> Can you please let me know the alternatives to isEndOfStream(), as according
>> to the docs this method will no longer be used to determine the end of the
>> stream.
>>
>>
>


>>>
>
>


Re: Antlr usage in FLink

2021-12-16 Thread Caizhi Weng
Hi!

Are you talking about the Java code splitter module?

Flink translates user SQL into several generated Java classes and runs them in
distributed JVMs. However, due to a JVM constraint, a single Java method cannot
be longer than 64KB of bytecode. This limit is easily exceeded by a very
complex SQL query.

To solve this problem we introduced the Java code splitter module in Flink
1.14, which splits a long generated Java method into several shorter methods to
work around this constraint.
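[Purely as an illustration of the idea, and not the actual generated code nor the splitter's real output: a generated method whose body would exceed the 64KB bytecode limit is rewritten into several smaller methods that the original method delegates to.]

```
public class GeneratedFunctionSketch {
    // Before splitting (conceptually): one enormous generated method whose
    // bytecode would exceed the JVM's 64KB-per-method limit.
    //
    // After splitting: the body is cut into chunks, each well below the limit,
    // and the original method simply calls them in order.
    void processElement(Object row) {
        processElement_split0(row);
        processElement_split1(row);
        processElement_split2(row);
    }

    private void processElement_split0(Object row) { /* first chunk of the original body */ }

    private void processElement_split1(Object row) { /* second chunk */ }

    private void processElement_split2(Object row) { /* third chunk */ }
}
```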

Krzysztof Chmielewski wrote on Fri, Dec 17, 2021 at 06:22:

> Hi,
> I was cruising through Flink's source code and I have noticed that one of
> the modules contains a lexer and parser g4 files for Java.
>
> I'm fairly familiar with Antlr4 btw. and I was wondering for what Flink
> uses Antlr4 with Java g4 files.
>
> Regards,
> Krzysztof Chmielewski
>


Re: Unified Source Interface in flink 1.12

2021-12-16 Thread Caizhi Weng
Hi!

It is possible for Flink 1.12. A major example is the Hive source [1].

[1] https://issues.apache.org/jira/browse/FLINK-19888

Krzysztof Chmielewski wrote on Fri, Dec 17, 2021 at 06:56:

> Hi,
> I know that FLIP-27 [1] was released in version 1.12, and I know that
> currently (version 1.14) we can easily use a custom source connector that
> implements the new unified Source interface as a cornerstone for a Table
> Source connector in the SQL API.
>
> My question is: does version 1.12 also allow using a Source interface
> implementation for a Table Source as version 1.14 does, or was this added
> in post-1.12 versions?
>
> After my very quick research based on the FileSystemTable source, it seems it
> is possible in version 1.12. Please correct me if I'm wrong.
>
> Regards,
> Krzysztof Chmielewski
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>


Re: What is the Flink SQL syntax for joining two streams within a window?

2021-12-16 Thread Caizhi Weng
Hi!

Starting from Flink 1.14, Flink SQL supports window joins [1].

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-join/
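[A minimal sketch of the 1.14 window-join syntax from [1], expressed through the Java Table API with hypothetical table and column names (left_stream/right_stream with a watermarked row_time column):]

```
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WindowJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Both sides are windowed with the same 5-minute tumbling window and then
        // joined on the key plus window_start/window_end.
        tEnv.executeSql(
                "SELECT l.id, l.window_start, l.window_end, l.v AS left_v, r.v AS right_v "
                        + "FROM ("
                        + "  SELECT * FROM TABLE(TUMBLE(TABLE left_stream, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))"
                        + ") l JOIN ("
                        + "  SELECT * FROM TABLE(TUMBLE(TABLE right_stream, DESCRIPTOR(row_time), INTERVAL '5' MINUTES))"
                        + ") r "
                        + "ON l.id = r.id "
                        + "AND l.window_start = r.window_start "
                        + "AND l.window_end = r.window_end");
    }
}
```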

casel.chen wrote on Fri, Dec 17, 2021 at 08:47:

> To join data from two streams every 5 minutes, how should this be written in Flink SQL?
> Do I need to join first and then compute over a window, or can I join directly within the window? The Flink version is 1.13.


What is the Flink SQL syntax for joining two streams within a window?

2021-12-16 Thread casel.chen
To join data from two streams every 5 minutes, how should this be written in Flink SQL?
Do I need to join first and then compute over a window, or can I join directly within the window? The Flink version is 1.13.

Unified Source Interface in flink 1.12

2021-12-16 Thread Krzysztof Chmielewski
Hi,
I know that FLIP-27 [1] was released in version 1.12, and I know that
currently (version 1.14) we can easily use a custom source connector that
implements the new unified Source interface as a cornerstone for a Table
Source connector in the SQL API.

My question is: does version 1.12 also allow using a Source interface
implementation for a Table Source as version 1.14 does, or was this added
in post-1.12 versions?

After my very quick research based on the FileSystemTable source, it seems this
is possible in version 1.12. Please correct me if I'm wrong.

Regards,
Krzysztof Chmielewski


[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface


Antlr usage in FLink

2021-12-16 Thread Krzysztof Chmielewski
Hi,
I was cruising through Flink's source code and noticed that one of
the modules contains lexer and parser .g4 files for Java.

I'm fairly familiar with ANTLR4, by the way, and I was wondering what Flink
uses ANTLR4 with the Java .g4 files for.

Regards,
Krzysztof Chmielewski


Flink fails to load class from configured classpath using PipelineOptions

2021-12-16 Thread Pouria Pirzadeh
I am developing a Java application which uses UDFs on Flink 1.14.
It uses the PipelineOptions.JARS config to dynamically add jar files containing
UDF classes to the user classpath in the main method. However, the application
fails to load the UDF class from the configured jar files at job launch time
and crashes with a ClassNotFoundException.

Is PipelineOptions.JARS the correct option to add files to classpath on Job
manager and all task managers?

Sample code snippet:

final Configuration configuration = new Configuration();
configuration.set(PipelineOptions.JARS,Collections.singletonList("file:///path/to/udf.jar"));
StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
...
Class udfClass = Class.forName("demo.MyUDF", ...);
tableEnv.createTemporarySystemFunction("MyUDF", udfClass);
...

Error stack trace:
Exception in thread "main" java.lang.ClassNotFoundException: demo.MyUDF
at
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582)
at
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at
java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1886)
at
java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1772)
at
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
at
java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)
at
java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
at
org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:692)
at
org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:714)
at
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:130)
at
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:116)
at
org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:73)
at
org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:81)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:825)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503)
at
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:437)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:432)
at
org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:356)
...


Re: [EXTERNAL] Re: Periodic Job Failure

2021-12-16 Thread Julian Cardarelli
Worth a shot - might explain the variability. Thanks for sending this over!

From: Dario Heinisch 
Sent: Thursday, December 16, 2021 1:18:14 PM
To: user@flink.apache.org 
Subject: Re: [EXTERNAL] Re: Periodic Job Failure


A shot in the dark, but could it be this:
https://mux.com/blog/5-years-of-flink-at-mux/ ?

> The JVM will cache DNS entries forever by default. This is undesirable in 
> Kubernetes deployments where there’s an expectation that DNS entries can and 
> do change frequently as pod deployments move between nodes. We’ve seen Flink 
> applications suddenly unable to talk to other services in the cluster after 
> pods are upgraded.

Best regards,

Dario

On 16.12.21 19:09, Julian Cardarelli wrote:
So it connects to HTTP REST-based microservices, and they are outside the
Kubernetes HA setup for Flink. All of a sudden (and it's arbitrary, not
consistent; it could be 10 days, it could be 28 days) the calls stop going out
on this one job but not on the others.

Recycling it brings it back. But the job and state all appear intact at the
time of the cessation, with the job in the running state and no discernible
exceptions.

I suppose it could be something in the network layer, but because other jobs
aren't impacted I feel something else must be going on.

But the code throws nothing during this time period.

Is there any instrumentation we should be enabling to find out more detail?
It's a bit troublesome to reproduce, so I want to have all of that in place for
the next time it happens.


Re: [EXTERNAL] Re: Periodic Job Failure

2021-12-16 Thread Dario Heinisch
A shot in the dark but could it be this: 
https://mux.com/blog/5-years-of-flink-at-mux/ ?


> The JVM will cache DNS entries forever by default. This is 
undesirable in Kubernetes deployments where there’s an expectation that 
DNS entries can and do change frequently as pod deployments move between 
nodes. We’ve seen Flink applications suddenly unable to talk to other 
services in the cluster after pods are upgraded.


Best regards,

Dario

On 16.12.21 19:09, Julian Cardarelli wrote:
So it connects to HTTP REST-based microservices, and they are outside the
Kubernetes HA setup for Flink. All of a sudden (and it's arbitrary, not
consistent; it could be 10 days, it could be 28 days) the calls stop going
out on this one job but not on the others.

Recycling it brings it back. But the job and state all appear intact
at the time of the cessation, with the job in the running state and no
discernible exceptions.

I suppose it could be something in the network layer, but because other
jobs aren't impacted I feel something else must be going on.

But the code throws nothing during this time period.

Is there any instrumentation we should be enabling to find out more
detail? It's a bit troublesome to reproduce, so I want to have all of that
in place for the next time it happens.






*From:* Chesnay Schepler 
*Sent:* Wednesday, December 15, 2021 10:09:32 AM
*To:* Julian Cardarelli ; user@flink.apache.org 


*Subject:* [EXTERNAL] Re: Periodic Job Failure
How are you deploying the job and the external services? Is the period 
in which this happens usually the same?
Is it just a connection issue with external services, or are there 
other errors as well?


On 15/12/2021 15:47, Julian Cardarelli wrote:


Hello –

We have a job that seems to stop working after some period of time – 
perhaps 10-12 days. The job itself appears in the running state, but 
for some reason it just stops communicating to external services.


I know this e-mail will be like “we don’t know what’s wrong with your 
code.” I get that part, but if we cancel the job and resubmit, 
everything flows again.


There doesn’t seem to be a clear answer on this and there is nothing 
in the stack trace.


So, my question is what’s the best practice for troubleshooting 
unexplained job malfunction over a prolonged period of time?


Thanks!

-jc


Re: [EXTERNAL] Re: Periodic Job Failure

2021-12-16 Thread Julian Cardarelli
So it connects to HTTP REST-based microservices, and they are outside the
Kubernetes HA setup for Flink. All of a sudden (and it's arbitrary, not
consistent; it could be 10 days, it could be 28 days) the calls stop going out on
this one job but not on the others.

Recycling it brings it back. But the job and state all appear intact at the
time of the cessation, with the job in the running state and no discernible
exceptions.

I suppose it could be something in the network layer, but because other jobs
aren't impacted I feel something else must be going on.

But the code throws nothing during this time period.

Is there any instrumentation we should be enabling to find out more detail?
It's a bit troublesome to reproduce, so I want to have all of that in place for
the next time it happens.


From: Chesnay Schepler 
Sent: Wednesday, December 15, 2021 10:09:32 AM
To: Julian Cardarelli ; user@flink.apache.org 

Subject: [EXTERNAL] Re: Periodic Job Failure

How are you deploying the job and the external services? Is the period in which 
this happens usually the same?
Is it just a connection issue with external services, or are there other errors 
as well?

On 15/12/2021 15:47, Julian Cardarelli wrote:

Hello –



We have a job that seems to stop working after some period of time – perhaps 
10-12 days. The job itself appears in the running state, but for some reason it 
just stops communicating to external services.



I know this e-mail will be like “we don’t know what’s wrong with your code.” I 
get that part, but if we cancel the job and resubmit, everything flows again.



There doesn’t seem to be a clear answer on this and there is nothing in the 
stack trace.



So, my question is what’s the best practice for troubleshooting unexplained job 
malfunction over a prolonged period of time?



Thanks!

-jc




[ANNOUNCE] Apache Flink 1.14.2 / 1.13.5 / 1.12.7 / 1.11.6 released

2021-12-16 Thread Chesnay Schepler
The Apache Flink community has released emergency bugfix versions of 
Apache Flink for the 1.11, 1.12, 1.13 and 1.14 series.


These releases include a version upgrade for Log4j to address 
[CVE-2021-44228](https://nvd.nist.gov/vuln/detail/CVE-2021-44228) and 
[CVE-2021-45046](https://nvd.nist.gov/vuln/detail/CVE-2021-45046).


We highly recommend all users to upgrade to the respective patch release.

The releases are available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for further details:
https://flink.apache.org/news/2021/12/16/log4j-patch-releases.html


Regards,
Chesnay


Re: How do I determine which hardware device and software has log4j zero-day security vulnerability?

2021-12-16 Thread Arvid Heise
I think this is meant for the Apache log4j mailing list [1].

[1] https://logging.apache.org/log4j/2.x/mail-lists.html

On Thu, Dec 16, 2021 at 4:07 PM David Morávek  wrote:

> Hi Turritopsis,
>
> I fail to see any relation to Apache Flink. Can you please elaborate on
> how Flink fits into it?
>
> Best,
> D.
>
> On Thu, Dec 16, 2021 at 3:52 PM Turritopsis Dohrnii Teo En Ming <
> ceo.teo.en.m...@gmail.com> wrote:
>
>> Subject: How do I determine which hardware device and software has
>> log4j zero-day security vulnerability?
>>
>> Good day from Singapore,
>>
>> I am working for a Systems Integrator (SI) in Singapore. We have
>> several clients writing in, requesting us to identify log4j zero-day
>> security vulnerability in their corporate infrastructure.
>>
>> It seems to be pretty difficult to determine which hardware device and
>> which software has the vulnerability. There seems to be no list of
>> hardware devices and software affected by the flaw anywhere on the
>> internet.
>>
>> Could you refer me to definitive documentation/guides on how to
>> identify log4j security flaw in hardware devices and software?
>>
>> Thank you very much for your kind assistance.
>>
>> Mr. Turritopsis Dohrnii Teo En Ming, 43 years old as of 16 Dec 2021,
>> is a TARGETED INDIVIDUAL living in Singapore. He is an IT Consultant
>> with a Systems Integrator (SI)/computer firm in Singapore. He is an IT
>> enthusiast.
>>
>>
>


Re: Information request: Reactive mode and Rescaling

2021-12-16 Thread Alexander Preuß
Hi Morgan,

Regarding your first question, if the Kafka connector is configured to use
exactly-once semantics it will check the offsets of partitions when
recovering from the checkpoint, so there will be no data loss or
duplication.
I'm not quite sure I understood the second part of the first question
regarding the HPA; are you asking whether Flink actively triggers a new
checkpoint before the pod shutdown? If so, the answer is no: it will
use the latest completed checkpoint.

For the second question, it will use the latest checkpoint; there is no
'active'/forced creation of a new checkpoint before the re-scaling.

You can find some more information about the mechanism here:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/
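[Editorial sketch, not from the original thread, of the pieces this answer relies on: exactly-once checkpointing plus a transactional Kafka sink in 1.14. The broker address, topic, and transactional-id prefix are placeholders.]

```
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceKafkaSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoints are what the re-scaled job restores from; source offsets
        // are part of that state, which is why no data is lost or duplicated.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        KafkaSink<String> sink =
                KafkaSink.<String>builder()
                        .setBootstrapServers("broker:9092")            // placeholder
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setTopic("output-topic")      // placeholder
                                        .setValueSerializationSchema(new SimpleStringSchema())
                                        .build())
                        // Transactional writes, so downstream readers only see
                        // records belonging to committed checkpoints.
                        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                        .setTransactionalIdPrefix("reactive-demo")     // placeholder
                        .build();

        env.fromElements("a", "b", "c").sinkTo(sink);
        env.execute("exactly-once sketch");
    }
}
```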

Best regards,
Alexander

On Wed, Dec 15, 2021 at 10:24 PM Geldenhuys, Morgan Karl <
morgan.geldenh...@tu-berlin.de> wrote:

> Greetings,
>
>
> I would like to find out more about Flink's new reactive mode as well as
> the rescaling feature regarding fault tolerance. For the following question
> lets assume checkpointing is enabled using HDFS.
>
>
> So first question, if I have a job where the source(s) and sink(s) are
> configured to use Kafka with exactly-once processing enabled, how does
> reactive mode handle this? On reconfigurations using the Horizontal Pod
> Autoscaler, does it recover to the latest checkpoint?
>
>
> Second question, for re-scaling, does it automatically create a
> savepoint and then rescale or does it use the latest checkpoint to ensure
> results are consistent during reconfiguration.
>
>
> Thank you in advance!
>
>
> M.
>




Re: How do I determine which hardware device and software has log4j zero-day security vulnerability?

2021-12-16 Thread David Morávek
Hi Turritopsis,

I fail to see any relation to Apache Flink. Can you please elaborate on how
Flink fits into it?

Best,
D.

On Thu, Dec 16, 2021 at 3:52 PM Turritopsis Dohrnii Teo En Ming <
ceo.teo.en.m...@gmail.com> wrote:

> Subject: How do I determine which hardware device and software has
> log4j zero-day security vulnerability?
>
> Good day from Singapore,
>
> I am working for a Systems Integrator (SI) in Singapore. We have
> several clients writing in, requesting us to identify log4j zero-day
> security vulnerability in their corporate infrastructure.
>
> It seems to be pretty difficult to determine which hardware device and
> which software has the vulnerability. There seems to be no list of
> hardware devices and software affected by the flaw anywhere on the
> internet.
>
> Could you refer me to definitive documentation/guides on how to
> identify log4j security flaw in hardware devices and software?
>
> Thank you very much for your kind assistance.
>
> Mr. Turritopsis Dohrnii Teo En Ming, 43 years old as of 16 Dec 2021,
> is a TARGETED INDIVIDUAL living in Singapore. He is an IT Consultant
> with a Systems Integrator (SI)/computer firm in Singapore. He is an IT
> enthusiast.
>
>
>


How do I determine which hardware device and software has log4j zero-day security vulnerability?

2021-12-16 Thread Turritopsis Dohrnii Teo En Ming
Subject: How do I determine which hardware device and software has
log4j zero-day security vulnerability?

Good day from Singapore,

I am working for a Systems Integrator (SI) in Singapore. We have
several clients writing in, requesting us to identify log4j zero-day
security vulnerability in their corporate infrastructure.

It seems to be pretty difficult to determine which hardware device and
which software has the vulnerability. There seems to be no list of
hardware devices and software affected by the flaw anywhere on the
internet.

Could you refer me to definitive documentation/guides on how to
identify log4j security flaw in hardware devices and software?

Thank you very much for your kind assistance.

Mr. Turritopsis Dohrnii Teo En Ming, 43 years old as of 16 Dec 2021,
is a TARGETED INDIVIDUAL living in Singapore. He is an IT Consultant
with a Systems Integrator (SI)/computer firm in Singapore. He is an IT
enthusiast.





-BEGIN EMAIL SIGNATURE-

The Gospel for all Targeted Individuals (TIs):

[The New York Times] Microwave Weapons Are Prime Suspect in Ills of
U.S. Embassy Workers

Link:
https://www.nytimes.com/2018/09/01/science/sonic-attack-cuba-microwave.html



Singaporean Targeted Individual Mr. Turritopsis Dohrnii Teo En Ming's
Academic Qualifications as at 14 Feb 2019 and refugee seeking attempts
at the United Nations Refugee Agency Bangkok (21 Mar 2017), in Taiwan
(5 Aug 2019) and Australia (25 Dec 2019 to 9 Jan 2020):

[1] https://tdtemcerts.wordpress.com/

[2] https://tdtemcerts.blogspot.sg/

[3] https://www.scribd.com/user/270125049/Teo-En-Ming

-END EMAIL SIGNATURE-


Re: pyFlink + asyncio

2021-12-16 Thread Alexander Preuß
Hi Михаил,

From looking at
https://nightlies.apache.org/flink/flink-docs-master/api/python//pyflink.datastream.html
there is currently no AsyncFunction / RichAsyncFunction implementation in
PyFlink, so you are bound to synchronous interaction.

Best regards,
Alexander

On Thu, Dec 16, 2021 at 12:47 PM Королькевич Михаил 
wrote:

> Hi team!
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
> Is it possible to use this for pyFlink?
> Or another asynchronous enrichment of an unordered data stream?
>




Re: Confusion about rebalance bytes sent metric in Flink UI

2021-12-16 Thread Piotr Nowojski
Hi Tao,

Could you prepare a minimalistic example that would reproduce this issue?
Also what Flink version are you using?

Best,
Piotrek

On Thu, Dec 16, 2021 at 09:44, tao xiao wrote:

> >Your upstream is not inflating the record size?
> No, this is a simple dedup function.
>
> On Thu, Dec 16, 2021 at 2:49 PM Arvid Heise  wrote:
>
>> Ah yes, I see it now as well. Yes, you are right: each record should be
>> replicated 9 times, once for one instance of each downstream operator. Your
>> upstream is not inflating the record size? The number of records seems to
>> work out decently. @pnowojski FYI.
>>
>> On Thu, Dec 16, 2021 at 2:20 AM tao xiao  wrote:
>>
>>> Hi Arvid
>>>
>>> The second picture shows the metrics of the upstream operator. The
>>> upstream has a parallelism of 150, as you can see in the first picture. I
>>> expect the bytes sent to be about 9 * bytes received, as we have 9
>>> downstream operators connected.
>>>
>>> Hi Caizhi,
>>> Let me create a minimal reproducible DAG and update here
>>>
>>> On Thu, Dec 16, 2021 at 4:03 AM Arvid Heise  wrote:
>>>
 Hi,

 Could you please clarify which operator we see in the second picture?

 If you are showing the upstream operator, then this has only
 parallelism 1, so there shouldn't be multiple subtasks.
 If you are showing the downstream operator, then the metric would refer
 to the HASH and not REBALANCE.

 On Tue, Dec 14, 2021 at 2:55 AM Caizhi Weng 
 wrote:

> Hi!
>
> This doesn't seem to be the expected behavior. Rebalance shuffle
> should send each record to one of the parallel instances, not to all of them.
>
> If possible could you please explain what your Flink job is doing and
> preferably share your user code so that others can look into this case?
>
> tao xiao wrote on Sat, Dec 11, 2021 at 01:11:
>
>> Hi team,
>>
>> I have one operator that is connected to 9 downstream operators using
>> rebalance. Each operator has a parallelism of 150 [1]. I assume each message
>> in the upstream operator is sent to one of the parallel instances of each of
>> the 9 receiving operators, so the total bytes sent should be roughly 9 times
>> the bytes received in the upstream operator metric. However, the Flink UI
>> shows that the bytes sent is much higher than 9 times; it is about
>> 150 * 9 * bytes received [2]. This looks to me like every message is
>> duplicated to each parallel instance of all receiving operators, like what
>> broadcast does. Is this correct?
>>
>>
>>
>> [1] https://imgur.com/cGyb0QO
>> [2] https://imgur.com/SFqPiJA
>> --
>> Regards,
>> Tao
>>
>
>>>
>>> --
>>> Regards,
>>> Tao
>>>
>>
>
> --
> Regards,
> Tao
>


pyFlink + asyncio

2021-12-16 Thread Королькевич Михаил
Hi team!
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/
Is it possible to use this for pyFlink?
Or another asynchronous enrichment of an unordered data stream?


Re: CVE-2021-44228 - Log4j2 vulnerability

2021-12-16 Thread Chesnay Schepler

We will announce the releases when the binaries are available.

On 16/12/2021 05:37, Parag Somani wrote:

Thank you Chesnay for expediting this fix...!

Can you suggest when I can get the binaries for the Flink 1.14.2 version?

On Thu, Dec 16, 2021 at 5:52 AM Chesnay Schepler  
wrote:


We will push docker images for all new releases, yes.

On 16/12/2021 01:16, Michael Guterl wrote:

Will you all be pushing Docker images for the 1.11.6 release?

On Wed, Dec 15, 2021 at 3:26 AM Chesnay Schepler
 wrote:

The current ETA is 40h for an official announcement.
We are validating the release today (concludes in 16h), will
publish it tonight, then wait for the mirrors to sync (about a
day), and then we announce it.

On 15/12/2021 12:08, V N, Suchithra (Nokia - IN/Bangalore) wrote:


Hello,

Could you please tell when we can expect Flink 1.12.7
release? We are waiting for the CVE fix.

Regards,

Suchithra

*From:*Chesnay Schepler 

*Sent:* Wednesday, December 15, 2021 4:04 PM
*To:* Richard Deurwaarder 

*Cc:* user 

*Subject:* Re: CVE-2021-44228 - Log4j2 vulnerability

We will also update the docker images.

On 15/12/2021 11:29, Richard Deurwaarder wrote:

Thanks for picking this up quickly!

I saw you've made a second minor upgrade, to Log4j2 2.16,
which is perfect.

Just to clarify: Will you also push new docker images
for these releases as well? In particular flink 1.11.6
(Sorry we must upgrade soon! :()

On Tue, Dec 14, 2021 at 2:33 AM narasimha
 wrote:

Thanks TImo, that was helpful.

On Mon, Dec 13, 2021 at 7:19 PM Prasanna kumar
 wrote:

Chesnay Thank you for the clarification.

On Mon, Dec 13, 2021 at 6:55 PM Chesnay Schepler
 wrote:

The flink-shaded-zookeeper jars do not
contain log4j.

On 13/12/2021 14:11, Prasanna kumar wrote:

Does Zookeeper have this vulnerability
dependency ? I see references to log4j
in Shaded Zookeeper jar included as part
of the flink distribution.

On Mon, Dec 13, 2021 at 1:40 PM Timo
Walther  wrote:

While we are working to upgrade the
affected dependencies of all
components, we recommend users
follow the advisory of the Apache Log4j
Community. Also Ververica platform
can be patched with a similar approach:

To configure the JVMs used by
Ververica Platform, you can pass custom
Java options via the
JAVA_TOOL_OPTIONS environment
variable. Add the
following to your platform
values.yaml, or append to the
existing value
of JAVA_TOOL_OPTIONS if you are
using it already there, then redeploy
the platform with Helm:
env:
  - name: JAVA_TOOL_OPTIONS
    value: -Dlog4j2.formatMsgNoLookups=true


For any questions, please contact us
via our support portal.

Regards,
Timo

On 11.12.21 06:45, narasimha wrote:
> Folks, what about the veverica
platform. Is there any
mitigation around it?
>
> On Fri, Dec 10, 2021 at 3:32 PM
Chesnay Schepler  > wrote:
>
>     I would recommend to modify
your log4j configurations to set
>  log4j2.formatMsgNoLookups to true/./
>     /
>     /
>     As far as I can tell this is
equivalent to 


Re: using flink retract stream and rocksdb, too many intermediate results cause checkpoints too heavy to finish

2021-12-16 Thread vtygoss
Hi  Arvid Heise,


Thanks for your reply! It's not classical sensor aggregation.  


The reason for not using a window join is the very long time gap between a
patient's behaviors.

There is a long gap of days or even months between the doctor's appointment and
the visit, between tests, and between hospitalization and discharge. It's a
little like a special session window with a very long gap, but it won't be a
time- or count-based window.

> actual use case?
The actual use cases are based on this scenario: doctors, patients,
orders, visits, tests, hospitalization, nursing notes and so on.
> What do I want to achieve?
As mentioned above, over a long time span, dozens of events continue to
arrive for each patient, especially testing and nursing records. I hope that
when a new record comes, the old result will be updated automatically. I
also hope the delay of the retraction and re-emission can be within 10
minutes.
> consumers of the produced dataset?
Data developers will build a streaming production pipeline based on
upstream datasets and produce new datasets; data analysts will analyze the data
and model things like the relationship between spending and medical outcomes;
doctors and nurses on duty will query all the info of the corresponding patient.

Thanks for any reply or suggestion.


Best Regards!
2021-12-16 17:25:00


On Dec 16, 2021, at 04:09, Arvid Heise wrote:


Can you please describe your actual use case? What do you want to achieve 
low-latency or high-throughput? What are the consumers of the produced dataset?



It sounds to me as if this is classical sensor aggregation. I have not heard of 
any sensor aggregation that doesn't use windowing. So you'd usually include a 
TUMBLE window of 10s and output the data in small batches. This would 
significantly reduce the pressure on the sink and may already solve some of 
your problems.



On Tue, Dec 14, 2021 at 4:29 AM Caizhi Weng  wrote:

Hi!


Changes of input tables will cause corresponding changes in output table


Which sink are you using? If it is an upsert sink then Flink SQL planner will 
filter out UPDATE_BEFORE messages automatically. Also if your sink supports 
something like "ignore delete messages" it can also filter out delete messages 
and affect the downstream less.


Mini-batch will also help in this case. If mini-batch is enabled, aggregations 
will only send updates to the downstream once per batch, thus decreasing the 
number of records flowing downstream.


For better performance on aggregations you can also try local-global 
aggregations. See [1] for details.


Row-Based Storage


This depends on the format you use. Although Flink's current calculation model 
is row-based, it still supports column-based formats like Parquet and has a 
number of optimizations for them. If you enable mini-batch and two-phase 
aggregations, most jobs will meet their performance needs.


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation
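
For reference, a minimal sketch (not part of the original thread) of how these
options can be enabled from the Scala Table API. The option keys are the ones
documented in [1]; the latency and batch-size values below are only examples:

```
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// Enable mini-batch so aggregations emit at most one update per key per batch.
val conf = tEnv.getConfig.getConfiguration
conf.setString("table.exec.mini-batch.enabled", "true")
conf.setString("table.exec.mini-batch.allow-latency", "5 s")
conf.setString("table.exec.mini-batch.size", "5000")
// Enable local-global (two-phase) aggregation to reduce shuffled records.
conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE")
```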


vtygoss  wrote on Mon, Dec 13, 2021 at 17:13:

Hi, community!


I have met a problem while building a streaming production pipeline using 
Flink retract streams, with Hudi-HDFS/Kafka as the storage engine and RocksDB 
as the state backend. 


In my scenario, 
- During a patient's hospitalization, multiple measurements of vital signs are 
recorded, including temperature, pulse, blood pressure and so on. 
- Each type of vital sign contains 20 or more records with PRIMARY 
KEY(patientId, visitId, signType, time) in the table tbl_vis_vital_signs mentioned 
in the code below. 


And I need to get all the vital sign records aggregated together through 
JOIN or COLLECT with FILTER, as in the code below. 


```
select pid, vid, 
collect(ROW(..., temperature,...)) filter(where signType='temprature') as 
temprature,
collect(ROW(..., pulse,..))filter(where signType='pulse') as pulse,
collect() filter(where ...) as bloodpressure
from tbl_vis_vital_signs 
group by pid, vid
```


With the help of Flink CDC and Kafka/Hudi-HDFS, we want to build a streaming 
production pipeline, with the data flow below. 


DataBase --[CDC tools]--> Kafka --[sync]--> Dynamic Table (kafka/hudi-hdfs) --Flink SQL (retract stream)--> Dynamic Table 


The problem is caused by three factors, as follows. 
1. Data inflation:
1) major: Changes in the input tables cause corresponding changes in the output 
table, e.g. for joins and aggregations. In the code above, every change to a row in 
tbl_vis_vital_signs retracts the out-dated result containing all the vital signs' 
info and sends a new result. Worse, there are many vital sign records per 
hospitalization, which causes many retract and re-send operations that are 
consumed by all downstreams.
2) minor: Each CDC update event is split into two events: a deletion of the old 
record and an insertion of the new record. 
2. Kafka / Hudi-HDFS / RocksDB append incremental data to the full data: 
1) RocksDB and Hudi-HDFS use 

Re: Retract stream optimization

2021-12-16 Thread Jingsong Li
In theory, mini-batch alone can optimize retract streams.

At the moment the limitation is that joins do not support mini-batch.

On Thu, Dec 16, 2021 at 5:12 PM casel.chen  wrote:
>
> I read "Oceanus's Real-Time Stream Computing Practice and Optimization" https://jishuin.proginn.com/p/763bfbd5acbf 
> and would like to ask whether the community intends to implement the retract-stream optimizations described there.
> In practice a lot of business data is ingested from MySQL binlog via CDC, and computing on retract streams is a common scenario. Could Flink SQL support these optimizations?



-- 
Best, Jingsong Lee


Retract stream optimization

2021-12-16 Thread casel.chen
I read "Oceanus's Real-Time Stream Computing Practice and Optimization" https://jishuin.proginn.com/p/763bfbd5acbf 
and would like to ask whether the community intends to implement the retract-stream optimizations described there.
In practice a lot of business data is ingested from MySQL binlog via CDC, and computing on retract streams is a common scenario. Could Flink SQL support these optimizations?

Re: WindowOperator TestHarness

2021-12-16 Thread Pierre Bedoucha
Hi Timo,

And thank you for the detailed answer.
We chose to go for the second alternative using the following:

import org.apache.flink.streaming.api.transformations.OneInputTransformation
import 
org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
val env = StreamExecutionEnvironment.getExecutionEnvironment

val source = env.fromElements(new MyInputType(..., eventTime = 
Some(Timestamp(1595447118L))),
 new MyInputType(..., eventTime = Some(Timestamp(1595447119L

val window1 : DataStream[MyOutputType] = source.keyBy[(String, String)](
 (v: MyInputType) => (
   v.a, v.b,
 )
)
 .window(SlidingEventTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, 
TimeUnit.MILLISECONDS)))
 .aggregate(new MyAggregator())

val transform : OneInputTransformation[MyInputType, MyOutputType] = 
window1.getTransformation

val operator = transform.getOperator


However, the *.getTransformation* method does not seem to be exposed on the 
windowed and aggregated DataStream. We're using Flink 1.13.2 so far; could it 
be due to how the public test API is exposed?

Kind regards,
Pierre and Lars
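
For reference, a minimal, hypothetical sketch of the transformation-based route
from Timo's reply quoted below. It deliberately uses the Java DataStream API,
where getTransformation() is publicly accessible even though it is an internal
API; CountAgg and all other names here are made up for illustration, and the
snippet is not verified against 1.13.2:

```
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time

object WindowOperatorExtractionSketch {

  // Trivial aggregate: count elements per key (stands in for MyAggregator).
  class CountAgg extends AggregateFunction[String, java.lang.Long, java.lang.Long] {
    override def createAccumulator(): java.lang.Long = 0L
    override def add(value: String, acc: java.lang.Long): java.lang.Long = acc + 1
    override def getResult(acc: java.lang.Long): java.lang.Long = acc
    override def merge(a: java.lang.Long, b: java.lang.Long): java.lang.Long = a + b
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Build the windowed aggregation with the regular (Java) DataStream API.
    val windowed = env
      .fromElements("a", "a", "b")
      .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps[String]())
      .keyBy(new KeySelector[String, String] {
        override def getKey(value: String): String = value
      })
      .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
      .aggregate(new CountAgg)

    // The transformation created by aggregate() already carries the fully
    // configured WindowOperator, so it does not have to be assembled by hand
    // and can be handed to a KeyedOneInputStreamOperatorTestHarness.
    val transform = windowed.getTransformation
      .asInstanceOf[OneInputTransformation[String, java.lang.Long]]
    val windowOperator = transform.getOperator

    println(windowOperator.getClass) // expected: ...windowing.WindowOperator
  }
}
```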


From: Timo Walther 
Date: Monday, 13 December 2021 at 08:53
To: user@flink.apache.org 
Subject: Re: WindowOperator TestHarness
Hi Lars,

you can take a look at how
org.apache.flink.streaming.api.datastream.WindowedStream#WindowedStream
constructs the graph under the hood. In particular, it uses
org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorBuilder
which constructs the InternalWindowFunction you are looking for.

You could also think about using the regular DataStream API to construct the
operator, and access it for the test harness via something like
dataStream.getTransformation().getOperator(). This avoids calling too
many of the internal classes.

I hope this helps.

Timo


On 10.12.21 15:46, Lars Skjærven wrote:
> Hello,
>
> We're trying to write a test for an implementation of
> *AggregateFunction* following a *EventTimeSessionWindows.withGap*. We
> gave it a try using *WindowOperator*() which we hoped could be used as
> an argument to *KeyedOneInputStreamOperatorTestHarness*. We're a bit
> stuck, and we're hoping someone has a tip or two. Specifically, we can't
> find the right *InternalWindowFunction* to pass to WindowOperator().
> Below, *MyAggregator* is our implementation of the *AggregateFunction*.
>
> Does anyone have a template, or guide, to test a windowed aggregate
> function?
>
> Kind regards,
> Lars
>
>
>  val myWindowOperator = new WindowOperator(
>EventTimeSessionWindows.withGap(Time.seconds(10)),
>new TimeWindow.Serializer(),
>new KeySelector[MyInputType, (String, String)] {
>  override def getKey(value: MyInputType): (String, String) = {
>(value.a, value.b)
>  }
>},
>Types.TUPLE(Types.STRING).createSerializer(
>  new ExecutionConfig
>),
>new AggregatingStateDescriptor[MyInputType, MyAggregateState,
> MyOutputType](
>  "test", new MyAggregator, classOf[MyAggregateState],
>),
>???,
>EventTimeTrigger.create(),
>0,
>null
>  )
>
>  testHarness = new KeyedOneInputStreamOperatorTestHarness[(String,
> String), MyInputType, MyOutputType](
> myWindowOperator,
>new KeySelector[MyInputType, (String, String)] {
>  override def getKey(value: MyInputType): (String, String) = {
>(value.a, value.b)
>  }
>},
>createTuple2TypeInformation(Types.STRING, Types.STRING)
>  )
>
>
>


Re: Confusion about rebalance bytes sent metric in Flink UI

2021-12-16 Thread tao xiao
>Your upstream is not inflating the record size?
No, this is simply a dedup function

On Thu, Dec 16, 2021 at 2:49 PM Arvid Heise  wrote:

> Ah yes I see it now as well. Yes, you are right: each record should be
> replicated 9 times, with one copy sent to an instance of each downstream
> operator. Your upstream is not inflating the record size? The number of
> records seems to work out decently. @pnowojski  FYI.
>
> On Thu, Dec 16, 2021 at 2:20 AM tao xiao  wrote:
>
>> Hi Arvid
>>
>> The second picture shows the metrics of the upstream operator. The
>> upstream has a parallelism of 150 as you can see in the first picture. I expect
>> the bytes sent to be about 9 * bytes received as we have 9 downstream
>> operators connected.
>>
>> Hi Caizhi,
>> Let me create a minimal reproducible DAG and update here
>>
>> On Thu, Dec 16, 2021 at 4:03 AM Arvid Heise  wrote:
>>
>>> Hi,
>>>
>>> Could you please clarify which operator we see in the second picture?
>>>
>>> If you are showing the upstream operator, then this has only parallelism
>>> 1, so there shouldn't be multiple subtasks.
>>> If you are showing the downstream operator, then the metric would refer
>>> to the HASH and not REBALANCE.
>>>
>>> On Tue, Dec 14, 2021 at 2:55 AM Caizhi Weng 
>>> wrote:
>>>
 Hi!

 This doesn't seem to be the expected behavior. Rebalance shuffle should
 send records to one of the parallel instances, not to all of them.

 If possible could you please explain what your Flink job is doing and
 preferably share your user code so that others can look into this case?

 tao xiao  于2021年12月11日周六 01:11写道:

> Hi team,
>
> I have one operator that is connected to another 9 downstream
> operators using rebalance. Each operator has a parallelism of 150 [1]. I assume
> each message from the upstream operator is sent to one of the parallel
> instances of each of the 9 receiving operators, so the total bytes sent should be
> roughly 9 times the bytes received in the upstream operator metric. However,
> the Flink UI shows that the bytes sent are much higher than 9 times: about
> 150 * 9 * bytes received [2]. This looks to me as if every message is
> duplicated to every parallel instance of all receiving operators, like what
> broadcast does. Is this correct?
>
>
>
> [1] https://imgur.com/cGyb0QO
> [2] https://imgur.com/SFqPiJA
> --
> Regards,
> Tao
>

>>
>> --
>> Regards,
>> Tao
>>
>

-- 
Regards,
Tao


Question about logs for identifying the root cause of task failures

2021-12-16 Thread yidan zhao
As the subject says, in production I frequently run into cases where a job fails, the
cancellation process causes a TM to fail, and then all the other jobs fail as well.

What troubles me is that I cannot tell whether an external factor such as the network
first caused the TM to fail, which in turn caused the job to fail, or whether the job
first failed for some reason and the restart/cancellation process then caused the TM to fail.

At the moment I look at the TM logs on each machine, and they differ.
On some TMs the first exception log is: Attempting to cancel task, , Triggering cancellation of
task code...
On other TMs the first exception log is: (40/60)#0 (5e91a8139f7858005f4c06bb1b6e9ca6) switched
from RUNNING to FAILED with failure cause:
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
Error at remote task manager '10.xx.94.150/10.35.94.150:136'.
On one more TM it is different again, as follows:
2021-12-16 15:26:33,189 INFO  org.apache.flink.shaded.curator4.org.apache.curator.framework.state.ConnectionStateManager [] - State change: SUSPENDED
2021-12-16 15:26:33,190 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2021-12-16 15:26:33,190 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - JobManager for job 5ae97a7c2319a277520f8dc92d311347 with leader id a4faf21926590158b40b372347a746a9 lost leadership.
2021-12-16 15:26:33,191 WARN  org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2021-12-16 15:26:33,191 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 5ae97a7c2319a277520f8dc92d311347.
2021-12-16 15:26:33,191 INFO  org.apache.flink.runtime.taskmanager.Task [] - Attempting to fail task externally ip_gap_g4_SidIncludeFilter(39/40)#0 (619938d9cfa52431265bebc836bceafb).

Given the logs of the three TMs above, can I conclude that the third one is the root cause?


unexpected result when using sql interval join ability

2021-12-16 Thread cy
Hi
Flink 1.14.0 Scala 2.12


I'm using the Flink SQL interval join capability; here are my table schema and SQL.


create table `queue_3_ads_ccops_perf_o_ebs_volume_capacity` ( `dtEventTime` 
timestamp(3), `dtEventTimeStamp` bigint, `sourceid` string, `cluster_name` 
string, `poolname` string, `storage_poolname` string, `usage` decimal(10, 4), 
`provisioned_size` decimal(10, 4), `startat` timestamp(3), `endat` 
timestamp(3), `vrespool_id` int, `uuid` string, `version` string, `localTime` 
timestamp(3), `cluster_id` int, `extend1` string, `extend2` string, `extend3` 
string, `mon_ip` string, `bussiness_ip` string, `datasource` string, `thedate` 
int, `name` string, `used_size` int, watermark for `startat` as `startat` - 
interval '60' minutes ) with ( 'connector' = 'kafka', 'topic' = 
'queue_3_ads_ccops_perf_o_ebs_volume_capacity', 'format' = 'json', 
'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = 
'???', 'properties.group.id' = 'layer-vdisk', 
'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' 
= 'SCRAM-SHA-512', 'properties.sasl.jaas.config' = 
'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule
 required username="" password="?";' );


SELECT 
source.sourceid AS sourceid, 
cast(source.startat AS timestamp) AS source_startat,
cast(target.startat AS timestamp) AS target_startat, 
source.used_size AS source_used_size, 
target.used_size AS target_used_size, 
source.usage AS source_usage,
target.usage AS target_usage 
FROM queue_3_ads_ccops_perf_o_ebs_volume_capacity source, 
queue_3_ads_ccops_perf_o_ebs_volume_capacity target
WHERE source.sourceid = target.sourceid
AND source.sourceid in (
'volume-9dfed0d9-28b2-418a-9215-ce762ef80920', 
'volume-9ece34f1-f4bb-475a-8e64-a2e37711b4fc', 
'volume-9f0ec4cc-5cc4-49a8-b715-a91a25df3793', 
'volume-9f38e0b3-2324-4505-a8ad-9b1ccb72181f', 
'volume-9f3ec256-10fb-4d8b-a8cb-8498324cf309'
)
AND source.startat >= FLOOR(target.startat TO HOUR) + INTERVAL '1' HOUR AND 
source.startat < FLOOR(target.startat TO HOUR) + INTERVAL '2' HOUR;


and result


I'm confused about the first row, where source_startat and target_startat do not 
match the time condition. 
I also tried to execute the SQL below


SELECT TO_TIMESTAMP('2021-12-13 14:05:06') >= FLOOR(TO_TIMESTAMP('2021-12-13 
12:05:08') TO HOUR) + INTERVAL '1' HOUR AND TO_TIMESTAMP('2021-12-13 14:05:06') 
< FLOOR(TO_TIMESTAMP('2021-12-13 12:05:08') TO HOUR) + INTERVAL '2' HOUR;


and the result, false, is correct.


So is anything wrong with flink sql interval join?


Need your help, thank you.

Re: unaligned checkpoint for job with large start delay

2021-12-16 Thread Piotr Nowojski
Hi Mason,

In Flink 1.14 we have also changed the timeout behavior from checking
against the alignment duration, to simply checking how old the
checkpoint barrier is (so it also accounts for the start delay) [1]. It
was done in order to solve problems like the one you are describing. Unfortunately we
cannot backport this change to 1.13.x as it's a breaking change.

Anyway, from our experience I would recommend going all in with
unaligned checkpoints, i.e. setting the timeout back to the default value of
0ms. With timeouts you gain very little (a tiny bit smaller state
size if there is no backpressure - a tiny bit because without backpressure,
even with the timeout set to 0ms, the amount of captured in-flight data is
basically insignificant), while in practice you slow down checkpoint
barrier propagation by quite a lot.

Best,
Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-23041
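
For context, a minimal sketch of the configuration this recommendation
corresponds to, assuming Flink 1.13.x (in 1.14 the setter was renamed to
setAlignedCheckpointTimeout); the checkpoint interval below is only an example
value:

```
import java.time.Duration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000L) // checkpoint every 60 s; example value only

val checkpointConfig = env.getCheckpointConfig
checkpointConfig.enableUnalignedCheckpoints(true)
// 0 ms: always take the checkpoint unaligned instead of waiting for barrier alignment.
checkpointConfig.setAlignmentTimeout(Duration.ZERO)
```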

On Tue, Dec 14, 2021 at 22:04 Mason Chen  wrote:

> Hi all,
>
> I'm using Flink 1.13 and my job is experiencing high start delay, more so
> than high alignment time. (our flip 27 kafka source is heavily
> backpressured). Since our alignment timeout is set to 1s, the unaligned
> checkpoint never triggers since alignment delay is always below the
> threshold.
>
> It seems there is only a configuration for alignment timeout but should
> there also be one for start delay timeout:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/#aligned-checkpoint-timeout
>
> I'm interested to know the reasoning why there isn't a timeout for start
> delay as well--was it because it was deemed too complex for the user to
> configure two parameters for unaligned checkpoints?
>
> I'm aware of buffer debloating in 1.14 that could help but I'm trying to
> see how far unaligned checkpointing can take me.
>
> Best,
> Mason
>

