Flink 1.11 Streaming File Sink: exactly-once to HDFS?

2021-02-22 Thread op

  Does the Streaming File Sink in Flink 1.11 guarantee exactly-once
semantics when writing to HDFS?

Re: Flink 1.11 Streaming File Sink: exactly-once to HDFS?

2021-02-22 Thread ??????


Re: Re: If you keyBy() twice and then call window(), how many windows are there?

2021-02-22 Thread yidan zhao
And if the keyBys are consecutive, e.g. .keyBy(xx).keyBy(yy).window(), then no matter how many keyBys there are, only the last one takes effect, and there is of course still only one window. Multiple windows will not appear.


yidan zhao  wrote on Tue, Feb 23, 2021 at 3:31 PM:

> I suddenly feel this is still a communication problem. There is only one window, because you only wrote one window.
> What I meant is that flatMap and window are separate operators; they are not one operator.
>
> hdxg1101300123  wrote on Mon, Feb 22, 2021 at 11:37 PM:
>
>>
>> Why does the flatMap make it two?
>>
>>
>> Sent from my vivo smartphone
>> > No, I read your description but not your code. Written the way your code is, it is two: because you did a flatMap after the keyBy, the second keyBy starts another one.
>> >
>> > yidan zhao  wrote on Mon, Feb 22, 2021 at 10:31 AM:
>> >
>> > > Only the last keyBy takes effect.
>> > >
>> > > Hongyuan Ma  wrote on Sun, Feb 21, 2021 at 10:59 PM:
>> > >
>> > >> Hi all, if you keyBy twice and then call window(), are n windows generated based only on the key of the last keyBy,
>> > >> or m*n windows on top of the previous keyBy?
>> > >>
>> > >>
>> > >> Written as below, is the final window partitioned only by area?
>> > >> // I want to predict the trajectory of each vehicle (id) and then aggregate the trajectory info within each area
>> > >> stream.keyby("id")
>> > >> .flatmap() // predict the trajectory per id, using keyed state inside
>> > >> .assignTime() // set the trajectory eventTime to the predicted time
>> > >> .keyby("area")
>> > >> .window() // window by area
>> > >> .process() // aggregate the trajectories within each area
>> > >>
>> > >>
>>
>
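(A minimal, self-contained sketch in the Flink DataStream Java API, added for illustration; class and field names are made up. It shows the conclusion of the thread: only the keyBy() immediately before window() determines the window partitioning, while the first keyBy() merely scopes the keyed state used in the flatMap between them.)

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class KeyByTwiceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // (id, area, value) tuples stand in for the trajectory events in the question.
        DataStream<Tuple3<String, String, Double>> events = env.fromElements(
                Tuple3.of("car-1", "area-A", 1.0),
                Tuple3.of("car-2", "area-A", 2.0),
                Tuple3.of("car-1", "area-B", 3.0),
                Tuple3.of("car-2", "area-B", 4.0));

        events
                // 1st keyBy: only scopes the keyed state inside the following flatMap.
                .keyBy(new KeySelector<Tuple3<String, String, Double>, String>() {
                    @Override
                    public String getKey(Tuple3<String, String, Double> t) { return t.f0; }
                })
                .flatMap(new RichFlatMapFunction<Tuple3<String, String, Double>,
                        Tuple3<String, String, Double>>() {
                    @Override
                    public void flatMap(Tuple3<String, String, Double> t,
                                        Collector<Tuple3<String, String, Double>> out) {
                        out.collect(t); // the "prediction" would use per-id keyed state here
                    }
                })
                // 2nd keyBy: this is the only key the window is partitioned by.
                .keyBy(new KeySelector<Tuple3<String, String, Double>, String>() {
                    @Override
                    public String getKey(Tuple3<String, String, Double> t) { return t.f1; }
                })
                .countWindow(2) // one window per "area" key, not per (id, area) pair
                .reduce((a, b) -> Tuple3.of("all-ids", a.f1, a.f2 + b.f2))
                .print();

        env.execute("keyBy-twice-sketch");
    }
}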


Re: Re: If you keyBy() twice and then call window(), how many windows are there?

2021-02-22 Thread yidan zhao
I suddenly feel this is still a communication problem. There is only one window, because you only wrote one window.
What I meant is that flatMap and window are separate operators; they are not one operator.

hdxg1101300123  wrote on Mon, Feb 22, 2021 at 11:37 PM:

>
> Why does the flatMap make it two?
>
>
> Sent from my vivo smartphone
> > No, I read your description but not your code. Written the way your code is, it is two: because you did a flatMap after the keyBy, the second keyBy starts another one.
> >
> > yidan zhao  wrote on Mon, Feb 22, 2021 at 10:31 AM:
> >
> > > Only the last keyBy takes effect.
> > >
> > > Hongyuan Ma  wrote on Sun, Feb 21, 2021 at 10:59 PM:
> > >
> > >> Hi all, if you keyBy twice and then call window(), are n windows generated based only on the key of the last keyBy,
> > >> or m*n windows on top of the previous keyBy?
> > >>
> > >>
> > >> Written as below, is the final window partitioned only by area?
> > >> // I want to predict the trajectory of each vehicle (id) and then aggregate the trajectory info within each area
> > >> stream.keyby("id")
> > >> .flatmap() // predict the trajectory per id, using keyed state inside
> > >> .assignTime() // set the trajectory eventTime to the predicted time
> > >> .keyby("area")
> > >> .window() // window by area
> > >> .process() // aggregate the trajectories within each area
> > >>
> > >>
>


Re: Question

2021-02-22 Thread Matthias Pohl
Yes, Flink jobs are deployed using `./bin/flink run`. It will use the
configuration in conf/flink-conf.yaml to connect to the Flink cluster.

It looks like you don't have the right dependencies loaded onto your
classpath. Have you had a look at the documentation about project
configuration [1]? This gives you insight on how to set up the dependencies
for your Flink project. "Setting up a Project: Basic Dependencies" [2]
describes the basic requirements for the project dependencies. Maven
Quickstart [3] in contrast shows you how to initialize a Maven-based Flink
project.

Best,
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/project-configuration.html#project-configuration
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/project-configuration.html#setting-up-a-project-basic-dependencies
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/project-configuration.html#maven-quickstart

On Mon, Feb 22, 2021 at 5:06 PM Abu Bakar Siddiqur Rahman Rocky <
bakar121...@gmail.com> wrote:

> Hi Matthias Pohl,
>
> Thank you for your reply.
>
> First of all, I'm sorry if my question confused you. Let me know if it's
> still unclear to you.
>
> (1) To run code in Flink, we have to use this command:
> ./bin/flink run /home/username/folder/code.jar
>
> Is that correct?
>
> (2) When I run the code in Eclipse, it gives the error shown in the attached pic.
> I guess that, even if (1) is correct, I still can't run it in Flink due to the
> error in the code.
>
> The code cannot resolve org.apache or the checkpointing mode.
>
>
> Thank you
>
> On Mon, Feb 22, 2021, 7:48 AM Matthias Pohl 
> wrote:
>
>> Hi,
>> running your job from within your IDE with no specific configuration
>> provided (like the Flink job examples provided by the Flink [1]) means that
>> you spin up a local Flink cluster (see MiniCluster [2]). This does not have
>> the web UI enabled by default. You could enable it by calling
>> `StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);`,
>> instead. Don't forget to add the `flink-runtime-web` dependency:
>> <dependency>
>>   <groupId>org.apache.flink</groupId>
>>   <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
>>   <version>${flink.version}</version>
>> </dependency>
>>
>> Best,
>> Matthias
>>
>> [1] https://github.com/apache/flink/tree/master/flink-examples
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/runtime/minicluster/MiniCluster.html
>>
>> On Sat, Feb 20, 2021 at 4:07 AM Abu Bakar Siddiqur Rahman Rocky <
>> bakar121...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I read it:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html
>>>
>>> I can run the code in the UI of Apache Flink that is in the bin file of
>>> Apache Flink. If I run a java code from intellij idea or eclipse, then how
>>> can I connect the code to apache flink UI?
>>>
>>> Thank you!
>>>
>>> On Fri, Feb 12, 2021 at 11:43 AM Matthias Pohl 
>>> wrote:
>>>
 Checkpoints are stored in some DFS storage. The location can be
 specified using state.checkpoints.dir configuration property [1]. You can
 access the state of a savepoint or checkpoint using the State Processor API
 [2].

 Best,
 Matthias

 [1]
 https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
 [2]
 https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

 On Fri, Feb 12, 2021 at 5:35 PM Abu Bakar Siddiqur Rahman Rocky <
 bakar121...@gmail.com> wrote:

> Thank you for your reply.
>
> Another Question:
> After Checkpointing, we save our snapshot to a storage. How can we
> access the storage?
>
> is this the source code:
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
>
> If it is not, could you please provide me the source code to access in
> the storage where snapshots are saved?
>
> Thank you
>
>
>
>
> On Fri, Feb 12, 2021 at 2:44 AM Matthias Pohl 
> wrote:
>
>> Hi Abu Bakar Siddiqur Rahman,
>> Have you had a look at the Flink documentation [1]? It provides
>> step-by-step instructions on how to run a job (the Flink binaries provide
>> example jobs under ./examples) using a local standalone cluster. This
>> should also work on a Mac. You would just need to start the Flink cluster
>> (./bin/start-cluster.sh) and submit a job using one of the example jars
>> provided in the binaries (e.g. ./bin/flink run -d
>> ./examples/streaming/WordCount.jar). You can check the job running in
>> Flink's web UI being available under http://localhost:8081 if you
>> use the default configuration provided by the Flink binaries.
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html

Reading and writing Hive with plain DDL

2021-02-22 Thread silence
I'd like to ask whether the community has any plan to support reading and writing Hive tables with plain DDL (without the Hive catalog).
Is there a particular reason it is not supported at the moment?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

BroadcastState dropped when data deleted in Kafka

2021-02-22 Thread bat man
Hi,

I have 2 streams, one with event data and the other with rules. I broadcast the rules
stream and then key the data stream on event type. The connected stream is
processed thereafter.
We faced an issue where the rules data in the topic got deleted because of
Kafka retention policy.
Post this the existing rules data also got dropped in the broadcast state
and the processing stopped.

As per my understanding, the rules which were present in the broadcast state
should still exist even if the data was deleted in Kafka, as the rules data
was already processed and stored in the state map.

PS: I’m reusing the rules stream as broadcast later in processing as well.
Could this be an issue?

Thanks,
Hemant
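(Illustrative only, not from the original mail: a minimal sketch of the broadcast-rules pattern described above, using the Flink DataStream Java API. Bounded in-memory sources stand in for the Kafka topics, and the event/rule formats are made up. Rules written into the broadcast state in processBroadcastElement live in operator state and are not affected by Kafka retention once stored.)

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class RulesBroadcastSketch {

    // broadcast state: event type -> rule definition (both Strings here for brevity)
    static final MapStateDescriptor<String, String> RULES =
            new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In the real job these would be Kafka sources; bounded stand-ins keep the sketch runnable.
        DataStream<String> events = env.fromElements("login:alice", "login:bob", "payment:carol");
        DataStream<String> rules = env.fromElements("login=alert-on-new-device");

        BroadcastStream<String> broadcastRules = rules.broadcast(RULES);

        events
                .keyBy(e -> e.split(":")[0]) // key the data stream by event type, as described above
                .connect(broadcastRules)
                .process(new KeyedBroadcastProcessFunction<String, String, String, String>() {

                    @Override
                    public void processBroadcastElement(String rule, Context ctx,
                                                        Collector<String> out) throws Exception {
                        // Store the rule in broadcast (operator) state; it stays there
                        // until explicitly removed, independently of Kafka retention.
                        String[] kv = rule.split("=");
                        ctx.getBroadcastState(RULES).put(kv[0], kv[1]);
                    }

                    @Override
                    public void processElement(String event, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        String rule = ctx.getBroadcastState(RULES).get(event.split(":")[0]);
                        out.collect(event + " -> " + (rule == null ? "no rule yet" : rule));
                    }
                })
                .print();

        env.execute("rules-broadcast-sketch");
    }
}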


Hi all, does assignTimestampsAndWatermarks() support setting eventTime to a future time?

2021-02-22 Thread Hongyuan Ma
I want to predict the trajectory for the 5 seconds following a trajectory point, and set the eventTime to that future time.
I use AscendingTimestampExtractor, but it logs WARN Timestamp monotony violated xxx < yyy


// For each trajectory point, predict and emit the points for the following 10 seconds, e.g. car A has a point at second 10 and car B at second 15
stream.flatmap()  // predict car A's trajectory for seconds 11-20 and car B's for seconds 16-25
.assignTimestamps(new AscendingTimestampExtractor()) // set eventTime to the predicted future time
.window(1s) // collect the predicted points of cars A and B at seconds 16, 17, ..., 20
.process() // compute pairwise distances between the points of a given second and send an alert if any two are too close
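(Not part of the original mail: a small sketch assuming the WatermarkStrategy API available since Flink 1.11. The Prediction POJO and its fields are invented. The timestamp assigner may return any value, including future times; a bounded-out-of-orderness strategy avoids the monotony warning that AscendingTimestampExtractor logs when timestamps are slightly out of order.)

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FutureEventTimeSketch {

    // Hypothetical POJO for a predicted trajectory point.
    public static class Prediction {
        public String carId;
        public long predictedEpochMillis; // event time lies in the future
        public Prediction() {}
        public Prediction(String carId, long predictedEpochMillis) {
            this.carId = carId;
            this.predictedEpochMillis = predictedEpochMillis;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        long now = System.currentTimeMillis();
        DataStream<Prediction> predictions = env.fromElements(
                new Prediction("A", now + 11_000),
                new Prediction("B", now + 16_000),
                new Prediction("A", now + 12_000)); // slightly out of order is tolerated

        // Event time is whatever the assigner returns; "future" values are allowed.
        predictions
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Prediction>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                .withTimestampAssigner((p, ts) -> p.predictedEpochMillis))
                .print();

        env.execute("future-event-time-sketch");
    }
}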

Re: Configure operator based on key

2021-02-22 Thread Abhinav Sharma
Hi Yidan,

Thank you for your reply. I was wondering if there is some way that the
process function can know which condition fired the trigger.

Eg: If I set the trigger to fire when the object associated with the key has value
2, 8, or 10 (3 conditions for the trigger to fire), then in the process
function I want to operate differently on them.

On Mon, Feb 22, 2021, 11:23 AM yidan zhao  wrote:

> You can self-define it using keyedStream.window(GlobalWindows.create()
> ).trigger(self-defined-trigger).
>
> Abhinav Sharma  于2021年2月21日周日 下午3:57写道:
>
>> Hi,
>>
>> Is there some way that I can configure an operator based on the key in a
>> stream?
>> Eg: If the key is 'abcd', then create a window of size X counts; if the
>> key is 'bfgh', then create a window of size Y counts.
>>
>> Is this scenario possible in Flink?
>>
>>


Flink custom trigger use case

2021-02-22 Thread Diwakar Jha
Hello,

I'm trying to use a custom trigger for one of my use cases. I have basic
logic (as shown below) that uses keyBy on the input stream and a
window of 1 min.

.keyBy()
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.trigger(new CustomTrigger())
.aggregate(Input.getAggregationFunction(), new
AggregationProcessingWindow());


My custom trigger is expected to fire the first event of the keyBy
instantly and any subsequent events should be aggregated in the window.

.trigger(new Trigger<Record, TimeWindow>() {
> @Override
> public TriggerResult onElement(Record record, long l, TimeWindow
> timeWindow, TriggerContext triggerContext) throws Exception {
> ValueState<Boolean> firstSeen =
> triggerContext.getPartitionedState(firstSceenDescriptor);
> if(firstSeen.value() == null) {
> firstSeen.update(true);
> // fire trigger to early evaluate window and purge that event.
> return TriggerResult.FIRE_AND_PURGE;
> }
> // Continue. Do not evaluate window per element
> return TriggerResult.CONTINUE;
> }
> @Override
> public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
> TriggerContext triggerContext) throws Exception {
> // final evaluation and purge window state
> return TriggerResult.FIRE_AND_PURGE;
> }
> @Override
> public TriggerResult onEventTime(long l, TimeWindow timeWindow,
> TriggerContext triggerContext) throws Exception {
> return TriggerResult.CONTINUE;
> }
> @Override
> public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
> throws Exception {
>
> }
> })




Currently, I see (for each window and same key) the first event of the
window is always fired. But I want to see this happening for only the first
window; for subsequent windows it should aggregate all the events and
then fire.

Example: all the records have the same key.
Current output:
record 1 : first event in window-1 : fired
record 2 : last event in window-1 : fired
record 3 : first event in window-2 : fired
record 4, record 5 : 2 events in window-2 : fired

Expected output:
record 1 : first event in window-1 : fired
record 2 : last event in window-1 : fired
record 3, 4, 5 : all events in window-2 : fired
window-2 should not fire on the first event of the same key.

I'm reading it here
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
but not able to solve it. Any pointers would be helpful.

Thanks.


Re: Re: Flink SQL question about writing to Hive

2021-02-22 Thread Rui Li
Yes, a Hive table must be stored in a HiveCatalog in order to be read and written properly.

On Tue, Feb 23, 2021 at 10:14 AM yinghua...@163.com 
wrote:

>
> The Flink version is 1.11.3. Currently the catalog type of all our tables is GenericInMemoryCatalog. Do Hive tables have to use a HiveCatalog?
>
>
>
> yinghua...@163.com
>
> From: Rui Li
> Sent: 2021-02-23 10:05
> To: user-zh
> Subject: Re: Re: Flink SQL question about writing to Hive
> Hi,
>
> Which Flink version are you using? Also, was this Hive table created in a HiveCatalog?
>
> On Tue, Feb 23, 2021 at 9:01 AM 邮件帮助中心  wrote:
>
> > After adding debug logging, I found that the dialect was set to hive when the DDL that creates the Hive table is executed. According to the stack trace, the error now occurs during the DML insert
> > into statement, when the Hive table is resolved: it complains that no connector is configured
> > Table options are: 'is_generic'='false'
> > 'partition.time-extractor.timestamp-pattern'='$dt $hr'
> > 'sink.partition-commit.delay'='0S'
> > 'sink.partition-commit.policy.kind'='metastore,success-file'
> > 'sink.partition-commit.trigger'='partition-time' at
> >
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
> > at
> >
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> > at
> >
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> > at
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> > at
> >
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> > at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at
> >
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
> > scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
> > scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
> > scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> > scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
> > scala.collection.AbstractTraversable.map(Traversable.scala:104) at
> >
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> > at
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270)
> > at
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
> > at
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789)
> > at
> >
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691)
> > at
> >
> com.cgws.ccp.flink.sql.submit.SqlSubmit.callInsertInto(SqlSubmit.java:242)
> > at
> com.cgws.ccp.flink.sql.submit.SqlSubmit.callCommand(SqlSubmit.java:201)
> > at com.cgws.ccp.flink.sql.submit.SqlSubmit.run(SqlSubmit.java:126) at
> > com.cgws.ccp.flink.sql.submit.SqlSubmit.main(SqlSubmit.java:84) at
> > sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
> >
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > at
> >
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > at java.lang.reflect.Method.invoke(Method.java:498) at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> > ... 11 more Caused by: org.apache.flink.table.api.ValidationException:
> > Table options do not contain an option key 'connector' for discovering a
> > connector. at
> >
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
> > at
> >
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
> > ... 37 more
> >
> >
> > If I set the Hive dialect when executing the DML statement, then the Kafka table is not in Hive syntax. How should that be handled?
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On 2021-02-22 17:12:55, "eriendeng"  wrote:
> > >It looks like you didn't set the dialect to hive, so it went down the else branch. The default
> > >dialect requires a connector to be specified; see the Kafka-to-Hive code in the docs
> > >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing
> > >
> > >
> > >
> > >--
> > >Sent from: http://apache-flink.147419.n8.nabble.com/
> >
>
>
> --
> Best regards!
> Rui Li
>


-- 
Best regards!
Rui Li


Re: Re: Flink SQL question about writing to Hive

2021-02-22 Thread yinghua...@163.com
The Flink version is 1.11.3. Currently the catalog type of all our tables is GenericInMemoryCatalog. Do Hive tables have to use a HiveCatalog?



yinghua...@163.com
 
From: Rui Li
Sent: 2021-02-23 10:05
To: user-zh
Subject: Re: Re: Flink SQL question about writing to Hive
Hi,
 
Which Flink version are you using? Also, was this Hive table created in a HiveCatalog?
 
On Tue, Feb 23, 2021 at 9:01 AM 邮件帮助中心  wrote:
 
> After adding debug logging, I found that the dialect was set to hive when the DDL that creates the Hive table is executed. According to the stack trace, the error now occurs during the DML insert
> into statement, when the Hive table is resolved: it complains that no connector is configured
> Table options are: 'is_generic'='false'
> 'partition.time-extractor.timestamp-pattern'='$dt $hr'
> 'sink.partition-commit.delay'='0S'
> 'sink.partition-commit.policy.kind'='metastore,success-file'
> 'sink.partition-commit.trigger'='partition-time' at
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691)
> at
> com.cgws.ccp.flink.sql.submit.SqlSubmit.callInsertInto(SqlSubmit.java:242)
> at com.cgws.ccp.flink.sql.submit.SqlSubmit.callCommand(SqlSubmit.java:201)
> at com.cgws.ccp.flink.sql.submit.SqlSubmit.run(SqlSubmit.java:126) at
> com.cgws.ccp.flink.sql.submit.SqlSubmit.main(SqlSubmit.java:84) at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498) at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 11 more Caused by: org.apache.flink.table.api.ValidationException:
> Table options do not contain an option key 'connector' for discovering a
> connector. at
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
> ... 37 more
>
>
> If I set the Hive dialect when executing the DML statement, then the Kafka table is not in Hive syntax. How should that be handled?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-02-22 17:12:55, "eriendeng"  wrote:
> >It looks like you didn't set the dialect to hive, so it went down the else branch. The default
> >dialect requires a connector to be specified; see the Kafka-to-Hive code in the docs
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing
> >
> >
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>
 
 
-- 
Best regards!
Rui Li


Re: Re: Flink SQL question about writing to Hive

2021-02-22 Thread Rui Li
Hi,

Which Flink version are you using? Also, was this Hive table created in a HiveCatalog?

On Tue, Feb 23, 2021 at 9:01 AM 邮件帮助中心  wrote:

> After adding debug logging, I found that the dialect was set to hive when the DDL that creates the Hive table is executed. According to the stack trace, the error now occurs during the DML insert
> into statement, when the Hive table is resolved: it complains that no connector is configured
> Table options are: 'is_generic'='false'
> 'partition.time-extractor.timestamp-pattern'='$dt $hr'
> 'sink.partition-commit.delay'='0S'
> 'sink.partition-commit.policy.kind'='metastore,success-file'
> 'sink.partition-commit.trigger'='partition-time' at
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> at
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
> scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
> scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
> scala.collection.AbstractTraversable.map(Traversable.scala:104) at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691)
> at
> com.cgws.ccp.flink.sql.submit.SqlSubmit.callInsertInto(SqlSubmit.java:242)
> at com.cgws.ccp.flink.sql.submit.SqlSubmit.callCommand(SqlSubmit.java:201)
> at com.cgws.ccp.flink.sql.submit.SqlSubmit.run(SqlSubmit.java:126) at
> com.cgws.ccp.flink.sql.submit.SqlSubmit.main(SqlSubmit.java:84) at
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498) at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 11 more Caused by: org.apache.flink.table.api.ValidationException:
> Table options do not contain an option key 'connector' for discovering a
> connector. at
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
> at
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
> ... 37 more
>
>
> If I set the Hive dialect when executing the DML statement, then the Kafka table is not in Hive syntax. How should that be handled?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> On 2021-02-22 17:12:55, "eriendeng"  wrote:
> >It looks like you didn't set the dialect to hive, so it went down the else branch. The default
> >dialect requires a connector to be specified; see the Kafka-to-Hive code in the docs
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing
> >
> >
> >
> >--
> >Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


Re: Re:Re: Flink SQL question about writing to Hive

2021-02-22 Thread eriendeng
Creating the Kafka source table under the Hive catalog creates a metadata-only table in the Hive
metastore. It cannot be queried from Hive, but a Flink job can recognize it and read it as a Hive table; then you just
read and write normally under the Hive dialect.
See https://my.oschina.net/u/2828172/blog/4415970



--
Sent from: http://apache-flink.147419.n8.nabble.com/
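(An illustrative sketch of the approach described above, in the Table API for Java; the catalog name, Hive conf dir, Kafka topic and schemas are placeholders, and the exact connector options may differ between Flink versions. The Kafka table is created under the default dialect inside the HiveCatalog, and only the Hive DDL/DML runs under the Hive dialect.)

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class KafkaToHiveSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Register and use a HiveCatalog; name, database and conf dir are placeholders.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/path/to/hive-conf");
        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");

        // The Kafka source table uses the DEFAULT dialect (it is not Hive syntax);
        // it ends up in the Hive metastore as a metadata-only table.
        tEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        tEnv.executeSql(
                "CREATE TABLE kafka_source (id STRING, ts TIMESTAMP(3)) WITH (" +
                " 'connector' = 'kafka', 'topic' = 'my_topic'," +
                " 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json')");

        // Switch to the HIVE dialect only for the Hive DDL/DML.
        tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        tEnv.executeSql(
                "CREATE TABLE hive_sink (id STRING) PARTITIONED BY (dt STRING) STORED AS parquet");
        tEnv.executeSql(
                "INSERT INTO TABLE hive_sink SELECT id, DATE_FORMAT(ts, 'yyyy-MM-dd') FROM kafka_source");
    }
}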


Re:Re: Flink SQL question about writing to Hive

2021-02-22 Thread 邮件帮助中心
After adding debug logging, I found that the dialect was set to hive when the DDL that creates the Hive table is executed. According to the stack trace, the error now occurs during the DML insert
into statement, when the Hive table is resolved: it complains that no connector is configured
Table options are: 'is_generic'='false' 
'partition.time-extractor.timestamp-pattern'='$dt $hr' 
'sink.partition-commit.delay'='0S' 
'sink.partition-commit.policy.kind'='metastore,success-file' 
'sink.partition-commit.trigger'='partition-time' at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
 at 
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
 at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
 at 
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at 
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:789)
 at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:691)
 at com.cgws.ccp.flink.sql.submit.SqlSubmit.callInsertInto(SqlSubmit.java:242) 
at com.cgws.ccp.flink.sql.submit.SqlSubmit.callCommand(SqlSubmit.java:201) at 
com.cgws.ccp.flink.sql.submit.SqlSubmit.run(SqlSubmit.java:126) at 
com.cgws.ccp.flink.sql.submit.SqlSubmit.main(SqlSubmit.java:84) at 
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
 ... 11 more Caused by: org.apache.flink.table.api.ValidationException: Table 
options do not contain an option key 'connector' for discovering a connector. 
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:321)
 at 
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
 ... 37 more


If I set the Hive dialect when executing the DML statement, then the Kafka table is not in Hive syntax. How should that be handled?

















On 2021-02-22 17:12:55, "eriendeng"  wrote:
>It looks like you didn't set the dialect to hive, so it went down the else branch. The default
>dialect requires a connector to be specified; see the Kafka-to-Hive code in the docs
>https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing
>
>
>
>--
>Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Community chat?

2021-02-22 Thread Yuval Itzchakov
A dedicated Slack would be awesome.

On Mon, Feb 22, 2021, 22:57 Sebastián Magrí  wrote:

> Is there any chat from the community?
>
> I saw the freenode channel but it's pretty dead.
>
> A lot of the time, a more chat-like venue to discuss things
> synchronously or just share ideas turns out to be very useful and stimulates the
> community.
>
> --
> Sebastián Ramírez Magrí
>


Community chat?

2021-02-22 Thread Sebastián Magrí
Is there any chat from the community?

I saw the freenode channel but it's pretty dead.

A lot of the time, a more chat-like venue to discuss things
synchronously or just share ideas turns out to be very useful and stimulates the
community.

-- 
Sebastián Ramírez Magrí


Julia API/Interface for Flink

2021-02-22 Thread Beni Bilme

Hello,

Is there a Julia API or interface for using Flink?

Thanks in advance for any response.

Beni




Reply: Re: If you keyBy() twice and then call window(), how many windows are there?

2021-02-22 Thread hdxg1101300123

Why does the flatMap make it two?


Sent from my vivo smartphone
> No, I read your description but not your code. Written the way your code is, it is two: because you did a flatMap after the keyBy, the second keyBy starts another one.
>
> yidan zhao  wrote on Mon, Feb 22, 2021 at 10:31 AM:
>
> > Only the last keyBy takes effect.
> >
> > Hongyuan Ma  wrote on Sun, Feb 21, 2021 at 10:59 PM:
> >
> >> Hi all, if you keyBy twice and then call window(), are n windows generated based only on the key of the last keyBy,
> >> or m*n windows on top of the previous keyBy?
> >>
> >>
> >> Written as below, is the final window partitioned only by area?
> >> // I want to predict the trajectory of each vehicle (id) and then aggregate the trajectory info within each area
> >> stream.keyby("id")
> >> .flatmap() // predict the trajectory per id, using keyed state inside
> >> .assignTime() // set the trajectory eventTime to the predicted time
> >> .keyby("area")
> >> .window() // window by area
> >> .process() // aggregate the trajectories within each area
> >>
> >>


WatermarkStrategy.for_bounded_out_of_orderness in table API

2021-02-22 Thread joris.vanagtmaal
Can I set the watermark strategy to bounded out-of-orderness when using the
Table API and SQL DDL to assign watermarks?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
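(A possible answer sketched in the Table API for Java, not from the original mail: a WATERMARK clause of the form "column - INTERVAL ..." in the DDL corresponds to a bounded-out-of-orderness strategy with that bound, and the same DDL string can be executed from PyFlink. The connector and schema below are placeholders.)

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class BoundedOutOfOrdernessDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // "ts - INTERVAL '5' SECOND" is the DDL equivalent of a bounded-out-of-orderness
        // watermark with a 5-second bound.
        tEnv.executeSql(
                "CREATE TABLE events (" +
                "  id STRING," +
                "  ts TIMESTAMP(3)," +
                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector' = 'datagen'," +
                "  'number-of-rows' = '10'" +
                ")");

        tEnv.executeSql("SELECT * FROM events").print();
    }
}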


Install/Run Streaming Anomaly Detection R package in Flink

2021-02-22 Thread Robert Cullen
My customer wants us to install this package in our Flink Cluster:

https://github.com/twitter/AnomalyDetection

One of our engineers developed a python version:

https://pypi.org/project/streaming-anomaly-detection/

Is there a way to install this in our cluster?

-- 
Robert Cullen
240-475-4490


Re: Question

2021-02-22 Thread Matthias Pohl
Hi,
running your job from within your IDE with no specific configuration
provided (like the Flink job examples provided by the Flink [1]) means that
you spin up a local Flink cluster (see MiniCluster [2]). This does not have
the web UI enabled by default. You could enable it by calling
`StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);`,
instead. Don't forget to add the `flink-runtime-web` dependency:

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
  </dependency>


Best,
Matthias

[1] https://github.com/apache/flink/tree/master/flink-examples
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/runtime/minicluster/MiniCluster.html
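(An illustrative sketch, not from the original mail: a minimal job using createLocalEnvironmentWithWebUI as described above. It assumes flink-runtime-web is on the classpath; the port and the toy pipeline are arbitrary.)

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalWebUiSketch {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.setInteger(RestOptions.PORT, 8081); // web UI port for the local MiniCluster

        // Requires the flink-runtime-web dependency shown above.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);

        // With a bounded source the job (and the UI) terminates quickly;
        // a real job would use a streaming source to keep the UI available.
        env.fromElements(1, 2, 3)
           .map(i -> i * 2)
           .print();

        env.execute("local-webui-sketch");
    }
}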

On Sat, Feb 20, 2021 at 4:07 AM Abu Bakar Siddiqur Rahman Rocky <
bakar121...@gmail.com> wrote:

> Hi,
>
> I read it:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html
>
> I can run the code in the UI of Apache Flink that is in the bin file of
> Apache Flink. If I run a java code from intellij idea or eclipse, then how
> can I connect the code to apache flink UI?
>
> Thank you!
>
> On Fri, Feb 12, 2021 at 11:43 AM Matthias Pohl 
> wrote:
>
>> Checkpoints are stored in some DFS storage. The location can be specified
>> using state.checkpoints.dir configuration property [1]. You can access the
>> state of a savepoint or checkpoint using the State Processor API [2].
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
>>
>> On Fri, Feb 12, 2021 at 5:35 PM Abu Bakar Siddiqur Rahman Rocky <
>> bakar121...@gmail.com> wrote:
>>
>>> Thank you for your reply.
>>>
>>> Another Question:
>>> After Checkpointing, we save our snapshot to a storage. How can we
>>> access the storage?
>>>
>>> is this the source code:
>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
>>>
>>> If it is not, could you please provide me the source code to access in
>>> the storage where snapshots are saved?
>>>
>>> Thank you
>>>
>>>
>>>
>>>
>>> On Fri, Feb 12, 2021 at 2:44 AM Matthias Pohl 
>>> wrote:
>>>
 Hi Abu Bakar Siddiqur Rahman,
 Have you had a look at the Flink documentation [1]? It provides
 step-by-step instructions on how to run a job (the Flink binaries provide
 example jobs under ./examples) using a local standalone cluster. This
 should also work on a Mac. You would just need to start the Flink cluster
 (./bin/start-cluster.sh) and submit a job using one of the example jars
 provided in the binaries (e.g. ./bin/flink run -d
 ./examples/streaming/WordCount.jar). You can check the job running in
 Flink's web UI being available under http://localhost:8081 if you use
 the default configuration provided by the Flink binaries.

 Best,
 Matthias

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html

 On Thu, Feb 11, 2021 at 3:45 PM Abu Bakar Siddiqur Rahman Rocky <
 bakar121...@gmail.com> wrote:

> Hi,
>
> Is there anyone who can inform me how I can connect a Java program to
> Apache Flink (on a Mac)?
>
> Thank you!
>
> Regards,
> Abu Bakar Siddiqur Rahman
>
> On Thu, Feb 11, 2021 at 4:26 AM Abu Bakar Siddiqur Rahman Rocky <
> bakar121...@gmail.com> wrote:
>
>> Hi Chesnay,
>>
>> Could you please inform me how I can connect a Java program to
>> Apache Flink (on a Mac)?
>>
>> Thank you!
>>
>> Regards,
>> Abu Bakar Siddiqur Rahman
>>
>> On Wed, Feb 3, 2021, 3:06 AM Chesnay Schepler 
>> wrote:
>>
>>> Sure.
>>>
>>>
>>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint
>>>
>>>
>>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper
>>>
>>> https://github.com/apache/flink/tree/master/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper
>>>
>>> On 2/3/2021 3:08 AM, Abu Bakar Siddiqur Rahman Rocky wrote:
>>>
>>> Hi,
>>>
>>> Is there any source code for the checkpoints, snapshot and zookeeper
>>> mechanism?
>>>
>>> Thank you
>>>
>>> On Mon, Feb 1, 2021 at 4:23 AM Chesnay Schepler 
>>> wrote:
>>>
 Could you expand a bit on what you mean? Are you referring to
 *savepoints*?

 On 1/28/2021 3:24 PM, Abu Bakar Siddiqur Rahman Rocky wrote:

 Hi,

 Is there any library to use and remember the apache flink snapshot?

 Thank you

 --
 Regards,
 Abu Bakar Siddiqur Rahman

trying to understand watermark effect on rolling average

2021-02-22 Thread joris.vanagtmaal
I'm trying to calculate a simple rolling average using pyflink, but somehow
the last rows streaming in seem to be excluded, which I expected to be the
result of data arriving out of order. However, I fail to understand why.

exec_env = StreamExecutionEnvironment.get_execution_environment()
exec_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env_settings =
EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env =
StreamTableEnvironment.create(exec_env,environment_settings=env_settings)
table_env.get_config().get_configuration().set_string("pipeline.jars",
"file:///Users/jag002/flinkjars/flink-connector-kafka_2.11-1.13-SNAPSHOT.jar;file:///Users/jag002/flinkjars/flink-sql-connector-kafka_2.11-1.13-SNAPSHOT.jar;file:///Users/jag002/flinkjars/kafka-clients-2.7.0.jar")

source_ts_ddl = f"""
CREATE TABLE kafka_ts_source (
sender VARCHAR,
length DOUBLE,
rot DOUBLE,
sog DOUBLE,
stw DOUBLE,
x_time STRING,
rt as TO_TIMESTAMP(x_time),
WATERMARK FOR rt as rt - INTERVAL '1' second
) with (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'dms_ts',
  'connector.properties.bootstrap.servers' = '{kafka_servers}',
  'connector.properties.zookeeper.connect' = 
'{kafka_zookeeper_servers}',
  'connector.properties.group.id' = '{kafka_consumer_group_id}',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'json'
)
"""
table_env.sql_update(source_ts_ddl)

calculate_table = table_env.sql_query("""SELECT 
sender,
stw,  
rt
FROM kafka_ts_source
""")

temp1_ddl = f"""
CREATE TABLE kafka_temp1_sink (
sender VARCHAR,
stw DOUBLE,
rt TIMESTAMP(3)
) with (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'temp_sink1',
  'connector.properties.bootstrap.servers' = 
'{kafka_servers}',
  'connector.properties.zookeeper.connect' = 
'{kafka_zookeeper_servers}',
  'connector.properties.group.id' = 
'{kafka_consumer_group_id}',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'json'
)
"""
table_env.sql_update(temp1_ddl)
calculate_table.execute_insert("kafka_temp1_sink")

averages_table = calculate_table.select("sender,
stw").over_window(Over.partition_by("sender").order_by("rt") \
.preceding(expr.lit(50).seconds) \
.following("CURRENT_RANGE").alias("w")) \
.select(calculate_table.sender.alias('sender'),
calculate_table.rt.alias('rt'), calculate_table.stw.alias('stw'),
calculate_table.stw.avg.over(expr.col('w')).alias('avgstw'), 

calculate_table.rt.min.over(expr.col('w')).cast(DataTypes.TIMESTAMP(3)).alias('minrt'),
 

calculate_table.rt.max.over(expr.col('w')).cast(DataTypes.TIMESTAMP(3)).alias('maxrt'))

temp2_ddl = f"""
CREATE TABLE kafka_temp2_sink (
sender VARCHAR,
rt TIMESTAMP(3),
stw DOUBLE,
avgstw DOUBLE,
minrt TIMESTAMP(3),
maxrt TIMESTAMP(3)
) with (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'temp_sink2',
  'connector.properties.bootstrap.servers' = 
'{kafka_servers}',
  'connector.properties.zookeeper.connect' = 
'{kafka_zookeeper_servers}',
  'connector.properties.group.id' = 
'{kafka_consumer_group_id}',
  'connector.startup-mode' = 'latest-offset',
  'format.type' = 'json'
)
"""
table_env.sql_update(temp2_ddl)
averages_table.execute_insert("kafka_temp2_sink").wait()
table_env.execute("ark_demo")

my input is 44 events:
index  sender                                stw   rt
0      da2fe388-390d-41d7-982b-2a15dc990f06  3.00  2021-02-18T08:55:19Z
1      3a1aca88-a31b-4bcc-82a0-629f58136570  0.20  2021-02-18T08:55:27Z
2      4c7d0d79-cf48-4df7-a590-dcfacca03ee4  1.00  2021-02-18T09:05:13Z
3

Re: [Statefun] Dynamic behavior

2021-02-22 Thread Igal Shilman
Hi Miguel,

I think that there are a couple of ways to achieve this, and it really
depends on your specific use case, and the trade-offs
that you are willing to accept.

For example, one way to approach this:
- Suppose you have an external service somewhere that returns a
representation of the logic to be interpreted by
your function at runtime (I think that is the scenario you are describing)
- Then, you can write a background task (a thread) that periodically
queries that service, and keeps in memory the latest version.
- You can initialize this background task in your FunctionProvider
implementation, or even in your StatefulModule if you wish.
- Then, make sure that your dynamic stateful function has an access to the
latest value fetched by your client (for example via a shared reference
like a j.u.c.AtomicReference)
- Then on receive, you can simply get that reference and re-apply your
rules.

Take a look at [1] for example (it is not exactly the same, but I believe
that it is close enough)

[1]
https://github.com/apache/flink-statefun/blob/master/statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/Module.java

Good luck,
Igal.
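(An illustrative plain-Java sketch of the background-refresh idea described above; RuleRefresher and RuleServiceClient are made-up names. The instance would be created when the module/function provider is initialized, and the stateful function would call currentRules() on every invocation to re-apply the latest rules.)

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Periodically fetches the latest rule set and shares it through an AtomicReference,
// which a stateful function can read on every invocation.
public class RuleRefresher implements AutoCloseable {

    private final AtomicReference<List<String>> latestRules =
            new AtomicReference<>(Collections.emptyList());
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public RuleRefresher(RuleServiceClient client, long periodSeconds) {
        scheduler.scheduleAtFixedRate(
                () -> latestRules.set(client.fetchRules()), 0, periodSeconds, TimeUnit.SECONDS);
    }

    /** Called from the function's invoke() to get the freshest rules. */
    public List<String> currentRules() {
        return latestRules.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    /** Placeholder for whatever external service returns the rule definitions. */
    public interface RuleServiceClient {
        List<String> fetchRules();
    }
}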


On Mon, Feb 22, 2021 at 4:32 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi,
>
> FWIW, there is this JIRA that is tracking a pubsub / broadcast messaging
> primitive in StateFun:
> https://issues.apache.org/jira/browse/FLINK-16319
>
> This is probably what you are looking for. And I do agree, in the case
> that the control stream (which updates the application logic) is high
> volume, redeploying functions may not work well.
>
> I don't think there really is a "recommended" way of doing the "broadcast
> control stream, join with main stream" pattern with StateFun at the moment,
> at least without FLINK-16319.
> On the other hand, it could be possible to use stateful functions to
> implement a pub-sub model in user space for the time being. I've actually
> left some ideas for implementing that in the comments of FLINK-16319.
>
> Cheers,
> Gordon
>
>
> On Mon, Feb 22, 2021 at 6:38 AM Miguel Araújo  wrote:
>
>> Hi everyone,
>>
>> What is the recommended way of achieving the equivalent of a broadcast in
>> Flink when using Stateful Functions?
>>
>> For instance, assume we are implementing something similar to Flink's
>> demo fraud detection
>>  but
>> in Stateful Functions - how can one dynamically update the application's
>> logic then?
>> There was a similar question in this mailing list in the past where it
>> was recommended moving the dynamic logic to a remote function
>> 
>>  so
>> that one could achieve that by deploying a new container. I think that's
>> not very realistic as updates might happen with a frequency that's not
>> compatible with that approach (e.g., sticking to the fraud detection
>> example, updating fraud detection rules every hour is not unusual), nor
>> should one be deploying a new container when data (not code) changes.
>>
>> Is there a way of, for example, modifying FunctionProviders
>> 
>> on the fly?
>>
>> Thanks,
>> Miguel
>>
>


RE: Sharding of Operators

2021-02-22 Thread Tripathi,Vikash
Thanks Chesnay, that answers my question.

In my case NextOp is operating on keyed streams and now it makes sense to me 
that along with key re-distribution, the state will also be re-distributed so 
effectively the ‘NextOp4’ instance can process all the tuples together for key 
‘A’, those that were seen earlier and even those that would be coming now.

From: Chesnay Schepler 
Sent: Monday, February 22, 2021 4:50 PM
To: Tripathi,Vikash ; yidan zhao 

Cc: user 
Subject: Re: Sharding of Operators

Let's clarify what NextOp is.
Is it operating on a keyed stream, or is something like a map function that has 
some internal state based on keys of the input elements (e.g., it has something 
like a Map that it queries/modifies for each input element)?

If NextOp operates on a keyed stream then its (keyed) state will be
redistributed.
If it is not a keyed state then, in your example, it will never receive another 
element with key A.

On 2/22/2021 12:07 PM, Tripathi,Vikash wrote:
Just needed more clarity in terms of a processing scenario.

Say, I was having records of key ‘A’ on a parallel instance ‘Op1’ of operator 
‘Op’ and the next operator ‘NextOp’ in the sequence of transformation was 
getting records of key ‘A’ on it’s parallel instance ‘NextOp2’ at the time when 
the savepoint was made.

Now, the application has been rescaled to a parallelism level of say 4 as 
against 2 which was the case at the time of savepoint.

Now, let’s say key ‘A’ records land up to ‘NextOp4’, parallel instance of 
‘NextOp’ operator after re-scaling but the operations being performed in this 
‘NextOp’ operator demands a windowing event based on event time processing that 
has still not been finished even after restarting the application from previous 
savepoint. Some records of the same key ‘A’ lie together to be processed in 
parallel instance ‘NextOp2’ as was the case during savepoint and the new set of 
records for the same key now happen to be redirected together, for being 
processed on the parallel instance ‘NextOp4’ of the same operator. However, to 
generate a consistent result, the event time window needs to do calculations 
that take into account both the record sets for key ‘A’ which are present on 
different instances of the same operator ‘NextOp’.

How will flink runtime handle such a situation?

From: Chesnay Schepler 
Sent: Friday, February 19, 2021 12:52 AM
To: yidan zhao ; 
Tripathi,Vikash 
Cc: user 
Subject: Re: Sharding of Operators

When you change the parallelism then keys are re-distributed across operators 
instances.

However, this re-distribution is limited to the set maxParallelism (set via the 
ExecutionConfig), which by default is 128 if no operators exceeded the 
parallelism on the first submission.
This cannot be changed after the job was run without discarding state.

See 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/production_ready.html#set-an-explicit-max-parallelism
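(Illustrative only, not from the original mail: setting an explicit max parallelism on the StreamExecutionEnvironment, as recommended in the linked page; the values are arbitrary.)

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MaxParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Upper bound for future rescaling: keyed state is organised into this many
        // key groups, which are redistributed across operator instances on restore.
        // It should be chosen before the first run with state; changing it later discards state.
        env.setMaxParallelism(720);

        // The current parallelism can be anything up to the max parallelism.
        env.setParallelism(4);

        env.fromElements("a", "b", "a", "c")
           .keyBy(s -> s)
           .map(s -> s.toUpperCase())
           .print();

        env.execute("max-parallelism-sketch");
    }
}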

On 2/18/2021 8:29 AM, yidan zhao wrote:
Actually, we only need to ensure all records belonging to the same key will be 
forwarded to the same operator instance(i), and we do not need to guarantee 
that 'i' is the same with the 'i' in previous savepoints. When the job is 
restarted, the rule 'same key's record will be in together' is guaranteed and 
more slots will be surely useful, since each slot(operator instance) will be 
responsible for less keys, leading to less records.

Tripathi,Vikash mailto:vikash.tripa...@cerner.com>> 
wrote on Thu, Feb 18, 2021 at 12:09 AM:
Hi there,

I wanted to know how re-partitioning of keys per operator instance would happen 
when the current operator instances are scaled up or down and we are restarting 
our job from a previous savepoint which had a different number of parallel 
instances of the same operator.

My main concern is whether the re-distribution would lead to mapping of same 
keys to same operator instances as was done earlier but if this happens then 
there would be no added advantage of adding new task slots for the same 
operator because they would remain less used or not used at all if all possible 
key values have been seen earlier and if we go by the other way around of 
evenly distributing out keys (based on the hash function) to the new parallel 
slots as well, won't this cause issues in terms of processing consistent 
results based on the state of operator as was provided by previous 

Re: State Access Beyond RichCoFlatMapFunction

2021-02-22 Thread Kezhu Wang
Flink IT tests covers queryable state with mini cluster.

All tests:
https://github.com/apache/flink/tree/5c772f0a9cb5f8ac8e3f850a0278a75c5fa059d5/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases

Setup/Configs:
https://github.com/apache/flink/blob/5c772f0a9cb5f8ac8e3f850a0278a75c5fa059d5/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateFsBackendITCase.java#L67

Test case:
https://github.com/apache/flink/blob/5c772f0a9cb5f8ac8e3f850a0278a75c5fa059d5/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java#L912


Best,
Kezhu Wang
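(An illustrative client-side sketch, assuming the queryable state proxy is enabled on the cluster (flink-queryable-state-runtime available) and that the job marked a ValueState named "count" as queryable under the name "count-query", e.g. via descriptor.setQueryable("count-query") or keyedStream.asQueryableState(...). Host, port and key are placeholders.)

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

public class QueryableStateClientSketch {
    public static void main(String[] args) throws Exception {
        // Same descriptor (name + type) that the job used for the queryable state.
        ValueStateDescriptor<Long> descriptor =
                new ValueStateDescriptor<>("count", Long.class);

        // Host/port of the queryable state proxy (9069 is the default proxy port).
        QueryableStateClient client = new QueryableStateClient("localhost", 9069);

        JobID jobId = JobID.fromHexString(args[0]); // job id of the running job
        String key = "some-key";                    // key of the keyed state to look up

        CompletableFuture<ValueState<Long>> result = client.getKvState(
                jobId, "count-query", key, BasicTypeInfo.STRING_TYPE_INFO, descriptor);

        System.out.println("state for " + key + " = " + result.get().value());
    }
}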

On February 19, 2021 at 20:27:41, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

Is there an example setup of Queryable State for a Local Embedded
Environment?

I am trying to execute Flink programs from within IntelliJ. Any help would
be appreciated!

Even if not, if there are other examples where QueryableState can be
executed in a standalone cluster, that would also be good help. Thanks.


On 10-Feb-2021, at 9:05 AM, Kezhu Wang  wrote:

(a) It is by design. For keyed state, you can only access the state for that
key, not for others. If you want one value per key, ValueState is a better fit
than MapState.
(b) state-processor-api aims to access/create/modify/upgrade offline
savepoint but not running state. Queryable state may meet your requirement,
but it is not actively developed for a while according to my observation
and still beta.

Queryable state:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/queryable_state.html


On February 9, 2021 at 22:09:29, Sandeep khanzode (sand...@shiftright.ai)
wrote:

Hello,

I am creating a class that extends RichCoFlatMapFunction. I need to
connect() two streams to basically share the state of one stream in
another.

This is what I do:

private transient MapState<KeyClass, ValueClass> state;

@Override
public void open(Configuration parameters) throws Exception {
    MapStateDescriptor<KeyClass, ValueClass> stateDescriptor =
            new MapStateDescriptor<>("abc-saved-state",
                    Types.POJO(KeyClass.class), Types.POJO(ValueClass.class));
    state = getRuntimeContext().getMapState(stateDescriptor);


This works correctly.


I have two questions:
(a) Whenever I debug, I can only see the current key in the MapState, not
all the possible keys that were created before and saved. Next time, I get
a hit for another key, I will only see the other key and not the rest of
previous keys. Is it by design or am I missing something?

(b) Can I somehow access this state beyond the class that holds the state?
I.e. can I access the state in some other class? If not, can I use the
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/libs/state_processor_api.html
to
do this? Is that the correct way to access the running state of one stream
elsewhere in the program without corrupting it?


Your response will be greatly appreciated. I will be happy to add more
details if required.

Thanks,
Sandeep Ramesh Khanzode


Re: Sharding of Operators

2021-02-22 Thread Chesnay Schepler

Let's clarify what NextOp is.
Is it operating on a keyed stream, or is something like a map function 
that has some internal state based on keys of the input elements (e.g., 
it has something like a Map that it queries/modifies for 
each input element)?


If NextOp operates on a keyed stream then its (keyed) state will be
redistributed.
If it is not a keyed state then, in your example, it will never receive 
another element with key A.


On 2/22/2021 12:07 PM, Tripathi,Vikash wrote:


Just needed more clarity in terms of a processing scenario.

Say, I was having records of key ‘A’ on a parallel instance ‘Op1’ of 
operator ‘Op’ and the next operator ‘NextOp’ in the sequence of 
transformation was getting records of key ‘A’ on it’s parallel 
instance ‘NextOp2’ at the time when the savepoint was made.


Now, the application has been rescaled to a parallelism level of say 4 
as against 2 which was the case at the time of savepoint.


Now, let’s say key ‘A’ records land up to ‘NextOp4’, parallel instance 
of ‘NextOp’ operator after re-scaling but the operations being 
performed in this ‘NextOp’ operator demands a windowing event based on 
event time processing that has still not been finished even after 
restarting the application from previous savepoint. Some records of 
the same key ‘A’ lie together to be processed in parallel instance 
‘NextOp2’ as was the case during savepoint and the new set of records 
for the same key now happen to be redirected together, for being 
processed on the parallel instance ‘NextOp4’ of the same operator. 
However, to generate a consistent result, the event time window needs 
to do calculations that take into account both the record sets for key 
‘A’ which are present on different instances of the same operator 
‘NextOp’.


How will flink runtime handle such a situation?

From: Chesnay Schepler 
Sent: Friday, February 19, 2021 12:52 AM
To: yidan zhao ; Tripathi,Vikash 


Cc: user 
Subject: Re: Sharding of Operators

When you change the parallelism then keys are re-distributed across 
operators instances.


However, this re-distribution is limited to the set maxParallelism
(set via the ExecutionConfig), which by default is 128 if no
operators exceeded the parallelism on the first submission.


This cannot be changed after the job was run without discarding state.

See 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/production_ready.html#set-an-explicit-max-parallelism 



On 2/18/2021 8:29 AM, yidan zhao wrote:

Actually, we only need to ensure all records belonging to the same
key will be forwarded to the same operator instance(i), and we do
not need to guarantee that 'i' is the same with the 'i' in
previous savepoints. When the job is restarted, the rule 'same
key's record will be in together' is guaranteed and more slots
will be surely useful, since each slot(operator instance) will be
responsible for less keys, leading to less records.

Tripathi,Vikash mailto:vikash.tripa...@cerner.com>> wrote on Thu, Feb 18,
2021 at 12:09 AM:

Hi there,

I wanted to know how re-partitioning of keys per operator
instance would happen when the current operator instances are
scaled up or down and we are restarting our job from a
previous savepoint which had a different number of parallel
instances of the same operator.

My main concern is whether the re-distribution would lead to
mapping of same keys to same operator instances as was done
earlier but if this happens then there would be no added
advantage of adding new task slots for the same operator
because they would remain less used or not used at all if all
possible key values have been seen earlier and if we go by the
other way around of evenly distributing out keys (based on the
hash function) to the new parallel slots as well, won't this
cause issues in terms of processing consistent results based
on the state of operator as was provided by previous savepoint
of application.

Is there a guarantee given by the hash function as in attached
snippet, that same keys which landed earlier on an operator
instance will land back again to the same operator instance
once the job is restarted with new set of parallelism
configuration?

Thanks,

Vikash


Re: How to distinguish the logs of individual jobs in Flink standalone mode?

2021-02-22 Thread Yang Wang
Flink's standalone application mode [1] can log each application separately.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/kubernetes/#deploy-application-cluster

Best,
Yang

xingoo <23603...@qq.com> wrote on Mon, Feb 22, 2021 at 12:01 PM:

> Hi,
>
> This is still not very user-friendly. It would be great if each job had its own separate log, the way Spark does.
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


RE: Sharding of Operators

2021-02-22 Thread Tripathi,Vikash
Just needed more clarity in terms of a processing scenario.

Say, I was having records of key ‘A’ on a parallel instance ‘Op1’ of operator 
‘Op’ and the next operator ‘NextOp’ in the sequence of transformation was 
getting records of key ‘A’ on it’s parallel instance ‘NextOp2’ at the time when 
the savepoint was made.

Now, the application has been rescaled to a parallelism level of say 4 as 
against 2 which was the case at the time of savepoint.

Now, let’s say key ‘A’ records land up to ‘NextOp4’, parallel instance of 
‘NextOp’ operator after re-scaling but the operations being performed in this 
‘NextOp’ operator demands a windowing event based on event time processing that 
has still not been finished even after restarting the application from previous 
savepoint. Some records of the same key ‘A’ lie together to be processed in 
parallel instance ‘NextOp2’ as was the case during savepoint and the new set of 
records for the same key now happen to be redirected together, for being 
processed on the parallel instance ‘NextOp4’ of the same operator. However, to 
generate a consistent result, the event time window needs to do calculations 
that take into account both the record sets for key ‘A’ which are present on 
different instances of the same operator ‘NextOp’.

How will flink runtime handle such a situation?

From: Chesnay Schepler 
Sent: Friday, February 19, 2021 12:52 AM
To: yidan zhao ; Tripathi,Vikash 

Cc: user 
Subject: Re: Sharding of Operators

When you change the parallelism then keys are re-distributed across operators 
instances.

However, this re-distribution is limited to the set maxParallelism (set via the 
ExecutionConfig), which by default is 128 if no operators exceeded the 
parallelism on the first submission.
This cannot be changed after the job was run without discarding state.

See 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/production_ready.html#set-an-explicit-max-parallelism

On 2/18/2021 8:29 AM, yidan zhao wrote:
Actually, we only need to ensure all records belonging to the same key will be 
forwarded to the same operator instance(i), and we do not need to guarantee 
that 'i' is the same with the 'i' in previous savepoints. When the job is 
restarted, the rule 'same key's record will be in together' is guaranteed and 
more slots will be surely useful, since each slot(operator instance) will be 
responsible for less keys, leading to less records.

Tripathi,Vikash mailto:vikash.tripa...@cerner.com>> 
wrote on Thu, Feb 18, 2021 at 12:09 AM:
Hi there,

I wanted to know how re-partitioning of keys per operator instance would happen 
when the current operator instances are scaled up or down and we are restarting 
our job from a previous savepoint which had a different number of parallel 
instances of the same operator.

My main concern is whether the re-distribution would lead to mapping of same 
keys to same operator instances as was done earlier but if this happens then 
there would be no added advantage of adding new task slots for the same 
operator because they would remain less used or not used at all if all possible 
key values have been seen earlier and if we go by the other way around of 
evenly distributing out keys (based on the hash function) to the new parallel 
slots as well, won't this cause issues in terms of processing consistent 
results based on the state of operator as was provided by previous savepoint of 
application.

Is there a guarantee given by the hash function as in attached snippet, that 
same keys which landed earlier on an operator instance will land back again to 
the same operator instance once the job is restarted with new set of 
parallelism configuration?

Thanks,
Vikash







Re: Joining and windowing multiple streams using DataStream API or Table API & SQL

2021-02-22 Thread Pieter Bonte
Hi Till,

Thanks for the feedback.

My use case is a little bit more tricky as I can’t key all the streams by the 
same field. 
Basically I’m trying to evaluate continuous SPARQL queries, which consist of many 
joins. I’ve seen that SPARQL queries over RDF data have been discussed before on 
the mailing list, however not for RDF streams that are only valid within a 
certain time window.
To give you an example of a simple query, which looks for 'Persons that are 
sitting next to Students':
Select * WHERE{
?x a Person.
?x nextTo ?y.
?y a Student.
}
So everything that matches ‘?x a Person’ could be my A Stream, ‘?x nextTo ?y’ a 
B Stream and ‘?y a Student’ a C Stream.
So for the first join, '?x a Person' and '?x nextTo ?y' need to be joined on 
variable ?x, i.e. the first field of the A and B streams, while '?x nextTo ?y' and 
'?y a Student' need to be joined on variable ?y, i.e. the second field of the B 
stream and the first field of the C stream.

As I can’t key the streams before windowing, I tried to combine the streams, 
window them and assign the window end time to each event. Then I separated the 
streams again and joined them using a CoProcessFunction. Based on the window end 
time, I know which events should be joined, as they are contained in the same 
window. I thought I could store the events of both streams together with the 
last-seen window end timestamp; whenever an event arrives with a larger window end 
time, I could join the previously seen events from both streams and then clear them.
However, I see that the events arrive out of order in my CoProcessFunction, which 
forces me to store the content of several windows at once. I was a little surprised 
by this behaviour, but perhaps it’s because I’m using a fixed dataset for easy 
testing? The idea then was to store the content of multiple windows and use the 
progression of the watermark to decide which windows can be cleared, so that I 
don’t need to keep the content of all previous windows. However, it does not seem 
to be possible to combine a MapState with a list: the map state would have the end 
time of each window as key and a list with the previously seen content of that 
window as value.
I’m guessing that there are more elegant and easier ways to solve this?
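
For concreteness, this is roughly the kind of operator I have been trying to write 
(a sketch only; the Event POJO and all names are made up, both inputs are assumed 
to be keyed by the shared join variable, and every event is assumed to carry the 
end timestamp of the window it was assigned to):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

// Placeholder POJO: the join key, the assigned window end and some payload.
class Event {
    public String key;
    public long windowEnd;
    public String payload;
}

public class PerWindowJoin extends KeyedCoProcessFunction<String, Event, Event, String> {

    // One buffer per window end and per input side.
    private transient MapState<Long, List<Event>> leftByWindow;
    private transient MapState<Long, List<Event>> rightByWindow;

    @Override
    public void open(Configuration parameters) {
        leftByWindow = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "left", Types.LONG, Types.LIST(TypeInformation.of(Event.class))));
        rightByWindow = getRuntimeContext().getMapState(new MapStateDescriptor<>(
                "right", Types.LONG, Types.LIST(TypeInformation.of(Event.class))));
    }

    @Override
    public void processElement1(Event e, Context ctx, Collector<String> out) throws Exception {
        buffer(leftByWindow, e);
        // Fires once the watermark passes the window end, i.e. the window is complete.
        ctx.timerService().registerEventTimeTimer(e.windowEnd);
    }

    @Override
    public void processElement2(Event e, Context ctx, Collector<String> out) throws Exception {
        buffer(rightByWindow, e);
        ctx.timerService().registerEventTimeTimer(e.windowEnd);
    }

    @Override
    public void onTimer(long windowEnd, OnTimerContext ctx, Collector<String> out) throws Exception {
        List<Event> left = leftByWindow.get(windowEnd);
        List<Event> right = rightByWindow.get(windowEnd);
        if (left != null && right != null) {
            for (Event l : left) {
                for (Event r : right) {
                    out.collect(l.payload + " |x| " + r.payload); // placeholder join result
                }
            }
        }
        // The watermark has passed this window, so its buffers can safely be dropped.
        leftByWindow.remove(windowEnd);
        rightByWindow.remove(windowEnd);
    }

    private static void buffer(MapState<Long, List<Event>> state, Event e) throws Exception {
        List<Event> events = state.get(e.windowEnd);
        if (events == null) {
            events = new ArrayList<>();
        }
        events.add(e);
        state.put(e.windowEnd, events);
    }
}

It would be applied after connecting the two keyed streams, e.g. 
left.keyBy(e -> e.key).connect(right.keyBy(e -> e.key)).process(new PerWindowJoin()), 
with the onTimer cleanup keeping only the windows the watermark has not yet passed.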

For the Table API, I was able to find a solution as well. I first combine the 
streams, window them and again assign the window end time as the timestamp of each 
event. I then split the streams and convert them to tables. Since the window end 
times are assigned, I can use them to window the data with interval conditions, 
e.g. ‘A.ts BETWEEN B.ts and B.ts’. This solution works, and it is easier to 
translate the SPARQL query to SQL. However, the program does not garbage-collect 
the outdated stream content, as a window in the DataStream API would, and I see 
that my Flink job's state keeps growing. Is there a translation of the Table API 
windows to DataStream windows?
Should I use the ‘setIdleStateRetentionTime’ configuration option to remove state?
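
If that is the right knob, I assume the call would look roughly like this (a 
sketch; tableEnv is the existing StreamTableEnvironment and the retention 
intervals are arbitrary):

import org.apache.flink.api.common.time.Time;

// State for keys that have been idle longer than the maximum retention time
// becomes eligible for cleanup, so the query state stops growing without bound.
tableEnv.getConfig().setIdleStateRetentionTime(Time.hours(1), Time.hours(2));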

Thanks in advance!

Kind regards,
Pieter

-
Dr. Ir. Pieter Bonte
Ghent University - imec 
IDLab 
iGent Tower - Department of Information Technology 
Technologiepark-Zwijnaarde 126, B-9052 Ghent, Belgium 
T: +32 9 33 14938; T Secr: +32 (0)9 33 14900 
F: +32 9 33 14899 
E: pieter.bo...@ugent.be 
W: IDLab.technology 
W: IDLab.ugent.be 

> On 17 Feb 2021, at 10:00, Till Rohrmann  wrote:
> 
> Hi Pieter,
> 
> from the top of my head, I think the easiest way to solve this problem is to 
> implement your own "window join" operation by first unioning all three 
> streams and then applying a ProcessWindowFunction similar to
> 
> allEvents.keyBy((KeySelector) value -> 
> value).window(SlidingEventTimeWindows.of(Time.seconds(10), 
> Time.seconds(5))).process(
>   new ProcessWindowFunction() {
>   @Override
>   public void process(
>   Tuple tuple,
>   Context context,
>   Iterable elements,
>   Collector out) throws Exception {
>   // compute join result from elements
>   }
>   });
> 
> @Timo is there an easier way using Flink's SQL or Table API?
> 
> Cheers,
> Till
> 
> On Tue, Feb 16, 2021 at 3:36 PM Pieter Bonte  wrote:
> Hi all,
> 
> I’m trying to apply a window operator over multiple streams (more than 2) and 
> join these streams within the validity of the window. However, I have some 
> questions about the time semantics using both the DataStream API and the 
> Table API/SQL.
> 
> Let’s say we have 3 streams: an A, a B and a C stream. And currently we have an 
> A@0 (an A at timestamp 0), a B@5 and two C’s: C@6 and C@13.
> We would like to join these streams when they fall within a sliding window of 
> size 10 and slide 5.
> Thus the first window W1[0,10) should contain A@0, B@5 and C@6.
> The second window W2[5,15) should contain B@5, C@6 and C@13.
> So only 

Performance tuning for Flink SQL writes to ClickHouse

2021-02-22 Thread kandy.wang
Writing to ClickHouse with flink-jdbc on Flink 1.12 only reaches around 350K TPS. Does anyone have suggestions for performance tuning?
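
(For context, a sketch of the kind of JDBC sink in question; the connection details, 
table and record type are made up. The batch size and flush interval are typically 
the parameters that dominate write throughput, since ClickHouse prefers a few large 
inserts over many small ones.)

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// Assuming: DataStream<Tuple2<Long, String>> stream = ...;  (record type is made up)
stream.addSink(JdbcSink.<Tuple2<Long, String>>sink(
        "INSERT INTO ch_table (id, name) VALUES (?, ?)",
        (ps, t) -> {
            ps.setLong(1, t.f0);
            ps.setString(2, t.f1);
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(50_000)      // rows buffered per batch
                .withBatchIntervalMs(2_000) // flush at least every 2 seconds
                .withMaxRetries(3)
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:clickhouse://localhost:8123/default")
                .withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
                .build()));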

Re: Question about writing to Hive with Flink SQL

2021-02-22 Thread eriendeng
It looks like you haven't set the dialect to Hive, so execution went into the else 
branch. The default dialect requires specifying a connector; see the Kafka-to-Hive 
example code in the documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#writing
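
In code, roughly (a sketch; the table names are placeholders and tableEnv is the 
existing table environment with the Hive catalog already registered):

import org.apache.flink.table.api.SqlDialect;

// Hive-specific syntax needs the Hive dialect; switch back to the default
// dialect afterwards for regular Flink SQL statements.
tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
tableEnv.executeSql("INSERT INTO hive_db.hive_table SELECT id, name FROM kafka_source");
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);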





Re: Compile time checking of SQL

2021-02-22 Thread Sebastián Magrí
Thanks a lot Timo!

On Mon, 22 Feb 2021 at 08:19, Timo Walther  wrote:

> Yes, this is possible. You can run `tableEnv.sqlQuery("...").explain()`
> or `tableEnv.toRetractStream(table)` which would trigger the complete
> translation of the SQL query without executing it.
>
> Regards,
> Timo
>
> On 20.02.21 18:46, Sebastián Magrí wrote:
> > I mean the SQL queries being validated when I do `mvn compile` or any
> > target that runs that so that basic syntax checking is performed without
> > having to submit the job to the cluster.
> >
> > On Thu, 18 Feb 2021 at 16:17, Timo Walther  wrote:
> >
> > Hi Sebastián,
> >
> > what do you consider as compile time? If you mean some kind of SQL
> > editor, you could take a look at Ververica platform (the community
> > edition is free):
> >
> >
> > https://www.ververica.com/blog/data-pipelines-with-flink-sql-on-ververica-platform
> >
> > Otherwise Flink SQL is always validated at (what we call) "pre-flight
> > phase". A cluster is not required but it is already JVM runtime of
> the
> > client.
> >
> > Regards,
> > Timo
> >
> > On 18.02.21 14:55, Sebastián Magrí wrote:
> >  > Is there any way to check SQL strings in compile time?
> >  >
> >  > --
> >  > Sebastián Ramírez Magrí
> >
> >
> >
> > --
> > Sebastián Ramírez Magrí
>
>

-- 
Sebastián Ramírez Magrí


Re: Compile time checking of SQL

2021-02-22 Thread Timo Walther
Yes, this is possible. You can run `tableEnv.sqlQuery("...").explain()` 
or `tableEnv.toRetractStream(table)` which would trigger the complete 
translation of the SQL query without executing it.
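
Wired into a small test it could look roughly like this (a sketch; the table, schema 
and query are placeholders, and the 'datagen' connector is only used to avoid any 
external dependency):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Planning the query parses, validates and optimizes it without a running
// cluster; invalid SQL makes this throw, which fails the build/test.
TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
tableEnv.executeSql(
        "CREATE TABLE src (id BIGINT, name STRING) WITH ('connector' = 'datagen')");
String plan = tableEnv.sqlQuery("SELECT id, UPPER(name) FROM src").explain();
System.out.println(plan);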


Regards,
Timo

On 20.02.21 18:46, Sebastián Magrí wrote:
I mean the SQL queries being validated when I do `mvn compile` or any 
target that runs that so that basic syntax checking is performed without 
having to submit the job to the cluster.


On Thu, 18 Feb 2021 at 16:17, Timo Walther  wrote:


Hi Sebastián,

what do you consider as compile time? If you mean some kind of SQL
editor, you could take a look at Ververica platform (the community
edition is free):


https://www.ververica.com/blog/data-pipelines-with-flink-sql-on-ververica-platform



Otherwise Flink SQL is always validated at (what we call) "pre-flight
phase". A cluster is not required but it is already JVM runtime of the
client.

Regards,
Timo

On 18.02.21 14:55, Sebastián Magrí wrote:
 > Is there any way to check SQL strings in compile time?
 >
 > --
 > Sebastián Ramírez Magrí



--
Sebastián Ramírez Magrí