Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

2020-03-18 Thread Rong Rong
Hi Becket/Till,

Thanks for the detailed explanation. Just to confirm:
the issue in FLINK-8093 refers to multiple Kafka consumers within the same
TM - thus the fix should be to make the consumer client.id unique for
different tasks?
And the issue here is internal to the Kafka consumer, where both
the polling consumer thread and the MBean JMX reporter thread share the
same client.id - thus we should fix this at the Kafka level?

If this is the correct understanding, I think we should separate them since
they are in fact 2 different issues.
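
To illustrate the first point, here is a minimal sketch of deriving a distinct client.id per consumer instance. The helper name `withUniqueClientId`, the role labels ("fetcher", "discoverer") and the fallback id "flink-consumer" are hypothetical and only for illustration; this is not the actual Flink fix. Only the `client.id` key itself is a real Kafka client setting.

```java
import java.util.Properties;

public class ClientIdSketch {

    /** Returns a copy of the given Kafka properties with a per-role, per-subtask client.id. */
    public static Properties withUniqueClientId(Properties base, String role, int subtaskIndex) {
        Properties copy = new Properties();
        copy.putAll(base);
        // "flink-consumer" is an arbitrary fallback chosen for this example.
        String baseId = base.getProperty("client.id", "flink-consumer");
        copy.setProperty("client.id", baseId + "-" + role + "-" + subtaskIndex);
        return copy;
    }

    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("client.id", "my-app");
        // The fetching consumer and the partition discoverer get different ids.
        System.out.println(withUniqueClientId(kafkaProps, "fetcher", 0).getProperty("client.id"));
        System.out.println(withUniqueClientId(kafkaProps, "discoverer", 0).getProperty("client.id"));
    }
}
```

With a base id of "my-app", the two consumers of subtask 0 would end up as "my-app-fetcher-0" and "my-app-discoverer-0", so they no longer map to the same app-info MBean name.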

--
Rong

On Tue, Mar 17, 2020 at 3:36 AM Becket Qin  wrote:

> Actually it might be better to create another ticket, FLINK-8093 was
> mainly complaining about the JMX bean collision when there are multiple
> tasks running in the same TM.
>
> Jiangjie (Becket) Qin
>
> On Tue, Mar 17, 2020 at 6:33 PM Becket Qin  wrote:
>
>> Hi Till,
>>
>> It looks like FLINK-8093 <https://issues.apache.org/jira/browse/FLINK-8093>
>> reports the same issue, although the reported information is not exactly
>> correct, as this should not cause the producer to fail. I'll take care of
>> the ticket.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Tue, Mar 17, 2020 at 6:00 PM Till Rohrmann 
>> wrote:
>>
>>> @Becket do we already have a JIRA ticket to track this effort?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Mar 16, 2020 at 4:07 AM Becket Qin  wrote:
>>>
>>>> Hi Sidney,
>>>>
>>>> The WARN logging you saw was from the AbstractPartitionDiscoverer which
>>>> is created by FlinkKafkaConsumer itself. It has an internal consumer which
>>>> shares the client.id of the actual consumer fetching data. This is a
>>>> bug that we should fix.
>>>>
>>>> As Rong said, this won't affect the normal operation of the consumer.
>>>> It is just an AppInfo MBean for reporting some information. There might be
>>>> some slight impact on the accuracy of the consumer metrics, but it should
>>>> be almost negligible because the partition discoverer is quite inactive
>>>> compared with the actual consumer.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Mon, Mar 16, 2020 at 12:44 AM Rong Rong  wrote:
>>>>
>>>>> We have also seen this issue before when running Flink apps in a shared
>>>>> cluster environment.
>>>>>
>>>>> Basically, Kafka is trying to register a JMX MBean[1] for application
>>>>> monitoring.
>>>>> This is only a WARN suggesting that you are registering more than one
>>>>> MBean with the same client id "consumer-1"; it should not affect your
>>>>> normal application behavior.
>>>>>
>>>>> This most likely occurs if you have more than one Kafka consumer
>>>>> within the same JVM. Are you using a session cluster[2]? Can you share
>>>>> more on your application configuration, including parallelism and slot
>>>>> configs?
>>>>> Also, based on the log, you are not configuring the "client.id"
>>>>> correctly. Which config key are you using? Could you also share your full
>>>>> Kafka properties map?
>>>>>
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>> [1] https://docs.oracle.com/javase/tutorial/jmx/mbeans/standard.html
>>>>> [2]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session
>>>>>
>>>>> On Sun, Mar 15, 2020 at 8:28 AM Sidney Feiner <
>>>>> sidney.fei...@startapp.com> wrote:
>>>>>
>>>>>> Hey,
>>>>>> I've been using Flink for a while now without any problems when
>>>>>> running apps with a FlinkKafkaConsumer.
>>>>>> All my apps have the same overall logic (consume from kafka ->
>>>>>> transform event -> write to file) and the only way they differ from each
>>>>>> other is the topic they read (remaining kafka config remains identical)
>>>>>> and the way they transform the event.
>>>>>> But suddenly, I've been starting to get the following error:
>>>>>>
>>>>>>
>>>>>> 2020-03-15 12:13:56,911 WARN

Re: KafkaConsumer keeps getting InstanceAlreadyExistsException

2020-03-15 Thread Rong Rong
We have also seen this issue before when running Flink apps in a shared
cluster environment.

Basically, Kafka is trying to register a JMX MBean[1] for application
monitoring.
This is only a WARN suggesting that you are registering more than one MBean
with the same client id "consumer-1"; it should not affect your normal
application behavior.

This most likely occurs if you have more than one Kafka consumer
within the same JVM. Are you using a session cluster[2]? Can you share more
on your application configuration, including parallelism and slot configs?
Also, based on the log, you are not configuring the "client.id" correctly.
Which config key are you using? Could you also share your full Kafka
properties map?
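
The collision itself can be reproduced with plain JMX, without Kafka. The sketch below registers a small standard MBean twice under the same ObjectName. The domain `demo.consumer` and the `AppInfo` class are made up for this example and only mimic the `kafka.consumer:type=app-info,id=...` naming seen in the log; the JMX calls are the standard `javax.management` API.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class MBeanCollisionDemo {

    // Standard MBean convention: the interface is named <ClassName>MBean.
    public interface AppInfoMBean {
        String getVersion();
    }

    public static class AppInfo implements AppInfoMBean {
        @Override
        public String getVersion() {
            return "demo";
        }
    }

    /** Registers an AppInfo bean for the client id; returns true if the name was already taken. */
    public static boolean register(String clientId) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        try {
            ObjectName name = new ObjectName("demo.consumer:type=app-info,id=" + clientId);
            server.registerMBean(new AppInfo(), name);
            return false;
        } catch (InstanceAlreadyExistsException e) {
            // Same situation as the WARN above: a second bean with an identical name.
            return true;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(register("consumer-1")); // first registration
        System.out.println(register("consumer-1")); // same id again: collides
        System.out.println(register("consumer-2")); // different id: no collision
    }
}
```

The first bean stays registered after a collision; only the second registration is rejected, which is consistent with the failure being limited to the app-info reporting.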


--
Rong

[1] https://docs.oracle.com/javase/tutorial/jmx/mbeans/standard.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/yarn_setup.html#flink-yarn-session

On Sun, Mar 15, 2020 at 8:28 AM Sidney Feiner 
wrote:

> Hey,
> I've been using Flink for a while now without any problems when running
> apps with a FlinkKafkaConsumer.
> All my apps have the same overall logic (consume from kafka -> transform
> event -> write to file) and the only way they differ from each other is the
> topic they read (remaining kafka config remains identical) and the way they
> transform the event.
> But suddenly, I've been starting to get the following error:
>
>
> 2020-03-15 12:13:56,911 WARN  org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
> javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1
>     at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>     at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>     at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>     at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:805)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:659)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:639)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:505)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:552)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:416)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
>
>
> I've tried setting the "client.id" on my consumer to a random UUID,
> making sure I don't have any duplicates but that didn't help either.
> Any idea what could be causing this?
>
> Thanks 
>
> *Sidney Feiner* */* Data Platform Developer
> M: +972.528197720 */* Skype: sidney.feiner.startapp
>
>
>


Flink Kafka consumer auto-commit timeout

2020-03-08 Thread Rong Rong
Hi All,

I would like to bring back this discussion, which I have seen multiple times
in previous ML threads [1], but there seems to be no solution if
checkpointing is disabled.

All of the exceptions reported in these ML threads share one common pattern:

> *INFO* org.apache.kafka.clients.consumer.internals.AbstractCoordinator  -
> Marking the coordinator kafka[xxx]:9092 (id: ??? rack: ???) dead for
> group consumergroup[xxx]
> *WARN* org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  -
> Auto-commit of offsets {topic[xxx]=OffsetAndMetadata{offset=???,metadata=???''}}
> failed for group consumergroup[xxx]: Offset commit failed with a retriable
> exception. You should retry committing offsets. The underlying error was:
> The request timed out.

In most cases, enabling *OffsetCommitMode.ON_CHECKPOINTS* fixes the
issue - the Flink Kafka consumer then explicitly commits offsets when
checkpointing, which goes down a completely different code path compared
with enabling the Kafka consumer option *enable.auto.commit* and letting the
Kafka consumer handle it.
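
As I understand the Flink Kafka connector documentation, which of the two paths is active depends on three switches: whether checkpointing is enabled, the consumer's commit-on-checkpoints flag, and Kafka's *enable.auto.commit* property. A minimal sketch of that decision follows. The class and method names are hypothetical and this does not claim to be Flink's internal code; the three mode names follow the connector's OffsetCommitMode enum.

```java
public class OffsetCommitModeSketch {

    public enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    /**
     * Resolves which component commits offsets back to Kafka.
     * With checkpointing enabled, enable.auto.commit is ignored.
     */
    public static Mode resolve(boolean enableAutoCommit,
                               boolean commitOnCheckpoints,
                               boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            // Flink commits offsets itself when a checkpoint completes.
            return commitOnCheckpoints ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        // Without checkpointing, only the Kafka client's periodic auto-commit is left.
        return enableAutoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        // The setup discussed in this thread: no checkpointing, enable.auto.commit=true.
        System.out.println(resolve(true, true, false));
        // The workaround: checkpointing on, offsets committed on checkpoints.
        System.out.println(resolve(true, true, true));
    }
}
```

Under this reading, the warnings above can only come from the KAFKA_PERIODIC path, since that is the only mode in which the Kafka client's own auto-commit is running.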

That brings me to these questions:
- Is this feature (disabling checkpointing and restarting the job from the
Kafka committed GROUP_OFFSET) not supported?
- How does Flink-Kafka actually handle auto-commit to the coordinator, given
the fact that Flink ignores the coordinator assignments and uses
self-assigned partitions instead?


A bit of our observation:
We conducted some experiments with the option *enable.auto.commit* set
to true, with Kafka 011 on both Flink 1.4 and 1.6, and observed that the
behavior is extremely weird after the above warnings are seen:
- the task manager metric
**.Source.KafkaConsumer.current-offsets.topic-[xxx]-partition-[yyy]* is
moving forward, tracking the latest Kafka broker offset - indicating that
the message consumption thread is executing without any issue.
- the task manager metric
**.Source.KafkaConsumer.committed-offsets.topic-[xxx]-partition-[yyy]* is
stuck indefinitely - indicating that it has stopped talking to the
coordinator.

We will try to experiment with this on Flink 1.10 later, but has anyone
experienced similar issues with later Flink releases as well?

Thanks,
Rong

--
[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-offset-auto-commit-stops-after-timeout-td18696.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-kafka-consumer-stopped-committing-offsets-td20624.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Odd-job-failure-td19845.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Error-during-Kafka-connection-td14822.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Logs-show-Marking-the-coordinator-2147483637-dead-in-Flink-Kafka-conn-td7396.html
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Checkpoint-takes-long-with-FlinkKafkaConsumer-td7561.html


Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer

2020-02-21 Thread Rong Rong
Congratulations Jingsong!!

Cheers,
Rong

On Fri, Feb 21, 2020 at 8:45 AM Bowen Li  wrote:

> Congrats, Jingsong!
>
> On Fri, Feb 21, 2020 at 7:28 AM Till Rohrmann 
> wrote:
>
>> Congratulations Jingsong!
>>
>> Cheers,
>> Till
>>
>> On Fri, Feb 21, 2020 at 4:03 PM Yun Gao  wrote:
>>
>>>   Congratulations Jingsong!
>>>
>>>Best,
>>>Yun
>>>
>>> --
>>> From:Jingsong Li 
>>> Send Time:2020 Feb. 21 (Fri.) 21:42
>>> To:Hequn Cheng 
>>> Cc:Yang Wang ; Zhijiang <
>>> wangzhijiang...@aliyun.com>; Zhenghua Gao ; godfrey
>>> he ; dev ; user <
>>> user@flink.apache.org>
>>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
>>>
>>> Thanks everyone~
>>>
>>> It's my pleasure to be part of the community. I hope I can make a better
>>> contribution in future.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Fri, Feb 21, 2020 at 2:48 PM Hequn Cheng  wrote:
>>> Congratulations Jingsong! Well deserved.
>>>
>>> Best,
>>> Hequn
>>>
>>> On Fri, Feb 21, 2020 at 2:42 PM Yang Wang  wrote:
>>> Congratulations!Jingsong. Well deserved.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> On Fri, Feb 21, 2020 at 1:18 PM Zhijiang wrote:
>>> Congrats Jingsong! Welcome on board!
>>>
>>> Best,
>>> Zhijiang
>>>
>>> --
>>> From:Zhenghua Gao 
>>> Send Time:2020 Feb. 21 (Fri.) 12:49
>>> To:godfrey he 
>>> Cc:dev ; user 
>>> Subject:Re: [ANNOUNCE] Jingsong Lee becomes a Flink committer
>>>
>>> Congrats Jingsong!
>>>
>>>
>>> *Best Regards,*
>>> *Zhenghua Gao*
>>>
>>>
>>> On Fri, Feb 21, 2020 at 11:59 AM godfrey he  wrote:
>>> Congrats Jingsong! Well deserved.
>>>
>>> Best,
>>> godfrey
>>>
>>> On Fri, Feb 21, 2020 at 11:49 AM Jeff Zhang wrote:
>>> Congratulations!Jingsong. You deserve it
>>>
>>> On Fri, Feb 21, 2020 at 11:43 AM wenlong.lwl wrote:
>>> Congrats Jingsong!
>>>
>>> On Fri, 21 Feb 2020 at 11:41, Dian Fu  wrote:
>>>
>>> > Congrats Jingsong!
>>> >
>>> > > On Feb 21, 2020, at 11:39 AM, Jark Wu wrote:
>>> > >
>>> > > Congratulations Jingsong! Well deserved.
>>> > >
>>> > > Best,
>>> > > Jark
>>> > >
>>> > > On Fri, 21 Feb 2020 at 11:32, zoudan  wrote:
>>> > >
>>> > >> Congratulations! Jingsong
>>> > >>
>>> > >>
>>> > >> Best,
>>> > >> Dan Zou
>>> > >>
>>> >
>>> >
>>>
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>>>
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>>
>>>


Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread Rong Rong
Congratulations, and a big thanks to the release managers for all the hard
work!!

--
Rong

On Wed, Feb 12, 2020 at 5:52 PM Yang Wang  wrote:

> Excellent work. Thanks Gary & Yu for being the release manager.
>
>
> Best,
> Yang
>
> On Thu, Feb 13, 2020 at 9:36 AM Jeff Zhang wrote:
>
>> Congratulations! Really appreciated your hard work.
>>
>> On Thu, Feb 13, 2020 at 9:29 AM Yangze Guo wrote:
>>
>>> Thanks, Gary & Yu. Congrats to everyone involved!
>>>
>>> Best,
>>> Yangze Guo
>>>
>>> On Thu, Feb 13, 2020 at 9:23 AM Jingsong Li 
>>> wrote:
>>> >
>>> > Congratulations! Great work.
>>> >
>>> > Best,
>>> > Jingsong Lee
>>> >
>>> > On Wed, Feb 12, 2020 at 11:05 PM Leonard Xu  wrote:
>>> >>
>>> >> Great news!
>>> >> Thanks everyone involved !
>>> >> Thanks Gary and Yu for being the release manager !
>>> >>
>>> >> Best,
>>> >> Leonard Xu
>>> >>
>>> >> On Feb 12, 2020, at 23:02, Stephan Ewen wrote:
>>> >>
>>> >> Congrats to us all.
>>> >>
>>> >> A big piece of work, nicely done.
>>> >>
>>> >> Let's hope that this helps our users make their existing use cases
>>> easier and also opens up new use cases.
>>> >>
>>> >> On Wed, Feb 12, 2020 at 3:31 PM 张光辉  wrote:
>>> >>>
>>> >>> Great work.
>>> >>>
>>> >>> On Wed, Feb 12, 2020 at 10:11 PM Congxian Qiu wrote:
>>> 
>>>  Great work.
>>>  Thanks everyone involved.
>>>  Thanks Gary and Yu for being the release manager
>>> 
>>> 
>>>  Best,
>>>  Congxian
>>> 
>>> 
>>>  On Wed, Feb 12, 2020 at 9:46 PM Jark Wu wrote:
>>> >
>>> > Congratulations to everyone involved!
>>> > Great thanks to Yu & Gary for being the release manager!
>>> >
>>> > Best,
>>> > Jark
>>> >
>>> > On Wed, 12 Feb 2020 at 21:42, Zhu Zhu  wrote:
>>> >>
>>> >> Cheers!
>>> >> Thanks Gary and Yu for the great job as release managers.
>>> >> And thanks to everyone whose contribution makes the release
>>> possible!
>>> >>
>>> >> Thanks,
>>> >> Zhu Zhu
>>> >>
>>> >> On Wed, Feb 12, 2020 at 9:36 PM Wyatt Chun wrote:
>>> >>>
>>> >>> Sounds great. Congrats & Thanks!
>>> >>>
>>> >>> On Wed, Feb 12, 2020 at 9:31 PM Yu Li  wrote:
>>> 
>>>  The Apache Flink community is very happy to announce the
>>> release of Apache Flink 1.10.0, which is the latest major release.
>>> 
>>>  Apache Flink® is an open-source stream processing framework for
>>> distributed, high-performing, always-available, and accurate data streaming
>>> applications.
>>> 
>>>  The release is available for download at:
>>>  https://flink.apache.org/downloads.html
>>> 
>>>  Please check out the release blog post for an overview of the
>>> improvements for this new major release:
>>>  https://flink.apache.org/news/2020/02/11/release-1.10.0.html
>>> 
>>>  The full release notes are available in Jira:
>>> 
>>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845
>>> 
>>>  We would like to thank all contributors of the Apache Flink
>>> community who made this release possible!
>>> 
>>>  Cheers,
>>>  Gary & Yu
>>> >>
>>> >>
>>> >
>>> >
>>> > --
>>> > Best, Jingsong Lee
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: Yarn Kerberos issue

2020-01-13 Thread Rong Rong
Hi Juan,

Sorry, I think I hit the send button too soon as well :-) There is a second
part of the analysis, which was already captured in FLINK-15561 but not sent:

> *It seems like the delegation token checker is imposed in
> YarnClusterDescriptor [1], but not in HadoopModule [2].*


In regard to your comment, you are probably right. I am no expert in how
DTs work; I was merely pointing out the difference, and it might not be
related. However, I think the security HadoopModule is used both on the
client side and on the job manager / task manager side (see [3,4]), so I
think they go down the same code path.

Regarding the DT renewal: you are right on both ends. There is less reason to
run a renewal thread on the client side, and I think in most cases renewal
doesn't matter, since in Spark's scenario a DT is valid for 7 days (maximum)
and renewal is only required every 24 hours.
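
For reference, a periodic renewal thread of the kind discussed here could look roughly like the sketch below. It uses only the JDK scheduler. The renewal action is a placeholder Runnable (the actual Hadoop token renewal call is deliberately left out), and the 0.75 safety ratio is an assumption made for this example, not something taken from Flink, Spark or this thread.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TokenRenewalSketch {

    /** Delay between renewals: a fraction of the renew interval, so renewal runs before expiry. */
    public static Duration renewalDelay(Duration renewInterval, double ratio) {
        if (ratio <= 0.0 || ratio >= 1.0) {
            throw new IllegalArgumentException("ratio must be in (0, 1)");
        }
        return Duration.ofMillis((long) (renewInterval.toMillis() * ratio));
    }

    /** How many full renew intervals fit into the maximum token lifetime. */
    public static long intervalsWithin(Duration maxLifetime, Duration renewInterval) {
        return maxLifetime.toMillis() / renewInterval.toMillis();
    }

    /** Starts a daemon thread that runs the (hypothetical) renewal action periodically. */
    public static ScheduledExecutorService start(Runnable renewAction,
                                                 Duration renewInterval,
                                                 double ratio) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "dt-renewer");
            t.setDaemon(true);
            return t;
        });
        // The delay must be at least 1 ms, otherwise scheduleAtFixedRate rejects the period.
        long delayMs = renewalDelay(renewInterval, ratio).toMillis();
        executor.scheduleAtFixedRate(renewAction, delayMs, delayMs, TimeUnit.MILLISECONDS);
        return executor;
    }

    public static void main(String[] args) {
        Duration renewInterval = Duration.ofHours(24);
        Duration maxLifetime = Duration.ofDays(7);
        System.out.println(renewalDelay(renewInterval, 0.75));
        System.out.println(intervalsWithin(maxLifetime, renewInterval));
    }
}
```

With the 24-hour interval and 7-day maximum mentioned above, renewal can only extend a token up to the 7-day limit; after that a new token has to be obtained, which a renewal thread alone cannot do.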

I will try to follow up on the ticket and see if I can find more DT related
issues.
If you have more comments or information, could you also please comment on
the JIRA ticket as well?


Thanks,
Rong

[1]
https://github.com/apache/flink/blob/master/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java#L484
[2]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L146
[3]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java#L166-L169
[4]
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java#L321-L325

On Mon, Jan 13, 2020 at 1:24 AM Juan Gentile  wrote:

> Thank you Rong
>
>
>
> We believe that the job (or scheduler) launching Flink should be the one
> responsible for renewing the DT. Here is some documentation that could be
> useful regarding Spark
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/security/README.md#dt-renewal-renewers-and-yarn
>
>
>
> If you think that you need more information about our issue, we can
> organize a call and discuss about it.
>
>
>
> Regards,
>
> Juan
>
>
>
> *From: *Rong Rong 
> *Date: *Sunday, January 12, 2020 at 6:13 PM
> *To: *Juan Gentile 
> *Cc: *Aljoscha Krettek , "user@flink.apache.org" <
> user@flink.apache.org>, Arnaud Dufranne ,
> Oleksandr Nitavskyi 
> *Subject: *Re: Yarn Kerberos issue
>
>
>
> Hi Juan,
>
>
>
> I had some time to dig deeper into the code. It seems like
> the HADOOP_TOKEN_FILE_LOCATION is actually a static environment variable
> field that the UserGroupInformation will read.
>
> Interestingly, Flink's Hadoop security module actually treats it
> differently depending on whether this is set, see [1]. This is the major
> difference from how UserGroupInformation handles credentials [2].
>
>
>
> We may just need to spawn a thread to periodically renew the delegation
> token. I already filed a JIRA ticket for tracking this [3]
>
>
>
> --
>
> Thanks,
>
> Rong
>
>
>
>
>
> [1]
> https://github.com/apache/flink/blob/release-1.9/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L84
>
> [2]
> https://github.com/hanborq/hadoop/blob/master/src/core/org/apache/hadoop/security/UserGroupInformation.java#L538
>
> [3] https://issues.apache.org/jira/browse/FLINK-15561
>
>
>
> On Fri, Jan 10, 2020 at 8:19 AM Juan Gentile  wrote:
>
> The error we get is the following:
>
> org.apache.f

Re: Yarn Kerberos issue

2020-01-12 Thread Rong Rong
> >
> >
> >  Best,
> >
> >  Aljoscha
> >
> >
> >
> >  [1]
> >
> >
> https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68
> >
> >  [2]
> >
> >
> https://github.com/apache/flink/blob/2bbf4e5002d7657018ba0b53b7a3ed8ee3124da8/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java#L89
> >
> >
> >
> >  On 10.01.20 15:02, Aljoscha Krettek wrote:
> >
> >  > Hi Juan,
> >
> >  >
> >
> >  > to summarize and clarify various emails: currently, you can only use
> >  > Kerberos authentication via tickets (i.e. kinit) or keytab. The relevant
> >  > bit of code is in the Hadoop security module: [1]. Here you can see that
> >  > we either use keytab.
> >  >
> >  > I think we should be able to extend this to also work with delegation
> >  > tokens (DTs). In Spark, how do you pass the DTs to the system as a user?
> >
> >  >
> >
> >  > Best,
> >
> >  > Aljoscha
> >
> >  >
> >
> >  > [1]
> >
> >  >
> https://github.com/apache/flink/blob/64e2f27640946bf3b1608d4d85585fe18891dcee/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68
> >
> >  >
> >
> >  >
> >
> >  > On 06.01.20 09:52, Yang Wang wrote:
> >
> >  >> I guess you have set some Kerberos-related configuration in your Spark
> >  >> jobs. For Flink, you need to do this too, using the following configs.
> >  >> The keytab file should also exist on the Flink client. In your
> >  >> environment, that means the scheduler (oozie) must be able to access
> >  >> the keytab file.
> >
> >  >>
> >
> >  >> security.kerberos.login.keytab
> >
> >  >> security.kerberos.login.principal
> >
> >  >> security.kerberos.login.contexts
> >
> >  >>
> >
> >  >>
> >
> >  >>
> >
> >  >> Best,
> >
> >  >> Yang
> >
> >  >>
> >
> >  >> On Mon, Jan 6, 2020 at 3:55 PM Juan Gentile wrote:
> >
> >  >>
> >
> >  >>> Hello Rong, Chesnay,
> >  >>>
> >  >>> Thank you for your answer, the way we are trying to launch the job is
> >  >>> through a scheduler (similar to oozie) where we have a keytab for the
> >  >>> scheduler user and with that keytab we get delegation tokens
> >  >>> impersonating the right user (owner of the job). But the only way I was
> >  >>> able to make this work is by getting a ticket (through kinit).

Re: Completed job wasn't saved to archive

2020-01-09 Thread Rong Rong
Hi Pavel,

Sorry for bringing this thread up so late. I was digging into the usage of
the Flink history server and I found one situation where there would be no
logs and no failure/success message from the cluster:
In a very rare case in our Flink-YARN session cluster, if an application
master (the container running the JobManager) fails and is restarted as a
YARN 2nd attempt (we haven't enabled HA), then no archiving logs are
written whatsoever. However, in this case a completely new AM container
would be brought up running the JM again (e.g. with new log files).

I am not exactly sure whether this suits your scenario. Could you
describe a bit more how your cluster was configured?

Thanks,
Rong

On Mon, Nov 25, 2019 at 10:49 AM Chesnay Schepler 
wrote:

> I'm afraid I can't think of a solution. I don't see a way how this
> operation can succeed or fail without anything being logged.
>
> Is the cluster behaving normally afterwards? Could you check whether the
> numRunningJobs ticks down properly after the job was canceled?
>
>
> On 22/11/2019 13:27, Pavel Potseluev wrote:
>
> Hi Chesnay,
>
> We archive jobs on an S3 file system. We don't configure throttling for
> write operations; afaik it isn't possible now and will be implemented in
> FLINK-13251. Other write operations (like saving checkpoints) work fine.
> But I don't see the archived job or any message about an archiving failure
> at all. It looks like Flink just didn't try to save the job to the archive.
>
> 21.11.2019, 17:17, "Chesnay Schepler":
>
> If the archiving fails there should be some log message, like "Failed to
> archive job" or "Could not archive completed job..." .
> If nothing of the sort is logged my first instinct would be that the
> operation is being slowed down, _a lot_.
>
> Where are you archiving them to? Could it be the write operation is being
> throttled heavily?
>
> On 21/11/2019 13:48, Pavel Potseluev wrote:
>
> Hi Vino,
>
> Usually Flink archives jobs correctly and the problem is rarely
> reproduced. So I think it isn't a problem with configuration.
>
> Job Manager log when job 5ec264a20bb5005cdbd8e23a5e59f136 was canceled:
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:52:13.294 [Checkpoint
> Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  -
> Triggering checkpoint 1872 @ 1574092333218 for job
> 5ec264a20bb5005cdbd8e23a5e59f136.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:52:37.260
> [flink-akka.actor.default-dispatcher-30] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator  - Completed
> checkpoint 1872 for job 5ec264a20bb5005cdbd8e23a5e59f136 (568048140 bytes
> in 23541 ms).
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:13.314 [Checkpoint
> Timer] INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator  -
> Triggering checkpoint 1873 @ 1574092393218 for job
> 5ec264a20bb5005cdbd8e23a5e59f136.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.279
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job
> bureau-user-offers-statistics-AUTORU-USERS_AUTORU
> (5ec264a20bb5005cdbd8e23a5e59f136) switched from state RUNNING to
> CANCELLING.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.279
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom
> File Source (1/1) (934d89cf3d7999b40225dd8009b5493c) switched from RUNNING
> to CANCELING.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source:
> kafka-source-moderation-update-journal-autoru -> Filter -> Flat Map (1/2)
> (47656a3c4fc70e19622acca31267e41f) switched from RUNNING to CANCELING.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source:
> kafka-source-moderation-update-journal-autoru -> Filter -> Flat Map (2/2)
> (be3c4562e65d3d6bdfda4f1632017c6c) switched from RUNNING to CANCELING.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  -
> user-offers-statistics-init-from-file -> Map (1/2)
> (4a45ed43b05e4d444e190a44b33514ac) switched from RUNNING to CANCELING.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  -
> user-offers-statistics-init-from-file -> Map (2/2)
> (bb3be311c5e53abaedb06b4d0148c23f) switched from RUNNING to CANCELING.
>
> 771a4992-d694-d2a4-b49a-d4eb382086e5 2019-11-18 18:53:19.280
> [flink-akka.actor.default-dispatcher-40] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph  - 

Re: Yarn Kerberos issue

2020-01-04 Thread Rong Rong
Hi Juan,

Chesnay was right. If you are using the CLI to launch your session cluster
based on the document [1], following the instructions to use kinit [2]
first seems to be one of the right ways to go.
Another way of approaching it is to set up the Kerberos settings in the
flink-conf.yaml file [3]. FlinkYarnSessionCli will be able to pick up your
keytab files and run the CLI securely.

As far as I know, the option `security.kerberos.login.use-ticket-cache`
doesn't actually change the behavior of the authentication process; it is
more of a hint whether to use the ticket cache instantiated by `kinit`. If
you disable using the ticket cache, you will have to use the
"keytab/principal" approach - this doc [4] might help explain it
better.

Thanks,
Rong


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_setup.html#start-flink-session
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#using-kinit-yarn-only
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-kerberos.html#yarnmesos-mode
[4]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#enabling-kerberos-authentication-for-versions-09-and-above-only

On Fri, Jan 3, 2020 at 7:20 AM Chesnay Schepler  wrote:

> From what I understand from the documentation, if you want to use
> delegation tokens you always first have to issue a ticket using kinit; so
> you did everything correctly?
>
> On 02/01/2020 13:00, Juan Gentile wrote:
>
> Hello,
>
>
>
> I'm trying to submit a job (batch wordcount) to a Yarn cluster. I'm trying
> to use delegation tokens and I'm getting the following error:
>
> org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
>     at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:423)
>     at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:262)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): Delegation Token can be issued only with kerberos or web authentication
>     at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getDelegationToken(FSNamesystem.java:7560)
>     at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getDelegationToken(NameNodeRpcServer.java:548)
>     at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getDelegationToken(AuthorizationProviderProxyClientProtocol.java:663)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getDelegationToken(ClientNamenodeProtocolServerSideTranslatorPB.java:981)
>     at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
>     at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1073)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2221)
>     at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2217)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1920)
>     at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2215)
>
>     at org.apache.hadoop.ipc.Client.call(Client.java:1472)
>     at org.apache.hadoop.ipc.Client.call(Client.java:1409)
>     at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230)
>     at com.sun.proxy.$Proxy18.getDelegationToken(Unknown Source)
>     at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getDelegationToken(ClientNamenodeProtocolTranslatorPB.java:928)
>
> *   at 

Re: Flink ML feature

2019-12-12 Thread Rong Rong
Hi guys,

Yes, as Till mentioned, the community is working on a new ML library and we
are working closely with the Alink project to bring the algorithms.

You can find more information regarding the new ML design architecture in
FLIP-39 [1].
One of the major changes is that the new ML library [2] will be based on the
Table API [3], instead of depending on the DataSet/DataStream API.

I've cc-ed @Xu Yang, who has been a major contributor to the Alink project,
to provide more information.

--
Rong

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs
[2] https://github.com/apache/flink/tree/master/flink-ml-parent
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tableApi.html

On Wed, Dec 11, 2019 at 1:11 AM Till Rohrmann  wrote:

> Hi guys,
>
> it is true that we dropped Flink-ML with 1.9. The reason is that the
> community started working on a new ML library which you can find under
> flink-ml-parent [1]. This module contains the framework for building ML
> pipelines but not yet too many algorithms iirc. The plan is to extend this
> library with algorithms from Alink in the near future to grow Flink's
> machine learning library.
>
> [1] https://github.com/apache/flink/tree/master/flink-ml-parent
>
> Cheers,
> Till
>
> On Wed, Dec 11, 2019 at 3:42 AM vino yang  wrote:
>
>> Hi Benoit,
>>
>> I can only try to ping @Till Rohrmann and @Kurt Young, who may know more
>> information to answer this question.
>>
>> Best,
>> Vino
>>
>> On Tue, Dec 10, 2019 at 7:06 PM Benoît Paris wrote:
>>
>>> Is there any information as to whether Alink is going to be contributed
>>> to Apache Flink as the official ML Lib?
>>>
>>>
>>> On Tue, Dec 10, 2019 at 7:11 AM vino yang  wrote:
>>>
>>>> Hi Chandu,
>>>>
>>>> AFAIK, there is a project named Alink[1] which is the Machine Learning
>>>> algorithm platform based on Flink, developed by the PAI team of Alibaba
>>>> computing platform. FYI
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> [1]: https://github.com/alibaba/Alink
>>>>
>>>> On Tue, Dec 10, 2019 at 2:07 PM Tom Blackwood wrote:
>>>>
>>>>> You may try Spark ML, which is a production ready library for ML stuff.
>>>>>
>>>>> regards.
>>>>>
>>>>> On Tue, Dec 10, 2019 at 1:04 PM chandu soa
>>>>> wrote:
>>>>>
>>>>>> Hello Community,
>>>>>>
>>>>>> Can you please give me some pointers for implementing Machine
>>>>>> Learning using Flink.
>>>>>>
>>>>>> I see Flink ML libraries were dropped in v1.9. It looks like the ML
>>>>>> feature in Flink is going to be enhanced.
>>>>>>
>>>>>> What is the recommended approach for implementing production grade ML
>>>>>> based apps using Flink? Is v1.9 ok, or should I wait for 1.10?
>>>>>>
>>>>>> Thanks,
>>>>>> Chandu
>>>>>>
>>>>>
>>>
>>> --
>>> Benoît Paris
>>> Ingénieur Machine Learning Explicable
>>> Tél : +33 6 60 74 23 00
>>> http://benoit.paris
>>> http://explicable.ml
>>>
>>


Re: Apache Flink - Throttling stream flow

2019-11-27 Thread Rong Rong
Hi Mans,

Is this what you are looking for [1][2]?

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-11501
[2] https://github.com/apache/flink/pull/7679
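
For reference, the user-defined throttling function that Caizhi describes in
the quoted message below could look roughly like this. It is a plain-Java
sketch without any Flink dependency; the `Throttler` class and its methods are
hypothetical names for illustration, and it does not address feedback to the
Kinesis source.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Plain-Java sketch of a pass-through throttle; hypothetical helper, not a Flink API. */
class Throttler {
    private final long minGapNanos;   // minimum spacing between two emitted records
    private final LongSupplier clock; // injectable clock, e.g. System::nanoTime
    private long nextAllowedNanos;    // earliest time the next record may be emitted

    Throttler(long recordsPerSecond, LongSupplier clock) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("recordsPerSecond must be positive");
        }
        this.minGapNanos = 1_000_000_000L / recordsPerSecond;
        this.clock = clock;
        this.nextAllowedNanos = clock.getAsLong();
    }

    /** Reserves the next emission slot and returns how many nanoseconds to wait for it. */
    long reserve() {
        long now = clock.getAsLong();
        long emitAt = Math.max(now, nextAllowedNanos);
        nextAllowedNanos = emitAt + minGapNanos;
        return emitAt - now;
    }

    /** Gives out what it reads in, after sleeping long enough to respect the rate. */
    <T> T throttle(T record) throws InterruptedException {
        long waitNanos = reserve();
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
        return record;
    }
}
```

In a job, `throttle` would be called from a map function placed in front of the
operator that calls the web service, with `System::nanoTime` as the clock. Note
that such a limit applies per parallel instance, so the overall rate is roughly
the per-instance rate multiplied by the parallelism.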

On Mon, Nov 25, 2019 at 3:29 AM M Singh  wrote:

> Thanks Caizhi & Thomas for your responses.
>
> I read the throttling example but want to see if that works with a
> distributed broker like Kinesis and how to have throttling feedback to the
> Kinesis source so that it can vary the rate without interfering with
> watermarks, etc.
>
> Thanks again
>
> Mans
>
>
> On Monday, November 25, 2019, 05:55:21 AM EST, Thomas Julian <
> thomasjul...@zoho.com> wrote:
>
>
> related
>
> https://issues.apache.org/jira/browse/FLINK-13792
>
> Regards,
> Julian.
>
>
> On Mon, 25 Nov 2019 15:25:14 +0530 *Caizhi Weng* wrote:
>
> Hi,
>
> As far as I know, Flink currently doesn't have a built-in throttling
> function. You can write your own user-defined function to achieve this.
> Your function simply emits what it reads in, while limiting the rate at
> which it emits records.
>
> If you're not familiar with user-defined functions, see
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html
>
> Here is a throttling iterator example:
> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/utils/ThrottledIterator.java
>
> On Mon, Nov 25, 2019 at 5:50 AM M Singh wrote:
>
> Hi:
>
> I have a Flink streaming application that invokes some other web
> services.  However the web services have limited throughput.  So I wanted to
> find out if there are any recommendations on how to throttle the Flink
> datastream so that it doesn't overload the downstream services.  I am
> using Kinesis as source and sink in my application.
>
> Please let me know if there are any hooks available in Flink, what are the
> patterns that can be used and what are the best practices/pitfalls for
> using them.
>
> Thanks
>
> Mans
>
>
>
>


Re: [DISCUSS] Support configure remote flink jar

2019-11-23 Thread Rong Rong
Thanks @Tison for starting the discussion and sorry for joining so late.

Yes, I think this is a very good idea. We already tweaked the flink-yarn
package internally to support something similar to what @Thomas mentioned:
registering a Jar that has already been uploaded to some DFS
(not necessarily the Yarn public cache discussed in FLINK-13938).
The reason is that we provide our internally packaged extension libraries
to our customers, and we've seen a good performance improvement in our YARN
cluster during the container localization phase after our customers switched
to using pre-uploaded JARs instead of having to upload them every time during
deployment.
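
To make the idea concrete, the decision between "upload first" and "already on
the DFS" can be sketched in plain Java as below. This is not our internal patch
and not the flink-yarn code; `JarLocation`, `isRemote` and `plan` are
hypothetical names, and treating every non-`file` scheme as remote is a
simplifying assumption.

```java
import java.net.URI;

/** Hypothetical helper; the class and method names are made up for this sketch. */
class JarLocation {
    /** True if the path has a non-file scheme (e.g. hdfs://), i.e. it already sits on a remote file system. */
    static boolean isRemote(String path) {
        String scheme = URI.create(path).getScheme();
        return scheme != null && !"file".equals(scheme);
    }

    /** Describes what a deployment step would do with the jar. */
    static String plan(String path) {
        return isRemote(path)
                ? "register for localization, no upload"
                : "upload, then register for localization";
    }
}
```

A pre-uploaded `hdfs://` jar takes the first branch, which is where the saving
during deployment comes from.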

Looking forward to this feature!

--
Rong


On Tue, Nov 19, 2019 at 10:19 PM tison  wrote:

> Thanks for your participation!
>
> @Yang: Great to hear. I'd like to know whether or not a remote flink jar
> path conflicts with FLINK-13938. IIRC FLINK-13938 automatically excludes the
> local flink jar from shipping, which possibly does not work for a remote one.
>
> @Thomas: It is quite inspiring that a URL becomes the unified representation
> of a resource. I'm thinking of how to provide a single process for getting a
> resource from a URL which points to an artifact or distributed file system.
>
> @ouywl & Stephan: Yes, this improvement can be migrated to environments like
> k8s. IIRC the k8s proposal already discussed improvements using an "init
> container" and other technologies. However, so far I regard it as an
> improvement that differs from one storage to another, so we can achieve them
> individually.
>
>
> Best,
> tison.
>
>
> On Wed, Nov 20, 2019 at 12:34 AM Stephan Ewen wrote:
>
>> Would that be a feature specific to Yarn? (and maybe standalone sessions)
>>
>> For containerized setups, an init container seems like a nice way to
>> solve this. It is also more flexible when it comes to supporting
>> authentication mechanisms for the target storage system, etc.
>>
>> On Tue, Nov 19, 2019 at 5:29 PM ouywl  wrote:
>>
>>> I have implemented this feature in our env, using a Docker ‘Init Container’
>>> to fetch the jar file from a URL. It seems to be a good idea.
>>>
>>> ouywl
>>> ou...@139.com
>>>
>>>
>>> On 11/19/2019 12:11,Thomas Weise 
>>> wrote:
>>>
>>> There is a related use case (not specific to HDFS) that I came across:
>>>
>>> It would be nice if the jar upload endpoint could accept the URL of a
>>> jar file as an alternative to the jar file itself. Such a URL could point
>>> to an artifactory or distributed file system.
>>>
>>> Thomas
>>>
>>>
>>> On Mon, Nov 18, 2019 at 7:40 PM Yang Wang  wrote:
>>>
>>>> Hi tison,
>>>>
>>>> Thanks for starting this discussion.
>>>> * For a user-customized flink-dist jar, it is a useful feature, since it
>>>> could avoid uploading the flink-dist jar every time. Especially in a
>>>> production environment, it could accelerate the submission process.
>>>> * For the standard flink-dist jar, FLINK-13938[1] could solve the
>>>> problem. Upload an official Flink release binary to distributed storage
>>>> (HDFS) first, and then all submissions could benefit from it. Users could
>>>> also upload the customized flink-dist jar to accelerate their submission.
>>>>
>>>> If the flink-dist jar could be specified as a remote path, maybe the user
>>>> jar is in the same situation.
>>>>
>>>> [1]. https://issues.apache.org/jira/browse/FLINK-13938
>>>>
>>>> On Tue, Nov 19, 2019 at 11:17 AM tison wrote:
>>>>
>>>>> Hi folks,
>>>>>
>>>>> Recently, our customers asked for a feature to configure a remote flink
>>>>> jar. I'd like to reach out to you guys to see whether or not it is a
>>>>> general need.
>>>>>
>>>>> ATM Flink only supports configuring a local file as the flink jar via the
>>>>> `-yj` option. If we pass an HDFS file path, due to an implementation
>>>>> detail it will fail with IllegalArgumentException. Once we support
>>>>> configuring a remote flink jar, this limitation is eliminated. We also
>>>>> make use of YARN locality to reduce the uploading overhead, instead
>>>>> asking YARN to localize the jar when the AM container starts.
>>>>>
>>>>> Besides, it possibly overlaps with FLINK-13938. I'd like to put the
>>>>> discussion on our mailing list first.
>>>>>
>>>>> Are you looking forward to such a feature?
>>>>>
>>>>> @Yang Wang: this feature is different from the one we discussed offline;
>>>>> it only focuses on the flink jar, not all ship files.
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>
>>>


Re: Limit max cpu usage per TaskManager

2019-11-09 Thread Rong Rong
Hi Lu,

Yang is right. Enabling cgroup isolation is probably what you are
looking for to control how Flink utilizes the CPU resources.
One more idea is to enable DominantResourceCalculator[1], which I think
you've probably done already.

I found an interesting read[2] about this if you would like to dig deeper.

Thanks,
Rong

[1]
https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-common/apidocs/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.html
[2] https://developer.ibm.com/hadoop/2017/06/30/deep-dive-yarn-cgroups/
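
To illustrate the difference Yang describes between the default cgroup shares
and strict resource usage, here is a deliberately simplified plain-Java model.
It is not how YARN or the kernel compute anything internally: the class, the
method names, the "one vcore equals one core" mapping and the all-or-nothing
contention flag are assumptions made for this sketch.

```java
/** Simplified model of the two cgroup CPU modes discussed in this thread; numbers are illustrative. */
class CpuLimitModel {
    /**
     * Share mode: on an idle node a container may use spare CPU; when every busy
     * container wants more CPU, each gets a slice proportional to its vcores.
     */
    static double coresUnderShares(double demandCores, int containerVcores,
                                   int busyVcoresTotal, double nodeCores, boolean nodeContended) {
        if (!nodeContended) {
            return Math.min(demandCores, nodeCores);
        }
        double fairSlice = nodeCores * containerVcores / busyVcoresTotal;
        return Math.min(demandCores, fairSlice);
    }

    /** Strict mode: usage is capped at the allocation even when the node is idle. */
    static double coresUnderStrictQuota(double demandCores, int containerVcores) {
        return Math.min(demandCores, containerVcores);
    }
}
```

In this model a TaskManager with 1 vcore that wants 6 cores gets all 6 on an
idle 8-core node in share mode, which matches the 600% you observed, and is
held to 1 core in strict mode.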

--
Rong

On Fri, Nov 8, 2019 at 3:51 AM Yang Wang  wrote:

> Hi Lu Niu,
>
> Yes, you could use `yarn.containers.vcores` to set the vcores of the
> taskmanager. However, it cannot guarantee that the applications do not
> affect each other. By default, the yarn cluster uses cgroup shares. That
> means a taskmanager could use more cpu than it was allocated. When the
> machine is under heavy load, the linux kernel will use the cpu shares as
> weights to arbitrate between the different processes.
>
> If you want to limit the taskmanager to only what it was allocated,
> `yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage=true`
> is the only way. The Yarn nodemanager will then set a cpu quota for each
> taskmanager.
>
>
>
>
> Best,
> Yang
>
> On Thu, Nov 7, 2019 at 1:15 AM Lu Niu wrote:
>
>> Hi,
>>
>> Thanks for replying! Basically I want to limit cpu usage so that
>> different applications don't affect each other. What's the current best
>> practice? It looks like
>> `yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage=true`
>> is one way. How do I set how many cpu resources to use? Is it
>> "yarn.containers.vcores"?
>>
>> It should be -ys, not -yn, in the original post; sorry for the typo.
>>
>> Best
>> Lu
>>
>> On Wed, Nov 6, 2019 at 1:41 AM Yang Wang  wrote:
>>
>>> If you want to limit the TaskManager container cpu usage, it is based on
>>> your yarn cluster configuration.
>>> By default, yarn only uses cpu share. You need to set
>>> `yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage=true`
>>> in yarn-site.xml of all yarn node managers.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> On Wed, Nov 6, 2019 at 5:02 PM Victor Wong wrote:
>>>
>>>> Hi Lu,
>>>>
>>>> You can check out which operator thread causes the high CPU usage, and
>>>> set a unique slot sharing group name [1] to it to prevent too many operator
>>>> threads running in the same TM.
>>>>
>>>> Hope this will be helpful
>>>>
>>>> [1].
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
>>>>
>>>> Best,
>>>> Victor
>>>>
>>>> *From: *Vino Yang
>>>> *Date: *Wednesday, 6 November 2019 at 4:26 PM
>>>> *To: *Lu Niu
>>>> *Cc: *user
>>>> *Subject: *Re: Limit max cpu usage per TaskManager
>>>>
>>>> Hi Lu,
>>>>
>>>> When using Flink on YARN, it will rely on YARN's resource management
>>>> capabilities, and Flink cannot currently limit CPU usage.
>>>>
>>>> Also, what version of Flink do you use? As far as I know, since Flink
>>>> 1.8, the -yn parameter will not work.
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> On Wed, Nov 6, 2019 at 1:29 PM Lu Niu wrote:
>>>>
>>>> Hi,
>>>>
>>>> When running a Flink application in yarn mode, is there a way to limit
>>>> the maximum cpu usage per TaskManager?
>>>>
>>>> I tried this application with just a source and a sink operator. The
>>>> parallelism of the source is 60 and the parallelism of the sink is 1. When
>>>> running with the default config, there are 60 TaskManagers assigned. I
>>>> notice one TaskManager process's cpu usage could be 200% while the rest
>>>> are below 50%.
>>>>
>>>> When I set -yn = 2 (default is 1), I notice the # of TaskManagers dropped
>>>> down to 30, and one TaskManager process's cpu usage could be 600% while
>>>> the rest are below 50%.
>>>>
>>>> I tried to set yarn.containers.vcores = 2; all tasks stay in the start
>>>> state forever and the application is not able to turn to the running state.
>>>>
>>>> Best
>>>>
>>>> Lu



Re: flink on yarn-cluster kerberos authentication for hbase

2019-11-08 Thread Rong Rong
Hi

Can you share more information regarding how you currently set up your
Kerberos so that it only works with ZooKeeper?
Does your HBase share the same KDC?

--
Rong

On Fri, Nov 8, 2019 at 12:33 AM venn  wrote:

> Thanks, I have already seen it; it does not work for me.
>
>
>
> *From:* Jaqie Chan
> *Sent:* Friday, November 8, 2019 4:14 PM
> *To:* venn
> *Cc:* user@flink.apache.org
> *Subject:* Re: flink on yarn-cluster kerberos authentication for hbase
>
>
>
> Hello,
>
>
>
> Does this help you?
>
>
> https://stackoverflow.com/questions/34596165/how-to-do-kerberos-authentication-on-a-flink-standalone-installation
>
>
>
> Regards
>
> 嘉琪
>
>
>
> On Fri, Nov 8, 2019 at 4:00 PM venn  wrote:
>
> Hi guys,
>
> Can anyone share an example of Kerberos authentication for HBase with
> Flink on yarn-cluster?
>
> I authenticate the same way I do in a plain Java program, but it looks like
> only ZooKeeper gets authenticated; I cannot access HBase.
>
> Thanks
>
>


Re: [DISCUSS] Semantic and implementation of per-job mode

2019-10-31 Thread Rong Rong
Hi All,

Thanks @Tison for starting the discussion. I think we have a very similar
scenario to Theo's use case.
In our case we also generate the job graph using a client service (which
serves job graph generation for multiple users' code) and we've
found managing the upload/download between the cluster and the DFS to
be tricky and error-prone. In addition, managing the different
environments and requirements of different users in a single service poses
even more trouble for us.

However, shifting the job graph generation towards the cluster side also
requires some thought regarding how to manage the driver-job as well as
some dependency conflicts: when shipping the job graph generation to the
cluster, some dependencies that are unnecessary for the runtime
will be pulled in by the driver-job (correct me if I am wrong, Theo).

I think in general I agree with @Gyula's main point: unless there is a very
strong reason, it is better if we make the driver-mode opt-in (at
least at the beginning).
I left some comments on the document as well. Please kindly take a look.

Thanks,
Rong

On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina  wrote:

> Yeah just chiming in this conversation as well. We heavily use multiple
> job graphs to get isolation around retry logic and resource allocation
> across the job graphs. Putting all these parallel flows into a single graph
> would mean sharing of TaskManagers across what was meant to be truly
> independent.
>
>
>
> We also build our job graphs dynamically based off of the state of the
> world at the start of the job. While we’ve had a share of the pain
> described, my understanding is that there would be a tradeoff in number of
> jobs being submitted to the cluster and corresponding resource allocation
> requests. In the model with multiple jobs in a program, there’s at least
> the opportunity to reuse idle taskmanagers.
>
>
>
>
>
>
>
>
>
> *From:* Theo Diefenthal 
> *Sent:* Thursday, October 31, 2019 10:56 AM
> *To:* user@flink.apache.org
> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>
>
>
> I agree with Gyula Fora,
>
>
>
> In our case, we have a client-machine in the middle between our YARN
> cluster and some backend services, which can not be reached directly from
> the cluster nodes. On application startup, we connect to some external
> systems, get some information crucial for the job runtime and finally build
> up the job graph to be committed.
>
>
>
> It is true that we could work around this, but it would be pretty annoying
> to connect to the remote services, collect the data, upload it to HDFS,
> start the job and make sure housekeeping of those files is also done at
> some later time.
>
>
>
> The current behavior also corresponds to the behavior of Spark's driver
> mode, which made the transition from Spark to Flink easier for us.
>
>
>
> But I see the point, especially in terms of Kubernetes, and would thus also
> vote for an opt-in solution, with client compilation being the default and
> the per-program mode available as an option as well.
>
>
>
> Best regards
>
>
> --
>
> *From: *"Flavio Pompermaier"
> *To: *"Yang Wang"
> *CC: *"tison", "Newport, Billy" <billy.newp...@gs.com>, "Paul Lam",
> "SHI Xiaogang", "dev", "user" <user@flink.apache.org>
> *Sent: *Thursday, October 31, 2019 10:45:36
> *Subject: *Re: [DISCUSS] Semantic and implementation of per-job mode
>
>
>
> Hi all,
>
> we make heavy use of multiple jobs in one program, and this is why: when
> you fetch data from a huge number of sources and, for each source, you do
> some transformation and then you want to write the union of all outputs
> into a single directory (this assumes you're doing batch). When the number of
> sources is large, if you want to do this in a single job, the graph becomes
> very big and this is a problem for several reasons:
>
>- too many subtasks/threads per slot
>- increased back pressure
>- if a single "sub-job" fails, the whole job fails. This is very annoying
>if this happens after half a day, for example
>- in our use case, the big-graph mode takes much longer than running
>each job separately (but maybe this is true only if you don't have many
>hardware resources)
>- debugging the cause of a failure could become a daunting task if the
>job graph is too large
>- we faced many strange errors when trying to run the single big-job
>mode (due to serialization corruption)
>
> So, summarizing our overall experience with Flink batch: the simpler
> the job graph, the better!
>
>
>
> Best,
>
> Flavio
>
>
>
>
>
> On Thu, Oct 31, 2019 at 10:14 AM Yang Wang  wrote:
>
> Thanks to tison for starting this exciting discussion. We also suffer a lot
> from the per-job mode.
>
> I think the per-job cluster is a dedicated cluster for only one job and
> will not accept more other
>
> jobs. It has the advantage of one-step submission, do not need to start
> 

Re: JDBC Table Sink doesn't seem to sink to database.

2019-10-17 Thread Rong Rong
Splendid. Thanks for following up and moving the discussion forward :-)

--
Rong

On Thu, Oct 17, 2019 at 11:38 AM John Smith  wrote:

> I recorded two:
> Time interval: https://issues.apache.org/jira/browse/FLINK-14442
> Checkpointing: https://issues.apache.org/jira/browse/FLINK-14443
>
>
> On Thu, 17 Oct 2019 at 14:00, Rong Rong  wrote:
>
>> Yes, I think having a time interval execution (for the AppendableSink)
>> should be a good idea.
>> Can you please open a Jira issue[1] for further discussion.
>>
>> --
>> Rong
>>
>> [1] https://issues.apache.org/jira/projects/FLINK/issues
>>
>> On Thu, Oct 17, 2019 at 9:48 AM John Smith 
>> wrote:
>>
>>> Yes, correct. I set the batch interval to 1 and it works fine. Anyway, I
>>> think the JDBC sink could use some improvements, like batchInterval plus
>>> time-interval execution: if the batch doesn't fill up, execute whatever
>>> is left at that time interval.
>>>
>>> On Thu, 17 Oct 2019 at 12:22, Rong Rong  wrote:
>>>
>>>> Hi John,
>>>>
>>>> You are right. IMO the batch interval setting is meant to improve
>>>> JDBC execution performance.
>>>> The reason no exception occurs for your INSERT INTO statement with a
>>>> `non_existing_table` is that the JDBCAppendTableSink does not
>>>> check table existence beforehand. That being said, it should fail at the
>>>> first batch execution.
>>>>
>>>> Also, I think the `batchInterval` setting is local to the task; this
>>>> means the default batchInterval of 5000 is per partition.
>>>>
>>>> --
>>>> Rong
>>>>
>>>> On Wed, Oct 16, 2019 at 7:21 AM John Smith 
>>>> wrote:
>>>>
>>>>> Ok, I think I found it. It's the batch interval setting. From what I
>>>>> see, if we want a "realtime" stream to the database we have to set it to 1,
>>>>> otherwise the sink will wait until the batch interval count is reached.
>>>>>
>>>>> The batch interval mechanism doesn't seem correct? If the default size
>>>>> is 5000 and you need to insert 5001, you will never get that 1 record?
>>>>>
>>>>> On Tue, 15 Oct 2019 at 15:54, John Smith 
>>>>> wrote:
>>>>>
>>>>>> Hi, using 1.8.0
>>>>>>
>>>>>> I have the following job: https://pastebin.com/ibZUE8Qx
>>>>>>
>>>>>> So the job does the following steps...
>>>>>> 1- Consume from Kafka and return JsonObject
>>>>>> 2- Map JsonObject to MyPojo
>>>>>> 3- Convert The stream to a table
>>>>>> 4- Insert the table to JDBC sink table
>>>>>> 5- Print the table.
>>>>>>
>>>>>> - The job seems to work with no errors and I can see the row print to
>>>>>> the console and I see nothing in my database.
>>>>>> - If I put invalid host for the database and restart the job, I get a
>>>>>> connection SQLException error. So at least we know that works.
>>>>>> - If I make a typo on the INSERT INTO statement like INSERTS INTO
>>>>>> non_existing_table, there are no exceptions thrown, the print happens,
>>>>>> the stream continues to work.
>>>>>> - If I drop the table from the database, same thing, no exceptions
>>>>>> thrown, the print happens, the stream continues to work.
>>>>>>
>>>>>> So am I missing something?
>>>>>>
>>>>>


Re: JDBC Table Sink doesn't seem to sink to database.

2019-10-17 Thread Rong Rong
Yes, I think having a time interval execution (for the JDBCAppendTableSink)
would be a good idea.
Can you please open a Jira issue[1] for further discussion?
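
To make the proposal concrete for the Jira discussion, a count-or-time flush
could be sketched as follows. This is a plain-Java model, not the existing
JDBC sink code; `TimedBatchBuffer`, `onTimer` and the explicit timestamps are
hypothetical and only there so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of count-or-time flushing; hypothetical, not the JDBC sink implementation. */
class TimedBatchBuffer {
    private final int batchSize;
    private final long maxDelayMillis;
    private final List<String> pending = new ArrayList<>();
    private long oldestPendingMillis; // arrival time of the first row in the current batch
    private int flushCount = 0;

    TimedBatchBuffer(int batchSize, long maxDelayMillis) {
        this.batchSize = batchSize;
        this.maxDelayMillis = maxDelayMillis;
    }

    /** Adds a row and flushes when the batch is full. */
    void add(String row, long nowMillis) {
        if (pending.isEmpty()) {
            oldestPendingMillis = nowMillis;
        }
        pending.add(row);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Called periodically; flushes a partial batch that has waited too long. */
    void onTimer(long nowMillis) {
        if (!pending.isEmpty() && nowMillis - oldestPendingMillis >= maxDelayMillis) {
            flush();
        }
    }

    private void flush() {
        flushCount++;    // stands in for executing the JDBC batch
        pending.clear();
    }

    int flushCount() { return flushCount; }
    int pendingRows() { return pending.size(); }
}
```

With a batch size of 3 and a 1000 ms delay, two rows that arrive and then sit
idle are written by the timer instead of waiting for a third row.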

--
Rong

[1] https://issues.apache.org/jira/projects/FLINK/issues

On Thu, Oct 17, 2019 at 9:48 AM John Smith  wrote:

> Yes, correct. I set the batch interval to 1 and it works fine. Anyway, I
> think the JDBC sink could use some improvements, like batchInterval plus
> time-interval execution: if the batch doesn't fill up, execute whatever
> is left at that time interval.
>
> On Thu, 17 Oct 2019 at 12:22, Rong Rong  wrote:
>
>> Hi John,
>>
>> You are right. IMO the batch interval setting is meant to improve
>> JDBC execution performance.
>> The reason no exception occurs for your INSERT INTO statement with a
>> `non_existing_table` is that the JDBCAppendTableSink does not check
>> table existence beforehand. That being said, it should fail at the first
>> batch execution.
>>
>> Also, I think the `batchInterval` setting is local to the task; this
>> means the default batchInterval of 5000 is per partition.
>>
>> --
>> Rong
>>
>> On Wed, Oct 16, 2019 at 7:21 AM John Smith 
>> wrote:
>>
>>> Ok, I think I found it. It's the batch interval setting. From what I see,
>>> if we want a "realtime" stream to the database we have to set it to 1,
>>> otherwise the sink will wait until the batch interval count is reached.
>>>
>>> The batch interval mechanism doesn't seem correct? If the default size is
>>> 5000 and you need to insert 5001, you will never get that 1 record?
>>>
>>> On Tue, 15 Oct 2019 at 15:54, John Smith  wrote:
>>>
>>>> Hi, using 1.8.0
>>>>
>>>> I have the following job: https://pastebin.com/ibZUE8Qx
>>>>
>>>> So the job does the following steps...
>>>> 1- Consume from Kafka and return JsonObject
>>>> 2- Map JsonObject to MyPojo
>>>> 3- Convert The stream to a table
>>>> 4- Insert the table to JDBC sink table
>>>> 5- Print the table.
>>>>
>>>> - The job seems to work with no errors and I can see the row print to
>>>> the console and I see nothing in my database.
>>>> - If I put invalid host for the database and restart the job, I get a
>>>> connection SQLException error. So at least we know that works.
>>>> - If I make a typo on the INSERT INTO statement like INSERTS INTO
>>>> non_existing_table, there are no exceptions thrown, the print happens, the
>>>> stream continues to work.
>>>> - If I drop the table from the database, same thing, no exceptions
>>>> thrown, the print happens, the stream continues to work.
>>>>
>>>> So am I missing something?
>>>>
>>>


Re: JDBC Table Sink doesn't seem to sink to database.

2019-10-17 Thread Rong Rong
Hi John,

You are right. IMO the batch interval setting is meant to improve
JDBC execution performance.
The reason no exception occurs for your INSERT INTO statement with a
`non_existing_table` is that the JDBCAppendTableSink does not check
table existence beforehand. That being said, it should fail at the first
batch execution.

Also, I think the `batchInterval` setting is local to the task; this means
the default batchInterval of 5000 is per partition.
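
To illustrate the count-only behaviour you ran into, here is a small plain-Java
model. It is not the actual sink code; `CountBatchBuffer` and its methods are
hypothetical names, and the explicit `flush()` stands in for whatever else may
trigger a write.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of count-only batching; not the actual JDBC sink code. */
class CountBatchBuffer {
    private final int batchInterval;
    private final List<String> pending = new ArrayList<>();
    private int flushedRows = 0;

    CountBatchBuffer(int batchInterval) {
        this.batchInterval = batchInterval;
    }

    /** Buffers a row and writes the batch only once batchInterval rows are pending. */
    void add(String row) {
        pending.add(row);
        if (pending.size() >= batchInterval) {
            flush();
        }
    }

    /** Writes whatever is pending; stands in for executing the JDBC batch. */
    void flush() {
        flushedRows += pending.size();
        pending.clear();
    }

    int flushedRows() { return flushedRows; }
    int pendingRows() { return pending.size(); }
}
```

With a batch interval of 5000 and 5001 rows, 5000 rows are written and 1 row
stays pending until something else triggers a flush. Since each parallel task
would hold its own buffer in this model, the leftover is per task as well.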

--
Rong

On Wed, Oct 16, 2019 at 7:21 AM John Smith  wrote:

> Ok, I think I found it. It's the batch interval setting. From what I see,
> if we want a "realtime" stream to the database we have to set it to 1,
> otherwise the sink will wait until the batch interval count is reached.
>
> The batch interval mechanism doesn't seem correct? If the default size is
> 5000 and you need to insert 5001, you will never get that 1 record?
>
> On Tue, 15 Oct 2019 at 15:54, John Smith  wrote:
>
>> Hi, using 1.8.0
>>
>> I have the following job: https://pastebin.com/ibZUE8Qx
>>
>> So the job does the following steps...
>> 1- Consume from Kafka and return JsonObject
>> 2- Map JsonObject to MyPojo
>> 3- Convert The stream to a table
>> 4- Insert the table to JDBC sink table
>> 5- Print the table.
>>
>> - The job seems to work with no errors and I can see the row print to the
>> console and I see nothing in my database.
>> - If I put invalid host for the database and restart the job, I get a
>> connection SQLException error. So at least we know that works.
>> - If I make a typo on the INSERT INTO statement like INSERTS INTO
>> non_existing_table, there are no exceptions thrown, the print happens, the
>> stream continues to work.
>> - If I drop the table from the database, same thing, no exceptions
>> thrown, the print happens, the stream continues to work.
>>
>> So am I missing something?
>>
>


Re: Flink Join Time Window

2019-09-30 Thread Rong Rong
Hi Nishant,

On a brief look, I think this is a problem with your 2nd query:

>
> *Table2*...
> Table latestBadIps = tableEnv.sqlQuery("SELECT MAX(b_proctime) AS
> mb_proctime, bad_ip FROM BadIP ***GROUP BY bad_ip*** HAVING
> MIN(b_proctime) > CURRENT_TIMESTAMP - INTERVAL '2' DAY ");
> tableEnv.registerTable("LatestBadIP", latestBadIps);


This SQL statement is a non-windowed (unbounded) GROUP BY aggregation, and
thus your final table generates a nonWindowJoin.

If I understood you correctly, you are trying to keep emitting a bad IP
address until more than 2 days have passed since it was last seen?
What I assume you want is something similar to the
OverWindowAggregate[1], along the lines of:
"SELECT MAX(b_proctime) OVER ( PARTITION BY bad_ip ORDER BY b_proctime
RANGE BETWEEN INTERVAL '2' DAY PRECEDING AND CURRENT ROW ) FROM BadIP"
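
In case it helps to pin down the semantics I am assuming, here is the intended
behaviour modelled in plain Java rather than SQL. It is not a Flink operator;
`RecentBadIps` and its methods are hypothetical, and the inclusive 2-day
boundary is my assumption.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of "an IP stays bad until it was last seen too long ago"; hypothetical helper. */
class RecentBadIps {
    private final Duration retention;
    private final Map<String, Instant> lastSeen = new HashMap<>();

    RecentBadIps(Duration retention) {
        this.retention = retention;
    }

    /** Records a sighting, keeping only the latest timestamp per IP (the MAX per bad_ip). */
    void report(String ip, Instant seenAt) {
        lastSeen.merge(ip, seenAt, (a, b) -> a.isAfter(b) ? a : b);
    }

    /** True if the IP was reported within the retention period ending at 'now'. */
    boolean isBad(String ip, Instant now) {
        Instant seen = lastSeen.get(ip);
        return seen != null && !seen.isBefore(now.minus(retention));
    }
}
```

If this matches what you want, the join against the Kafka stream only needs to
ask `isBad(sourceip, now)` for each record.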

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#aggregations


On Mon, Sep 30, 2019 at 2:17 AM Nishant Gupta 
wrote:

> Hi Team,
>
> I am trying to Join [kafka stream] and [badip stream grouped with badip]
>
> Can someone please help me verify what is wrong in the
> highlighted query? Am I writing the time window join query wrong for this
> use case, or is it a bug that I should report?
> If it is a bug, what is the workaround?
>
> *Table1*
> Table kafkaSource = tableEnv.fromDataStream(inKafkaStreamM,
> "sourceip,field1,field2, k_proctime.proctime");
> tableEnv.registerTable("KafkaSource", kafkaSource);
>
> *Table2*
> Table  badipTable = tableEnv.fromDataStream(badipStreamM, "bad_ip,
> b_proctime.proctime");
> tableEnv.registerTable("BadIP", badipTable);
>
> Table latestBadIps = tableEnv.sqlQuery("SELECT MAX(b_proctime) AS
> mb_proctime, bad_ip FROM BadIP GROUP BY bad_ip HAVING MIN(b_proctime) >
> CURRENT_TIMESTAMP - INTERVAL '2' DAY ");
> tableEnv.registerTable("LatestBadIP", latestBadIps);
>
> *Table3 - Join*
> *Success for below query*
> Table resultKafkaMalicious = tableEnv.sqlQuery("SELECT K.* FROM
> KafkaSource AS K, LatestBadIP AS LB WHERE K.sourceip = LB.bad_ip");
>
> *Failure for below query*
> Table resultKafkaMalicious = tableEnv.sqlQuery("SELECT K.* FROM
> KafkaSource AS K, LatestBadIP AS LB WHERE K.sourceip = LB.bad_ip AND
> LB.mb_proctime BETWEEN K.k_proctime - INTERVAL '4' HOUR AND K.k_proctime +
> INTERVAL '10' MINUTE");
>
> *Error:*
> 14:25:25,230 INFO  org.apache.flink.runtime.taskmanager.Task
>   - InnerJoin(where: (AND(=(sourceip, bad_ip), >=(mb_proctime,
> -(PROCTIME(k_proctime), 1440:INTERVAL HOUR)), <=(mb_proctime,
> +(PROCTIME(k_proctime), 60:INTERVAL MINUTE, join: (tlsversion,
> tlscipher, tlscurve, tlsserver_name, tlsresumed, tlslast_alert,
> tlsnext_protocol, tlsestablished, tlsclient_cert_chain_fuids, tlssubject,
> tlsissuer, tlsclient_subject, tlsclient_issuer, tlsja3, ecsversion,
> sourceip, sourceport, sourcegeolower, sourcegeoupper,
> sourcegeocountry_iso_code, sourcegeocountry_name, sourcegeoregion_name,
> sourcegeocity_name, sourcegeolocationlat, sourcegeolocationlon,
> sourcegeozipcode, sourcegeotimezone, sourcegeoisp, sourcegeodomain,
> sourcegeonetspeed, sourcegeoiddcode, sourcegeoareacode,
> sourcegeoweatherstation_code, sourcegeoweatherstation_name, sourcegeomcc,
> sourcegeomnc, sourcegeomobilebrand, sourcegeoelevation, sourcegeousagetype,
> destinationip, destinationport, destinationgeolower, destinationgeoupper,
> destinationgeocountry_iso_code, destinationgeocountry_name,
> destinationgeoregion_name, destinationgeocity_name,
> destinationgeolocationlat, destinationgeolocationlon,
> destinationgeozipcode, destinationgeotimezone, destinationgeoisp,
> destinationgeodomain, destinationgeonetspeed, destinationgeoiddcode,
> destinationgeoareacode, destinationgeoweatherstation_code,
> destinationgeoweatherstation_name, destinationgeomcc, destinationgeomnc,
> destinationgeomobilebrand, destinationgeoelevation,
> destinationgeousagetype, eventid, eventprovider, eventdataset, eventtype,
> eventaction, organizationid, timestamp_received, clientmac, transactionid,
> timestamp, message, dhcpassigned_ip, dhcplease_time, dnsrtt,
> dnsquestionclass, dnsquestionname, dnsquestiontype, dnsquery,
> dnsqtype_name, dnsresponse_code, dnsrcode_name, dnsheader_flags,
> dnsanswersdata, dnsanswersttl, dnsrejected, networkprotocol, k_proctime,
> mb_proctime, bad_ip)) -> select: (tlsversion, tlscipher, tlscurve,
> tlsserver_name, tlsresumed, tlslast_alert, tlsnext_protocol,
> tlsestablished, tlsclient_cert_chain_fuids, tlssubject, tlsissuer,
> tlsclient_subject, tlsclient_issuer, tlsja3, ecsversion, sourceip,
> sourceport, sourcegeolower, sourcegeoupper, sourcegeocountry_iso_code,
> sourcegeocountry_name, sourcegeoregion_name, sourcegeocity_name,
> sourcegeolocationlat, sourcegeolocationlon, sourcegeozipcode,
> sourcegeotimezone, sourcegeoisp, sourcegeodomain, sourcegeonetspeed,
> sourcegeoiddcode, sourcegeoareacode, sourcegeoweatherstation_code,
> sourcegeoweatherstation_name, sourcegeomcc, sourcegeomnc,
> 

Re: Extending Flink's SQL-Parser

2019-09-18 Thread Rong Rong
Hi Dominik,

To add to Rui's answer: other examples of how to extend Calcite's DDL
syntax can be found in Calcite's Server module [1] and in one of our
open-sourced projects [2]. You might want to check them out.

--
Rong

[1]
https://github.com/apache/calcite/blob/master/server/src/main/codegen/includes/parserImpls.ftl
[2]
https://github.com/uber/AthenaX/blob/master/athenax-vm-compiler/src/codegen/includes/parserImpls.ftl

On Mon, Sep 16, 2019 at 8:28 PM Rui Li  wrote:

> Hi Dominik,
>
> I think you can check "parserImpls.ftl" to find out how Flink extends
> Calcite's original syntax to support features like CREATE TABLE and DROP
> TABLE, and follow those examples to implement your own syntax. It may also
> be helpful to check the pom.xml of flink-sql-parser to see how we use
> javacc plugin to generate the parser code.
> At the moment I don't think there are any tutorials about extending the SQL
> parser because it's quite internal to Flink. But perhaps the following
> answer provides some insights about how to extend Calcite parser in
> general:
> https://stackoverflow.com/questions/44382826/how-to-change-calcites-default-sql-grammar
>
> On Tue, Sep 17, 2019 at 12:16 AM
> dominik.werner.groenin...@student.uni-augsburg.de <
> dominik.werner.groenin...@student.uni-augsburg.de> wrote:
>
>> Hey there,
>>
>>
>>
>> I have to extend Flink's SQL-parser such that it accepts and evaluates
>> select-queries with different syntax.
>>
>> Furthermore I use Eclipse Oxygen with Maven plugin and Flink Release 1.8.0.
>>
>>
>>
>> What I believe to know:
>>
>> For parsing SQL-queries Flink-Table uses Apache Calcite's SQL-parser.
>> Flink-Table-Planner is the only module that references the Calcite-Core
>> which contains the parser ("Parser.jj" ?).
>>
>> Therefore I want to import Flink-Table-Planner and Calcite-Core as local
>> projects in Eclipse and edit the files "config.fmpp" and "parserImpls.ftl".
>> After that I want to create a new "Parser.jj" file with Apache FreeMarker (I
>> assume there are some tutorials?).
>>
>>
>>
>> What I don't know:
>>
>> Is it a promising plan or are there better strategies to extend the
>> parser?
>>
>> I already tried to import Flink-Table-Planner but I got many errors which
>> might refer to a Scala-problem with Eclipse. Do I have to switch to say
>> IntelliJ? Furthermore I'm not yet clear about how exactly I can extend the
>> parser. Are there any manuals/tutorials to teach me adding a new
>> SELECT-syntax? I already came across the parser extension test but it
>> didn't give me the answers I was looking for.
>>
>>
>>
>> Thanks for your help!
>>
>>
>>
>> Regards,
>>
>> Dominik Gröninger
>>
>>
>> 
>>
>
>
> --
> Best regards!
> Rui Li
>


Re: [ANNOUNCE] Zili Chen becomes a Flink committer

2019-09-11 Thread Rong Rong
Congratulations Zili!

--
Rong

On Wed, Sep 11, 2019 at 6:26 PM Hequn Cheng  wrote:

> Congratulations!
>
> Best, Hequn
>
> On Thu, Sep 12, 2019 at 9:24 AM Jark Wu  wrote:
>
>> Congratulations Zili!
>>
>> Best,
>> Jark
>>
>> On Wed, 11 Sep 2019 at 23:06,  wrote:
>>
>> > Congratulations, Zili.
>> >
>> >
>> >
>> > Best,
>> >
>> > Xingcan
>> >
>> >
>> >
>> > *From:* SHI Xiaogang 
>> > *Sent:* Wednesday, September 11, 2019 7:43 AM
>> > *To:* Guowei Ma 
>> > *Cc:* Fabian Hueske ; Biao Liu ;
>> > Oytun Tez ; bupt_ljy ; dev <
>> > d...@flink.apache.org>; user ; Till Rohrmann <
>> > trohrm...@apache.org>
>> > *Subject:* Re: [ANNOUNCE] Zili Chen becomes a Flink committer
>> >
>> >
>> >
>> > Congratulations!
>> >
>> >
>> >
>> > Regards,
>> >
>> > Xiaogang
>> >
>> >
>> >
>> > On Wed, Sep 11, 2019 at 7:07 PM Guowei Ma  wrote:
>> >
>> > Congratulations Zili !
>> >
>> >
>> > Best,
>> >
>> > Guowei
>> >
>> >
>> >
>> >
>> >
>> > On Wed, Sep 11, 2019 at 7:02 PM Fabian Hueske  wrote:
>> >
>> > Congrats Zili Chen :-)
>> >
>> >
>> >
>> > Cheers, Fabian
>> >
>> >
>> >
>> > On Wed, Sep 11, 2019 at 12:48 PM Biao Liu <
>> mmyy1...@gmail.com> wrote:
>> >
>> > Congrats Zili!
>> >
>> >
>> >
>> > Thanks,
>> >
>> > Biao /'bɪ.aʊ/
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > On Wed, 11 Sep 2019 at 18:43, Oytun Tez  wrote:
>> >
>> > Congratulations!
>> >
>> >
>> >
>> > ---
>> >
>> > Oytun Tez
>> >
>> >
>> >
>> > *M O T A W O R D*
>> >
>> > *The World's Fastest Human Translation Platform.*
>> >
>> > oy...@motaword.com — www.motaword.com
>> >
>> >
>> >
>> >
>> >
>> > On Wed, Sep 11, 2019 at 6:36 AM bupt_ljy  wrote:
>> >
>> > Congratulations!
>> >
>> >
>> >
>> > Best,
>> >
>> > Jiayi Liao
>> >
>> >
>> >
>> >  Original Message
>> >
>> > *Sender:* Till Rohrmann
>> >
>> > *Recipient:* dev; user
>> >
>> > *Date:* Wednesday, Sep 11, 2019 17:22
>> >
>> > *Subject:* [ANNOUNCE] Zili Chen becomes a Flink committer
>> >
>> >
>> >
>> > Hi everyone,
>> >
>> >
>> >
>> > I'm very happy to announce that Zili Chen (some of you might also know
>> > him as Tison Kun) accepted the offer of the Flink PMC to become a
>> > committer of the Flink project.
>> >
>> >
>> >
>> > Zili Chen has been an active community member for almost 16 months now.
>> > He helped push the Flip-6 effort over the finish line, ported a lot of
>> > legacy code tests, removed a good part of the legacy code, contributed
>> > numerous fixes, is involved in Flink's client API refactoring, drives
>> > the refactoring of Flink's HighAvailabilityServices and much more. Zili
>> > Chen also helped the community by PR reviews, reporting Flink issues,
>> > answering user mails and being very active on the dev mailing list.
>> >
>> >
>> >
>> > Congratulations Zili Chen!
>> >
>> >
>> >
>> > Best, Till
>> >
>> > (on behalf of the Flink PMC)
>> >
>> >
>>
>


Re: type error with generics ..

2019-08-26 Thread Rong Rong
t.scala:237)
>>> [info]   at
>>> pipelines.flink.javadsl.FlinkProcessorJ$1.buildExecutionGraph(FlinkProcessorJ.java:38)
>>> [info]   at
>>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:282)
>>> [info]   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:151)
>>> [info]   at
>>> pipelines.flink.testkit.FlinkTestkit.doRun(FlinkTestkit.scala:146)
>>> [info]   at
>>> pipelines.flink.testkit.FlinkTestkit.run(FlinkTestkit.scala:138)
>>> [info]   at
>>> pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:46)
>>> [info]   ...
>>> [info]   Cause:
>>> org.apache.flink.api.common.functions.InvalidTypesException: Type of
>>> TypeVariable 'T' in 'class
>>> pipelines.flink.testkit.FlinkSource$CollectionSourceFunction' could not be
>>> determined. This is most likely a type erasure problem. The type extraction
>>> currently supports types with generic variables only in cases where all
>>> variables in the return type can be deduced from the input type(s).
>>> Otherwise the type has to be specified explicitly using type information.
>>> [info]   at
>>> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:882)
>>> [info]   at
>>> org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:803)
>>> [info]   at
>>> org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:769)
>>> [info]   at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1459)
>>> [info]   at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1414)
>>> [info]   at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1396)
>>> [info]   at
>>> pipelines.flink.javadsl.FlinkStreamletTest.shouldProcessDataWhenItIsRun(FlinkStreamletTest.java:34)
>>> [info]   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> [info]   at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> [info]   at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> [info]   ...
>>>
>>> regards.
>>>
>>> On Sun, Aug 25, 2019 at 11:44 PM Rong Rong  wrote:
>>>
>>>> I am not sure how the function `readStream` is implemented (also which
>>>> version of Flink are you using?).
>>>> Can you share more information on your code blocks and exception logs?
>>>>
>>>> Also to answer your question, DataStream return type is determined by
>>>> its underlying transformation, so you cannot set it directly.
>>>>
>>>> Thanks,
>>>> Rong
>>>>
>>>> On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh <
>>>> ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> Thanks .. I tried this ..
>>>>>
>>>>> DataStream ins = readStream(in, Data.class, serdeData).map((Data
>>>>> d) -> d).returns(new TypeHint(){}.getTypeInfo());
>>>>>
>>>>> But still get the same error on this line ..
>>>>>
>>>>> (BTW I am not sure how to invoke returns on a DataStream and hence
>>>>> had to do a fake map - any suggestions here ?)
>>>>>
>>>>> regards.
>>>>>
>>>>> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong 
>>>>> wrote:
>>>>>
>>>>>> Hi Debasish,
>>>>>>
>>>>>> I think the error refers to the output of your source instead of your
>>>>>> result of the map function. E.g.
>>>>>> DataStream ins = readStream(in, Data.class, serdeData)*.returns(new
>>>>>> TypeInformation);*
>>>>>> DataStream simples = ins.map((Data d) -> new
>>>>>> Simple(d.getName())).returns(new TypeHint(){}.getTypeInfo());
>>>>>>
>>>>>> --
>>>>>> Rong
>>>>>>
>>>>>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh <
>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>
>>>>>>> Hello -
>>>>>>>
>>>>>>> I have t

Re: type error with generics ..

2019-08-25 Thread Rong Rong
I am not sure how the function `readStream` is implemented (also which
version of Flink are you using?).
Can you share more information on your code blocks and exception logs?

Also to answer your question, DataStream return type is determined by its
underlying transformation, so you cannot set it directly.

Thanks,
Rong

On Sat, Aug 24, 2019 at 12:29 PM Debasish Ghosh 
wrote:

> Thanks .. I tried this ..
>
> DataStream ins = readStream(in, Data.class, serdeData).map((Data d)
> -> d).returns(new TypeHint(){}.getTypeInfo());
>
> But still get the same error on this line ..
>
> (BTW I am not sure how to invoke returns on a DataStream and hence had to
> do a fake map - any suggestions here ?)
>
> regards.
>
> On Sat, Aug 24, 2019 at 10:26 PM Rong Rong  wrote:
>
>> Hi Debasish,
>>
>> I think the error refers to the output of your source instead of your
>> result of the map function. E.g.
>> DataStream ins = readStream(in, Data.class, serdeData)*.returns(new
>> TypeInformation);*
>> DataStream simples = ins.map((Data d) -> new Simple(d.getName()))
>> .returns(new TypeHint(){}.getTypeInfo());
>>
>> --
>> Rong
>>
>> On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh 
>> wrote:
>>
>>> Hello -
>>>
>>> I have the following call to addSource where I pass a Custom
>>> SourceFunction ..
>>>
>>> env.addSource(
>>>   new CollectionSourceFunctionJ(data, TypeInformation.of(new
>>> TypeHint(){}))
>>> )
>>>
>>> where data is List and CollectionSourceFunctionJ is a Scala case
>>> class ..
>>>
>>> case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti:
>>> TypeInformation[T]) extends SourceFunction[T] {
>>>   def cancel(): Unit = {}
>>>   def run(ctx: SourceContext[T]): Unit = {
>>> data.asScala.foreach(d ⇒ ctx.collect(d))
>>>   }
>>> }
>>>
>>> When the following transformation runs ..
>>>
>>> DataStream ins = readStream(in, Data.class, serdeData);
>>> DataStream simples = ins.map((Data d) -> new
>>> Simple(d.getName())).returns(new TypeHint(){}.getTypeInfo());
>>>
>>> I get the following exception in the second line ..
>>>
>>> org.apache.flink.api.common.functions.InvalidTypesException: The return
>>>> type of function 'Custom Source' could not be determined automatically, due
>>>> to type erasure. You can give type information hints by using the
>>>> returns(...) method on the result of the transformation call, or by letting
>>>> your function implement the 'ResultTypeQueryable' interface.
>>>
>>>
>>> Initially the returns call was not there and I was getting the same
>>> exception. Now after adding the returns call, nothing changes.
>>>
>>> Any help will be appreciated ..
>>>
>>> regards.
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


Re: type error with generics ..

2019-08-24 Thread Rong Rong
Hi Debasish,

I think the error refers to the output of your source rather than the
result of your map function. E.g.
DataStream ins = readStream(in, Data.class, serdeData)*.returns(new
TypeInformation);*
DataStream simples = ins.map((Data d) -> new Simple(d.getName()))
.returns(new TypeHint(){}.getTypeInfo());
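
For background, here is a minimal plain-Java sketch (no Flink dependency) of
why the type gets lost and why the anonymous-subclass form `new TypeHint(){}`
can help. `TypeCapture` is a hypothetical stand-in written only for this
illustration; it is not Flink's class, and I am assuming Flink's `TypeHint`
relies on the same generic-superclass trick.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

class TypeErasureSketch {

    /** Hypothetical stand-in for a type hint: a subclass records T in its class metadata. */
    abstract static class TypeCapture<T> {
        /** Reads the type argument that the (anonymous) subclass bound T to. */
        Type captured() {
            ParameterizedType superType =
                    (ParameterizedType) getClass().getGenericSuperclass();
            return superType.getActualTypeArguments()[0];
        }
    }

    /** Plain generic instances share one runtime class, so T cannot be read back from them. */
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Integer> ints = new ArrayList<>();
        return strings.getClass() == ints.getClass();
    }

    /** An anonymous subclass keeps its generic superclass, including the type argument. */
    static String capturedTypeName() {
        return new TypeCapture<List<String>>() {}.captured().getTypeName();
    }

    public static void main(String[] args) {
        System.out.println("erased to same class: " + sameRuntimeClass());
        System.out.println("captured type: " + capturedTypeName());
    }
}
```

If this reading is right, the erased `T` in your case belongs to the source
function itself, which would explain why a hint applied only after the map
does not change the error.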

--
Rong

On Fri, Aug 23, 2019 at 9:55 AM Debasish Ghosh 
wrote:

> Hello -
>
> I have the following call to addSource where I pass a Custom
> SourceFunction ..
>
> env.addSource(
>   new CollectionSourceFunctionJ(data, TypeInformation.of(new
> TypeHint(){}))
> )
>
> where data is List and CollectionSourceFunctionJ is a Scala case
> class ..
>
> case class CollectionSourceFunctionJ[T](data: java.util.List[T], ti:
> TypeInformation[T]) extends SourceFunction[T] {
>   def cancel(): Unit = {}
>   def run(ctx: SourceContext[T]): Unit = {
> data.asScala.foreach(d ⇒ ctx.collect(d))
>   }
> }
>
> When the following transformation runs ..
>
> DataStream ins = readStream(in, Data.class, serdeData);
> DataStream simples = ins.map((Data d) -> new
> Simple(d.getName())).returns(new TypeHint(){}.getTypeInfo());
>
> I get the following exception in the second line ..
>
> org.apache.flink.api.common.functions.InvalidTypesException: The return
>> type of function 'Custom Source' could not be determined automatically, due
>> to type erasure. You can give type information hints by using the
>> returns(...) method on the result of the transformation call, or by letting
>> your function implement the 'ResultTypeQueryable' interface.
>
>
> Initially the returns call was not there and I was getting the same
> exception. Now after adding the returns call, nothing changes.
>
> Any help will be appreciated ..
>
> regards.
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


Re: Problem with Flink on Yarn

2019-08-23 Thread Rong Rong
It seems your Kerberos server has started issuing invalid tokens to
your job manager.
Can you share how your Kerberos settings are configured? This might also
relate to how your KDC servers are configured.

--
Rong

On Fri, Aug 23, 2019 at 7:00 AM Zhu Zhu  wrote:

> Hi Juan,
>
> Have you tried Flink release built with Hadoop 2.7 or later version?
> If you are using Flink 1.8/1.9, it should be Pre-bundled Hadoop 2.7+ jar
> which can be found in the Flink download page.
>
> I think YARN-3103 is about AMRMClientImpl.class and it is in the flink
> shaded hadoop jar.
>
> Thanks,
> Zhu Zhu
>
> On Fri, Aug 23, 2019 at 7:48 PM Juan Gentile  wrote:
>
>> Hello!
>>
>>
>>
>> We are running Flink on Yarn and we are currently getting the following
>> error:
>>
>>
>>
>> *2019-08-23 06:11:01,534 WARN
>> org.apache.hadoop.security.UserGroupInformation   -
>> PriviledgedActionException as: (auth:KERBEROS)
>> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>> Invalid AMRMToken from appattempt_1564713228886_5299648_01*
>>
>> *2019-08-23 06:11:01,535 WARN
>> org.apache.hadoop.ipc.Client  - Exception
>> encountered while connecting to the server :
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>> Invalid AMRMToken from appattempt_1564713228886_5299648_01*
>>
>> *2019-08-23 06:11:01,536 WARN
>> org.apache.hadoop.security.UserGroupInformation   -
>> PriviledgedActionException as:  (auth:KERBEROS)
>> cause:org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>> Invalid AMRMToken from appattempt_1564713228886_5299648_01*
>>
>> *2019-08-23 06:11:01,581 WARN
>> org.apache.hadoop.io.retry.RetryInvocationHandler - Exception
>> while invoking ApplicationMasterProtocolPBClientImpl.allocate over rm0. Not
>> retrying because Invalid or Cancelled Token*
>>
>> *org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid
>> AMRMToken from appattempt_1564713228886_5299648_01*
>>
>> *at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
>> Method)*
>>
>> *at
>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)*
>>
>> *at
>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)*
>>
>> *at java.lang.reflect.Constructor.newInstance(Constructor.java:423)*
>>
>> *at
>> org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53)*
>>
>> *at
>> org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104)*
>>
>> *at
>> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79)*
>>
>> *at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)*
>>
>> *at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)*
>>
>> *at java.lang.reflect.Method.invoke(Method.java:498)*
>>
>> *at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:288)*
>>
>> *at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:206)*
>>
>> *at
>> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:188)*
>>
>> *at com.sun.proxy.$Proxy26.allocate(Unknown Source)*
>>
>> *at
>> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:277)*
>>
>> *at
>> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$HeartbeatThread.run(AMRMClientAsyncImpl.java:224)*
>>
>> *Caused by:
>> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
>> Invalid AMRMToken from appattempt_1564713228886_5299648_01*
>>
>> *at org.apache.hadoop.ipc.Client.call(Client.java:1472)*
>>
>> *at org.apache.hadoop.ipc.Client.call(Client.java:1409)*
>>
>> *at
>> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:231)*
>>
>> *at com.sun.proxy.$Proxy25.allocate(Unknown Source)*
>>
>> *at
>> org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77)*
>>
>> *... 9 more*
>>
>>
>>
>> The Flink cluster runs ok for a while but then after a day we get this
>> error again. We haven’t made changes to our code so that’s why it’s hard to
>> understand why all of a sudden we started to see this.
>>
>>
>>
>> We found this issue reported on Yarn
>> https://issues.apache.org/jira/browse/YARN-3103 but our version of Yarn
>> already has that fix.
>>
>>
>>
>> Any help will be appreciated.
>>
>>
>>
>> Thank you,
>>
>> Juan
>>
>


Re: Issue with FilterableTableSource and the logical optimizer rules

2019-08-19 Thread Rong Rong
Hi Itamar,

The problem you described sounds similar to this ticket[1].
Can you try to see if following the solution listed resolves your issue?

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-12399

On Mon, Aug 19, 2019 at 8:56 AM Itamar Ravid  wrote:

> Hi, I’m facing a strange issue with Flink 1.8.1. I’ve implemented a
> StreamTableSource that implements FilterableTableSource and
> ProjectableTableSource. However, I’m seeing that during the logical plan
> optimization (TableEnvironment.scala:288), the applyPredicates method is
> called but the resulting plan does NOT contain the source with the filter
> pushed.
>
> It appears that the problem is in the VolcanoPlanner.findBestExp method;
> when it reaches “root.buildCheapestPlan”, the resulting RelNode does not
> contain the filtered source.
>
> Additionally, I added a breakpoint in
> FlinkLogicalTableSourceScan#computeSelfCost, and the tableSource never has
> the predicates pushed. I verified that in the
> PushFilterIntoTableSourceScanRule, the resulting source always has the
> predicates pushed.
>
> Amusingly, this issue causes queries like “SELECT a FROM src WHERE a =
> 123” to be rewritten to “SELECT 123 FROM src” :-)
>
> Does anyone have any advice on debugging/working around this without
> disabling predicate pushdown on the source?
>


Re: [ANNOUNCE] Andrey Zagrebin becomes a Flink committer

2019-08-14 Thread Rong Rong
Congratulations Andrey!

On Wed, Aug 14, 2019 at 10:14 PM chaojianok  wrote:

> Congratulations Andrey!
> At 2019-08-14 21:26:37, "Till Rohrmann"  wrote:
> >Hi everyone,
> >
> >I'm very happy to announce that Andrey Zagrebin accepted the offer of the
> >Flink PMC to become a committer of the Flink project.
> >
> >Andrey has been an active community member for more than 15 months. He has
> >helped shape numerous features such as State TTL, FRocksDB release,
> >Shuffle service abstraction, FLIP-1, result partition management and
> >various fixes/improvements. He's also frequently helping out on the
> >user@f.a.o mailing lists.
> >
> >Congratulations Andrey!
> >
> >Best, Till
> >(on behalf of the Flink PMC)
>


Re: how to get the code produced by Flink Code Generator

2019-08-07 Thread Rong Rong
+1. I think this would be a very nice way to provide more verbose feedback
for debugging.

--
Rong

On Wed, Aug 7, 2019 at 9:28 AM Fabian Hueske  wrote:

> Hi Vincent,
>
> I don't think there is such a flag in Flink.
> However, this sounds like a really good idea.
>
> Would you mind creating a Jira ticket for this?
>
> Thank you,
> Fabian
>
> On Tue, Aug 6, 2019 at 5:53 PM Vincent Cai <
> caidezhi...@foxmail.com> wrote:
>
>> Hi Users,
>> In Spark, we can invoke the Dataset method "queryExecution.debug.codegen()"
>> to get the code produced by Catalyst.
>> Is there any similar API in Flink?
>>
>> reference link :
>> https://medium.com/virtuslab/spark-sql-under-the-hood-part-i-26077f85ebf0
>>
>>
>>
>>
>> Regards
>> Vincent  Cai
>>
>>


Re: [ANNOUNCE] Hequn becomes a Flink committer

2019-08-07 Thread Rong Rong
Congratulations Hequn, well deserved!

--
Rong

On Wed, Aug 7, 2019 at 8:30 AM  wrote:

> Congratulations, Hequn!
>
>
>
> *From:* Xintong Song 
> *Sent:* Wednesday, August 07, 2019 10:41 AM
> *To:* d...@flink.apache.org
> *Cc:* user 
> *Subject:* Re: [ANNOUNCE] Hequn becomes a Flink committer
>
>
>
> Congratulations~!
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Wed, Aug 7, 2019 at 4:00 PM vino yang  wrote:
>
> Congratulations!
>
> On Wed, Aug 7, 2019 at 7:09 PM highfei2...@126.com  wrote:
>
> > Congrats Hequn!
> >
> > Best,
> > Jeff Yang
> >
> >
> >  Original Message 
> > Subject: Re: [ANNOUNCE] Hequn becomes a Flink committer
> > From: Piotr Nowojski
> > To: JingsongLee
> > CC: Biao Liu ,Zhu Zhu ,Zili Chen ,Jeff Zhang ,Paul Lam ,jincheng sun
> ,dev ,user
> >
> >
> > Congratulations :)
> >
> > On 7 Aug 2019, at 12:09, JingsongLee  wrote:
> >
> > Congrats Hequn!
> >
> > Best,
> > Jingsong Lee
> >
> > --
> > From:Biao Liu 
> > Send Time: Wednesday, August 7, 2019 12:05
> > To:Zhu Zhu 
> > Cc:Zili Chen ; Jeff Zhang ; Paul
> > Lam ; jincheng sun ;
> dev
> > ; user 
> > Subject:Re: [ANNOUNCE] Hequn becomes a Flink committer
> >
> > Congrats Hequn!
> >
> > Thanks,
> > Biao /'bɪ.aʊ/
> >
> >
> >
> > On Wed, Aug 7, 2019 at 6:00 PM Zhu Zhu  wrote:
> > Congratulations to Hequn!
> >
> > Thanks,
> > Zhu Zhu
> >
> > On Wed, Aug 7, 2019 at 5:16 PM Zili Chen  wrote:
> > Congrats Hequn!
> >
> > Best,
> > tison.
> >
> >
> > On Wed, Aug 7, 2019 at 5:14 PM Jeff Zhang  wrote:
> > Congrats Hequn!
> >
> > On Wed, Aug 7, 2019 at 5:08 PM Paul Lam  wrote:
> > Congrats Hequn! Well deserved!
> >
> > Best,
> > Paul Lam
> >
> > On Aug 7, 2019, at 16:28, jincheng sun  wrote:
> >
> > Hi everyone,
> >
> > I'm very happy to announce that Hequn accepted the offer of the Flink PMC
> > to become a committer of the Flink project.
> >
> > Hequn has been contributing to Flink for many years, mainly working on
> > SQL/Table API features. He's also frequently helping out on the user
> > mailing lists and helping check/vote the release.
> >
> > Congratulations Hequn!
> >
> > Best, Jincheng
> > (on behalf of the Flink PMC)
> >
> >
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
> >
> >
>
>


Re: Timestamp(timezone) conversion bug in non blink Table/SQL runtime

2019-07-23 Thread Rong Rong
Hi Shuyi,

I think there were some discussions in the mailing list [1,2] and JIRA
tickets [3,4] that might be related.
Since the table-blink planner doesn't produce such an error, I think this
problem is valid and should be fixed.
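
To make the shift concrete, here is a small plain-Java sketch of the
arithmetic in Shuyi's example quoted below. It assumes the conversion
subtracts the JVM zone offset from the input value, as described in the
thread; `shiftedMillis` is a hypothetical helper for illustration, not the
Calcite method itself, and fixed-offset zones are used to keep daylight
saving time out of the picture.

```java
import java.util.TimeZone;

class TimestampShiftSketch {

    /**
     * Hypothetical helper: treats v as wall-clock millis in the given zone and
     * turns it into millis since epoch by subtracting the zone offset.
     */
    static long shiftedMillis(long v, TimeZone zone) {
        return v - zone.getOffset(v);
    }

    public static void main(String[] args) {
        long input = 0L; // 00:00:00 UTC on 1 January 1970
        TimeZone utcMinus8 = TimeZone.getTimeZone("GMT-08:00"); // fixed UTC-8
        TimeZone utcMinus5 = TimeZone.getTimeZone("GMT-05:00"); // fixed UTC-5
        TimeZone utc = TimeZone.getTimeZone("UTC");
        System.out.println("UTC-8: " + shiftedMillis(input, utcMinus8) + " ms");
        System.out.println("UTC-5: " + shiftedMillis(input, utcMinus5) + " ms");
        System.out.println("UTC:   " + shiftedMillis(input, utc) + " ms");
    }
}
```

The same input yields a different value per zone, and only a UTC machine
leaves it unchanged, which matches the behaviour reported in the thread.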

Thanks,
Rong

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/event-time-timezone-is-not-correct-tt26457.html
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/TimeZone-shift-problem-in-Flink-SQL-td25666.html#a25739
[3] https://issues.apache.org/jira/browse/FLINK-8353
[4] https://issues.apache.org/jira/browse/FLINK-8169

On Mon, Jul 22, 2019 at 1:49 PM Shuyi Chen  wrote:

> Hi Lasse,
>
> Thanks for the reply. If your input is in epoch time, you are not getting
> local time; instead, you are getting a wrong time that does not make sense.
> For example, if the user input value is 0 (which means 00:00:00 UTC on 1
> January 1970), and your local timezone is UTC-8, converting 00:00:00 UTC on
> 1 January 1970 to your local timezone should yield 16:00:00 Dec 31, 1969.
> But actually, you will be getting 08:00:00 UTC on 1 January 1970 from the
> Table/SQL runtime, which is 00:00:00 on 1 January 1970 in your local timezone
> (UTC-8). Your input time just gets shifted by 8 hours in the output.
>
> Shuyi
>
> On Mon, Jul 22, 2019 at 12:49 PM Lasse Nedergaard <
> lassenederga...@gmail.com> wrote:
>
>> Hi.
>>
>> I have encountered the same problem: when you feed epoch time into a window
>> table function and then use window.start and window.end, the output is not
>> in epoch time but in local time. I traced the problem to the same
>> internal function as you did.
>>
>> Best regards
>> Lasse Nedergaard
>>
>>
>> On Jul 22, 2019, at 20:46, Shuyi Chen  wrote:
>>
>> Hi all,
>>
>> Currently, in the non-blink table/SQL runtime, Flink uses
>> SqlFunctions.internalToTimestamp(long v) from Calcite to convert event time
>> (in long) to java.sql.Timestamp. However, as discussed in the recent
>> Calcite mailing list (Jul. 19, 2019), SqlFunctions.internalToTimestamp()
>> assumes the input timestamp value is in the current JVM’s default timezone
>> (which is unusual), NOT milliseconds since epoch. That is,
>> SqlFunctions.internalToTimestamp() is meant to convert a timestamp value in
>> the current JVM’s default timezone to milliseconds since epoch, which the
>> java.sql.Timestamp constructor takes. Therefore, the results will not only
>> be wrong, but will also change if the job runs on machines in different
>> timezones. (The only exception is when all your production machines use the
>> UTC timezone.)
>>
>> Here is an example: if the user input value is 0 (00:00:00 UTC on 1
>> January 1970), and the table/SQL runtime runs on a machine in PST (UTC-8),
>> the output sql.Timestamp after SqlFunctions.internalToTimestamp() will
>> become 28800000 millisec since epoch (08:00:00 UTC on 1 January 1970). And
>> with the same input, if the table/SQL runtime runs again on a different
>> machine in EST (UTC-5), the output sql.Timestamp after
>> SqlFunctions.internalToTimestamp() will become 18000000 millisec since
>> epoch (05:00:00 UTC on 1 January 1970).
>>
>> More details are captured in
>> https://issues.apache.org/jira/browse/FLINK-13372. Please let me know
>> your thoughts and correct me if I am wrong. Thanks a lot.
>>
>> Shuyi
>>
>>


Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-23 Thread Rong Rong
Hi Dongwon,

Sorry for the late reply. I ran some experiments and it seems you are
right:
Setting the `.returns()` type actually alters the underlying type of the
DataStream from a GenericType into a specific RowTypeInfo. Please see the
JIRA ticket [1] for more info.

Regarding the approach: yes, I think you cannot access the timer service
from the Table/SQL API at the moment, so that might be the best approach.
And as Fabian suggested, I don't think there is much of a problem as long as
you are not changing the type info underlying your DataStream. I will follow
up on this in the JIRA ticket.

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-13389

On Tue, Jul 23, 2019 at 6:30 AM Dongwon Kim  wrote:

> Hi Fabian,
>
> Thanks for clarification :-)
> I could convert back and forth without worrying about it as I keep using
> Row type during the conversion (even though fields are added).
>
> Best,
>
> Dongwon
>
>
>
> On Tue, Jul 23, 2019 at 8:15 PM Fabian Hueske  wrote:
>
>> Hi Dongwon,
>>
>> regarding the question about the conversion: If you keep using the Row
>> type and are not adding/removing fields, the conversion is pretty much free
>> right now.
>> It will be a MapFunction (sometimes not even a function at all) that should
>> be chained with the other operators. Hence, it should boil down to a
>> function call.
>>
>> Best, Fabian
>>
>> On Sat, Jul 20, 2019 at 3:58 AM Dongwon Kim <
>> eastcirc...@gmail.com> wrote:
>>
>>> Hi Rong,
>>>
>>>> I have to dig deeper into the code to reproduce this error. This seems
>>>> to be a bug to me and will update once I find anything.
>>>
>>> Thanks a lot for spending your time on this.
>>>
>>>> However from what you explained, if I understand correctly you can do
>>>> all of your processing within the TableAPI scope without converting it back
>>>> and forth to DataStream.
>>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>>> function that's simple enough, you can implement and connect with the table
>>>> API via UserDefinedFunction[1].
>>>> As the Table API is becoming a first-class citizen [2,3,4], this would be
>>>> a much cleaner implementation from my perspective.
>>>
>>> I also agree with you that a first-class Table API will
>>> make everything not only easier but also a lot cleaner.
>>> We however have some corner cases that force us to convert a Table from
>>> and to a DataStream.
>>> One such case is appending to a Table a column showing the current
>>> watermark of each record; there's no other way to do that, as
>>> ScalarFunction doesn't allow us to get the runtime context information as
>>> ProcessFunction does.
>>>
>>> I have a question regarding the conversion.
>>> Do I have to worry about a runtime performance penalty in case I
>>> cannot help but convert back and forth to DataStream?
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong  wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> I have to dig deeper into the code to reproduce this error. This seems
>>>> to be a bug to me and will update once I find anything.
>>>>
>>>> However from what you explained, if I understand correctly you can do
>>>> all of your processing within the TableAPI scope without converting it back
>>>> and forth to DataStream.
>>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>>> function that's simple enough, you can implement and connect with the table
>>>> API via UserDefinedFunction[1].
>>>> As the Table API is becoming a first-class citizen [2,3,4], this would be
>>>> a much cleaner implementation from my perspective.
>>>>
>>>> --
>>>> Rong
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>>>> [2]
>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>>>> [3]
>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>>>> [4]
>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>>>
>>>>
>>>> On Thu, Jul 

Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-19 Thread Rong Rong
Hi Dongwon,

I have to dig deeper into the code to reproduce this error. This looks like
a bug to me; I will update once I find anything.

However, from what you explained, if I understand correctly, you can do all
of your processing within the Table API scope without converting back and
forth to DataStream.
E.g. if your "map(a -> a)" placeholder represents some sort of map function
that's simple enough, you can implement it and connect it with the Table API
via a UserDefinedFunction [1].
As the Table API is becoming a first-class citizen [2,3,4], this would be a
much cleaner implementation from my perspective.

--
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html


On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim  wrote:

> Hi Rong,
>
> Thank you for reply :-)
>
>> which Flink version are you using?
>
> I'm using Flink-1.8.0.
>
>> what is the "sourceTable.getSchema().toRowType()" return?
>
> Row(time1: TimeIndicatorTypeInfo(rowtime))
>
>> what is the line *".map(a -> a)" *do and can you remove it?
>
> *".map(a->a)"* is just to illustrate the problem.
> My actual code contains a process function (instead of .map() in the
> snippet) which appends a new field containing the watermark to a row.
> If there were a way to get the watermark inside a scalar UDF, I wouldn't
> convert the table to a DataStream and vice versa.
>
>> if I am understanding correctly, you are also using "time1" as the
>> rowtime; is that what you intend to use later as well?
>
> yup :-)
>
>> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
>> adds a type information hint about the return type of this operator. It is
>> used in cases where Flink cannot determine automatically[1].
>
> The reason why I specify
> *".returns(sourceTable.getSchema().toRowType());"* is to give a type
> information hint as you said.
> That is needed later when I need to make another table like
> "*Table anotherTable = tEnv.fromDataStream(stream);"*.
> Without the type information hint, I get the error
> "*An input of GenericTypeInfo cannot be converted to Table.
> Please specify the type of the input with a RowTypeInfo."*
> That's why I give a type information hint in that way.
>
> Best,
>
> Dongwon
>
> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong  wrote:
>
>> Hi Dongwon,
>>
>> Can you provide a bit more information:
>> which Flink version are you using?
>> what is the "sourceTable.getSchema().toRowType()" return?
>> what is the line *".map(a -> a)" *do and can you remove it?
>> if I am understanding correctly, you are also using "time1" as the
>> rowtime; is that what you intend to use later as well?
>>
>> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
>> adds a type information hint about the return type of this operator. It is
>> used in cases where Flink cannot determine automatically[1].
>>
>> Thanks,
>> Rong
>>
>> --
>> [1]
>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>
>>
>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim 
>> wrote:
>>
>>> Hello,
>>>
>>> Consider the following snippet:
>>>
>>>> Table sourceTable = getKafkaSource0(tEnv);
>>>> DataStream stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>>
>>>> *  .map(a -> a)  .returns(sourceTable.getSchema().toRowType());*
>>>> stream.print();
>>>>
>>> where sourceTable.printSchema() shows:
>>>
>>>> root
>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>
>>>
>>>
>>>  This program returns the following exception:
>>>
>>>> Exception in thread "main"
>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>

Re:

2019-07-18 Thread Rong Rong
Hi Tangkailin,

If I understand correctly from the snippet, you are trying to invoke this
in some sort of window, correct?
If that's the case, your "apply" method will be invoked each time the
window fires [1]. This means a new instance of the HashMap will be
created each time "apply" is invoked.

So to answer your question: no, the HashMap is never a singleton; it will
be created every time "apply" is called.
If you would like to store data across invocations (e.g. share data
between different calls to the "apply" function), you are better off using
the ProcessWindowFunction [2].
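
To make the lifetime difference concrete, here is a minimal sketch in plain
Java with no Flink dependency. The class and method names are made up for
illustration and are not Flink APIs; the sketch only models object lifetime
and assumes the map is created inside the "apply"-style method, as I read
the snippet. It contrasts a map created inside the method with one kept in a
field of the function object.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a window function; this is not a Flink class.
class WindowFunctionSketch {
    // Lives as long as this function object does.
    private final Map<String, Integer> acrossCalls = new HashMap<>();

    // Returns {size of the per-call map, size of the long-lived map}
    // after one simulated window firing.
    int[] apply(Iterable<String> windowElements) {
        // Created anew on every invocation, so it never sees earlier windows.
        Map<String, Integer> perCall = new HashMap<>();
        for (String element : windowElements) {
            perCall.merge(element, 1, Integer::sum);
            acrossCalls.merge(element, 1, Integer::sum);
        }
        return new int[] {perCall.size(), acrossCalls.size()};
    }
}
```

Note that in a real job a plain field is still local to one parallel
instance and is not covered by checkpoints, which is why I would point to
the state facilities of ProcessWindowFunction rather than to a field.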

Thanks,
Rong

--
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#windowfunction-legacy
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction

On Wed, Jul 17, 2019 at 7:37 PM tangkailin  wrote:

> Hello,
>
> I am trying to use a HashMap in the window function of my Flink job. If
> the parallelism changes, is this HashMap still a singleton? Shouldn't I do
> something similar here?
>
>
>
> Sent from Mail for Windows 10
>
>
>


Re: [Table API] ClassCastException when converting a table to DataStream

2019-07-18 Thread Rong Rong
Hi Dongwon,

Can you provide a bit more information:
- Which Flink version are you using?
- What does "sourceTable.getSchema().toRowType()" return?
- What does the line *".map(a -> a)"* do, and can you remove it?
- If I understand correctly, you are also using "time1" as the rowtime; is
  that what you intend to use later as well?

As far as I know, *".returns(sourceTable.getSchema().toRowType());"* only
adds a type information hint about the return type of this operator. It is
used in cases where Flink cannot determine the type automatically [1].

Thanks,
Rong

--
[1]
https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351


On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim  wrote:

> Hello,
>
> Consider the following snippet:
>
>> Table sourceTable = getKafkaSource0(tEnv);
>> DataStream stream = tEnv.toAppendStream(sourceTable, Row.class)
>>
>> *  .map(a -> a)  .returns(sourceTable.getSchema().toRowType());*
>> stream.print();
>>
> where sourceTable.printSchema() shows:
>
>> root
>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>
>
>
>  This program returns the following exception:
>
>> Exception in thread "main"
>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>> at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>> at
>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>> at
>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>> at app.metatron.test.Main2.main(Main2.java:231)
>> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
>> cast to java.lang.Long*
>> * at
>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>> ...
>
>
> The row serializer seems to try to deep-copy an instance of
> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
> Could anybody help me?
>
> Best,
>
> - Dongwon
>
> p.s. though removing .returns() makes everything okay, I need to do that
> as I want to convert DataStream into another table later.
> p.s. the source table is created as follows:
>
>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>   ConnectorDescriptor connectorDescriptor = new Kafka()
>>     .version("universal")
>>     .topic("mytopic")
>>     .property("bootstrap.servers", "localhost:9092")
>>     .property("group.id", "mygroup")
>>     .startFromEarliest();
>>   FormatDescriptor formatDescriptor = new Csv()
>>     .deriveSchema()
>>     .ignoreParseErrors()
>>     .fieldDelimiter(',');
>>   Schema schemaDescriptor = new Schema()
>>     .field("time1", SQL_TIMESTAMP())
>>     .rowtime(
>>       new Rowtime()
>>         .timestampsFromField("rowTime")
>>         .watermarksPeriodicBounded(100)
>>     );
>>   tEnv.connect(connectorDescriptor)
>>     .withFormat(formatDescriptor)
>>     .withSchema(schemaDescriptor)
>>     .inAppendMode()
>>     .registerTableSource("mysrc");
>>   return tEnv.scan("mysrc");
>> }
>
>


Re: Flink SQL API: Extra columns added from order by

2019-07-12 Thread Rong Rong
Hi Morrisa,

Can you share more information about what type of function "formatDate"
is and how you configured its return type?
Regarding the first query: if the return type is String, then ASC on a
string value sorts in alphabetical order.

However, in the third query, if the GROUP BY and ORDER BY both operate on
the same input to your UDF, that input shouldn't be part of the output
columns. This looks like a bug to me.
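
To illustrate the alphabetical-ordering point, here is a small sketch in
plain Java, independent of Flink and Calcite. The class name and the
"MMMM yyyy" pattern are my assumptions about what "formatDate" produces; the
sketch only shows why sorting the formatted label differs from sorting the
underlying month.

```java
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;

// Hypothetical helper, only meant to mimic the two ORDER BY variants.
class MonthOrderingSketch {
    private static final DateTimeFormatter LABEL =
            DateTimeFormatter.ofPattern("MMMM yyyy", Locale.ENGLISH);

    // Sorts the formatted labels as plain strings, like ORDER BY on a String column.
    static List<String> sortByLabel(List<YearMonth> months) {
        List<String> labels = new ArrayList<>();
        for (YearMonth month : months) {
            labels.add(month.format(LABEL));
        }
        labels.sort(Comparator.naturalOrder());
        return labels;
    }

    // Sorts by the underlying month first and formats afterwards.
    static List<String> sortByMonth(List<YearMonth> months) {
        List<YearMonth> sorted = new ArrayList<>(months);
        sorted.sort(Comparator.naturalOrder());
        List<String> labels = new ArrayList<>();
        for (YearMonth month : sorted) {
            labels.add(month.format(LABEL));
        }
        return labels;
    }
}
```

With February 2019 and April 2019 as input, the first method puts "April
2019" first and the second puts "February 2019" first, which matches the
outputs in your first and second examples.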

--
Rong

On Thu, Jul 11, 2019 at 11:45 AM Morrisa Brenner <
morrisa.bren...@klaviyo.com> wrote:

> Hi Flink folks,
>
> We have a custom date formatting function that we use to format the output
> of columns containing dates. Ideally what we want is to format the output
> in the select statement but be able to order by the underlying datetime (so
> that an output with formatted dates "February 2019" and "April 2019" is
> guaranteed to have the rows sorted in time order rather than alphabetical
> order).
>
> When I go to add the unformatted column to the order by, however, that
> gets appended as an extra column to the select statement during the query
> planning process within Calcite. (In the order by parsing, it's considering
> this a different column from the one in the select statement.) When the
> group by column is different in the same way but there's no order by
> column, the extra column isn't added. I've included a couple of simple
> examples below.
>
> Is this the intended behavior of the query planner? Does anyone know of a
> way around this without needing to change the formatting so that it makes
> the output dates correctly sortable?
>
> Thanks for your help!
>
> Morrisa
>
>
>
> Example query and output with order by using formatted date:
>
> SELECT
>   formatDate(floor(`testTable`.`timestamp` TO MONTH), 'MONTH'),
>   sum(`testTable`.`count`)
> FROM `testTable`
> GROUP BY formatDate(floor(`testTable`.`timestamp` TO MONTH), 'MONTH')
> ORDER BY formatDate(floor(`testTable`.`timestamp` TO MONTH), 'MONTH') ASC
>
> Month         | SUM VALUE
> April 2019    | 1052
> February 2019 | 1
>
>
> Example query and output without order by but group by using unformatted
> date:
>
> SELECT
>   formatDate(floor(`testTable`.`timestamp` TO MONTH), 'MONTH'),
>   sum(`testTable`.`count`)
> FROM `testTable`
> GROUP BY floor(`testTable`.`timestamp` TO MONTH)
>
> Month         | SUM VALUE
> February 2019 | 1
> April 2019    | 1052
>
> We would like to enforce the ordering, so although this output is what we
> want, I don't think we can use this solution.
>
> Example query and output with order by using unformatted date:
>
> SELECT
>   formatDate(floor(`testTable`.`timestamp` TO MONTH), 'MONTH'),
>   sum(`testTable`.`count`)
> FROM `testTable`
> GROUP BY floor(`testTable`.`timestamp` TO MONTH)
> ORDER BY floor(`testTable`.`timestamp` TO MONTH) ASC
>
> Month         | SUM VALUE |
> February 2019 | 1         | 2/1/2019 12:00 AM
> April 2019    | 1052      | 4/1/2019 12:00 AM
>
>
> --
> Morrisa Brenner
> Software Engineer
> 225 Franklin St, Boston, MA 02110
> klaviyo.com 
>


Re: [ANNOUNCE] Rong Rong becomes a Flink committer

2019-07-11 Thread Rong Rong
Thank you all for the warm welcome!

It's my honor to become an Apache Flink committer.
I will continue to work on this great project and contribute more to the
community.

Cheers,
Rong

On Thu, Jul 11, 2019 at 1:05 PM Hao Sun  wrote:

> Congratulations Rong.
>
> On Thu, Jul 11, 2019, 11:39 Xuefu Z  wrote:
>
>> Congratulations, Rong!
>>
>> On Thu, Jul 11, 2019 at 10:59 AM Bowen Li  wrote:
>>
>>> Congrats, Rong!
>>>
>>>
>>> On Thu, Jul 11, 2019 at 10:48 AM Oytun Tez  wrote:
>>>
>>> > Congratulations Rong!
>>> >
>>> > ---
>>> > Oytun Tez
>>> >
>>> > *M O T A W O R D*
>>> > The World's Fastest Human Translation Platform.
>>> > oy...@motaword.com — www.motaword.com
>>> >
>>> >
>>> > On Thu, Jul 11, 2019 at 1:44 PM Peter Huang <
>>> huangzhenqiu0...@gmail.com>
>>> > wrote:
>>> >
>>> >> Congrats Rong!
>>> >>
>>> >> On Thu, Jul 11, 2019 at 10:40 AM Becket Qin 
>>> wrote:
>>> >>
>>> >>> Congrats, Rong!
>>> >>>
>>> >>> On Fri, Jul 12, 2019 at 1:13 AM Xingcan Cui 
>>> wrote:
>>> >>>
>>> >>>> Congrats Rong!
>>> >>>>
>>> >>>> Best,
>>> >>>> Xingcan
>>> >>>>
>>> >>>> On Jul 11, 2019, at 1:08 PM, Shuyi Chen  wrote:
>>> >>>>
>>> >>>> Congratulations, Rong!
>>> >>>>
>>> >>>> On Thu, Jul 11, 2019 at 8:26 AM Yu Li  wrote:
>>> >>>>
>>> >>>>> Congratulations Rong!
>>> >>>>>
>>> >>>>> Best Regards,
>>> >>>>> Yu
>>> >>>>>
>>> >>>>>
>>> >>>>> On Thu, 11 Jul 2019 at 22:54, zhijiang
>>> >>>>> wrote:
>>> >>>>>
>>> >>>>>> Congratulations Rong!
>>> >>>>>>
>>> >>>>>> Best,
>>> >>>>>> Zhijiang
>>> >>>>>>
>>> >>>>>> --
>>> >>>>>> From: Kurt Young
>>> >>>>>> Send Time: 2019-07-11 (Thursday) 22:54
>>> >>>>>> To: Kostas Kloudas
>>> >>>>>> Cc: Jark Wu; Fabian Hueske;
>>> >>>>>> dev; user
>>> >>>>>> Subject: Re: [ANNOUNCE] Rong Rong becomes a Flink committer
>>> >>>>>>
>>> >>>>>> Congratulations Rong!
>>> >>>>>>
>>> >>>>>> Best,
>>> >>>>>> Kurt
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> On Thu, Jul 11, 2019 at 10:53 PM Kostas Kloudas <
>>> kklou...@gmail.com>
>>> >>>>>> wrote:
>>> >>>>>> Congratulations Rong!
>>> >>>>>>
>>> >>>>>> On Thu, Jul 11, 2019 at 4:40 PM Jark Wu  wrote:
>>> >>>>>> Congratulations Rong Rong!
>>> >>>>>> Welcome on board!
>>> >>>>>>
>>> >>>>>> On Thu, 11 Jul 2019 at 22:25, Fabian Hueske 
>>> >>>>>> wrote:
>>> >>>>>> Hi everyone,
>>> >>>>>>
>>> >>>>>> I'm very happy to announce that Rong Rong accepted the offer of the
>>> >>>>>> Flink PMC to become a committer of the Flink project.
>>> >>>>>>
>>> >>>>>> Rong has been contributing to Flink for many years, mainly working on
>>> >>>>>> SQL and Yarn security features. He's also frequently helping out on the
>>> >>>>>> user@f.a.o mailing lists.
>>> >>>>>>
>>> >>>>>> Congratulations Rong!
>>> >>>>>>
>>> >>>>>> Best, Fabian
>>> >>>>>> (on behalf of the Flink PMC)
>>> >>>>>>
>>> >>>>>>
>>> >>>>>>
>>> >>>>
>>>
>>
>>
>> --
>> Xuefu Zhang
>>
>> "In Honey We Trust!"
>>
>


Re: How are kafka consumer offsets handled if sink fails?

2019-07-08 Thread Rong Rong
Hi John,

I think what Konstantin is trying to say is: when a job is restored from a
checkpoint, Flink's Kafka consumer does not start consuming from the offset
committed to Kafka; it starts from the offset that was last checkpointed
to external storage (e.g. a DFS). In other words, if checkpointing is
enabled, the restart point of the consumer does not depend on the offset
committed to Kafka.

This is to quote:
"*the Flink Kafka Consumer does only commit offsets back to Kafka on a
best-effort basis after every checkpoint. Internally Flink "commits" the
[checkpoints]->[current Kafka offset] as part of its periodic checkpoints.*"

However, if you do not enable checkpointing, I think your consumer will
by default restart from the default Kafka start position (which I think is
your committed group offset).
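
As a rough mental model of the rule above, here is a toy sketch in plain
Java. It is not Flink's implementation; the class and method names are
hypothetical, and it assumes the default start position is the committed
group offset, as I said above. It only encodes "a restored checkpoint offset
wins, otherwise fall back to the committed group offset".

```java
import java.util.OptionalLong;

// Toy model of the start-position decision; not taken from Flink's code.
class StartOffsetSketch {
    // restoredFromCheckpoint is empty when the job starts without any restored state.
    static long startOffset(OptionalLong restoredFromCheckpoint, long committedGroupOffset) {
        // An offset restored from a checkpoint takes precedence over the
        // offset that was committed back to Kafka.
        return restoredFromCheckpoint.orElse(committedGroupOffset);
    }
}
```

For example, startOffset(OptionalLong.of(42), 100) gives 42, while
startOffset(OptionalLong.empty(), 100) gives 100.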

--
Rong


On Mon, Jul 8, 2019 at 6:39 AM John Smith  wrote:

> So when we say a sink is at-least-once, it's because internally it's not
> checking any kind of state and it sends what it has regardless, correct?
> Because I will build a sink that calls stored procedures.
>
> On Sun., Jul. 7, 2019, 4:03 p.m. Konstantin Knauf, <
> konstan...@ververica.com> wrote:
>
>> Hi John,
>>
>> in case of a failure (e.g. in the SQL Sink) the Flink Job will be
>> restarted from the last checkpoint. This means the offset of all Kafka
>> partitions will be reset to that point in the stream along with state of
>> all operators. To enable checkpointing you need to call
>> StreamExecutionEnvironment#enableCheckpointing(). If you are using the
>> JDBCSinkFunction (which is an at-least-once sink), the output will be
>> duplicated in the case of failures.
>>
>> To answer your questions:
>>
>> * For this the FlinkKafkaConsumer handles the offsets manually (no
>> auto-commit).
>> * No, the Flink Kafka Consumer does only commit offsets back to Kafka on
>> a best-effort basis after every checkpoint. Internally Flink "commits" the
>> checkpoints as part of its periodic checkpoints.
>> * Yes, along with all other events between the last checkpoint and the
>> failure.
>> * It will continue from the last checkpoint.
>>
>> Hope this helps.
>>
>> Cheers,
>>
>> Konstantin
>>
>> On Fri, Jul 5, 2019 at 8:37 PM John Smith  wrote:
>>
>>> Hi using Apache Flink 1.8.0
>>>
>>> I'm consuming events from Kafka using nothing fancy...
>>>
>>> Properties props = new Properties();
>>> props.setProperty("bootstrap.servers", kafkaAddress);
>>> props.setProperty("group.id", kafkaGroup);
>>>
>>> FlinkKafkaConsumer<String> consumer =
>>>     new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);
>>>
>>>
>>> Do some JSON transforms and then push to my SQL database using JDBC and
>>> stored procedure. Let's assume the SQL sink fails.
>>>
>>> We know that Kafka can either periodically commit offsets or it can be
>>> done manually based on consumers logic.
>>>
>>> - How is the source Kafka consumer offsets handled?
>>> - Does the Flink Kafka consumer commit the offset per event/record?
>>> - Will that single event that failed be retried?
>>> - So if we had 5 incoming events and say on the 3rd one it failed, will
>>> it continue on the 3rd or will the job restart and try those 5 events.
>>>
>>>
>>>
>>
>> --
>>
>> Konstantin Knauf | Solutions Architect
>>
>> +49 160 91394525
>>
>>
>> Planned Absences: 10.08.2019 - 31.08.2019, 05.09. - 06.09.2010
>>
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>>
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
>>
>


Re: Flink Table API and Date fields

2019-07-08 Thread Rong Rong
Hi Flavio,

Yes, I think the handling of date/time types in Flink could be better when
dealing with systems that use DATE/TIME types.
There are several limitations of java.util.Date that Jingsong briefly
mentioned. Some of these limitations even affect the correctness of
the results (e.g. Gregorian vs. Julian calendar), and java.sql.Date is
currently used broadly in Flink.

I think handling it as a completely different type, either as a generic type
or as another extension of the basic types, will definitely be helpful here.
One important reason is that Flink can then prevent sql.Date functions from
being mistakenly applied to util.Date.
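
For reference, the workaround discussed in this thread (replacing
java.util.Date with java.sql.Date) can be sketched in plain Java as below.
The class and method names are hypothetical and the sketch does not touch
any Flink API; it only shows the conversion and the subclass relationship
that Timo mentioned.

```java
import java.util.Date;

// Hypothetical helper; illustrates the JDK types only.
class DateConversionSketch {
    // Copies the epoch-millisecond value of a java.util.Date into a java.sql.Date.
    static java.sql.Date toSqlDate(Date utilDate) {
        return new java.sql.Date(utilDate.getTime());
    }

    // True for java.sql.Date/Time/Timestamp, which all extend java.util.Date.
    static boolean isSqlSubtype(Date value) {
        return value instanceof java.sql.Date
                || value instanceof java.sql.Time
                || value instanceof java.sql.Timestamp;
    }
}
```

Because the sql.* classes are subclasses of util.Date, a plain
"instanceof java.util.Date" check cannot tell them apart, which is one
reason a separate type for util.Date seems cleaner to me.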

--
Rong

On Mon, Jul 8, 2019 at 6:13 AM Timo Walther  wrote:

> Hi Flavio,
>
> yes I agree. This check is a bit confusing. The initial reason for that
> was that sql.Time, sql.Date, and sql.Timestamp extend from util.Date as
> well. But handling it as a generic type as Jingsong mentioned might be the
> better option in order to write custom UDFs to handle them.
>
> Regards,
> Timo
>
> Am 08.07.19 um 12:04 schrieb Flavio Pompermaier:
>
> Of course there are java.sql.* and java.time.* in Java but it's also true
> that most of the times the POJOs you read come from an external (Maven) lib
> and if such POJOs contain date fields you have to create a local version of
> that POJO having the java.util.Date fields replaced by a java.sql.Date
> version of them.
> Moreover you also have to create a conversion function from the original
> POJO to the Flink-specific one (and this is very annoying, especially
> because if the POJO gets modified you have to check that your conversion
> function is updated accordingly).
>
> Summarising, it is possible to work around this limitation but it's very
> uncomfortable (IMHO)
>
> On Mon, Jul 8, 2019 at 11:52 AM JingsongLee 
> wrote:
>
>> Flink 1.9 blink runner will support it as Generic Type,
>> But I don't recommend it. After all, there are java.sql.Date and
>> java.time.* in Java.
>>
>> Best, JingsongLee
>>
>> --
>> From:Flavio Pompermaier 
>> Send Time: 2019-07-08 (Monday) 15:40
>> To:JingsongLee 
>> Cc:user 
>> Subject:Re: Flink Table API and Date fields
>>
>> I think I could do it for this specific use case but isn't this a big
>> limitation of Table API?
>> I think that java.util.Date should be a first class citizen in Flink..
>>
>> Best,
>> Flavio
>>
>> On Mon, Jul 8, 2019 at 4:06 AM JingsongLee 
>> wrote:
>> Hi Flavio:
>> It looks like you use java.util.Date in your POJO. Currently the Flink
>> Table API does not support BasicTypeInfo.DATE_TYPE_INFO
>> because of limitations in some of the checks in the code.
>> Can you use java.sql.Date?
>>
>> Best, JingsongLee
>>
>> --
>> From:Flavio Pompermaier 
>> Send Time: 2019-07-05 (Friday) 22:52
>> To:user 
>> Subject:Flink Table API and Date fields
>>
>> Hi to all,
>> in my use case I have a stream of POJOs with Date fields.
>> When I use Table API I get the following error:
>>
>> Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: SQL validation failed. Type
>> is not supported: Date
>> at
>> org.apache.flink.table.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:112)
>> at
>> org.apache.flink.table.planner.StreamPlanner.toRel(StreamPlanner.scala:148)
>> at
>> org.apache.flink.table.planner.StreamPlanner.parse(StreamPlanner.scala:114)
>> at
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:268)
>> Caused by: org.apache.flink.table.api.TableException: Type is not
>> supported: Date
>> at
>> org.apache.flink.table.calcite.FlinkTypeFactory$.org$apache$flink$table$calcite$FlinkTypeFactory$$typeInfoToSqlTypeName(FlinkTypeFactory.scala:357)
>>
>>
>> Is there a way to deal with this without converting the Date field to a
>> Long one?
>>
>> Best,
>> Flavio
>>
>>
>
>
>


Re: Apache Flink Sql - How to access EXPR$0, EXPR$1 values from a Row in a table

2019-06-17 Thread Rong Rong
Hi Mans,

I am not sure if you intended to name them like this, but if you want to
access them you need to escape them like `EXPR$0` [1].
Also, I think Flink defaults unnamed fields in a row to `f0`, `f1`, ... [2],
so you might be able to access them like that.
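
A minimal sketch of what the escaped query text could look like, written as
plain Java string handling. The helper class and method names are
hypothetical, and I have not run this query against Flink, so please treat
it as an assumption. Note also that the exception you posted comes from the
Table API expression parser, which suggests a full SQL statement should be
passed to the table environment's sqlQuery(...) method rather than to
select(...).

```java
// Hypothetical helper that only builds the SQL text.
class QuotedFieldSketch {
    // Wraps an identifier in backticks so characters such as '$' are taken literally.
    // Assumes the identifier itself contains no backtick.
    static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    // Builds the SQL text for the query from the original question.
    static String buildQuery() {
        return "SELECT name, idx, pos, tx." + quote("EXPR$0") + ", tx." + quote("EXPR$1")
                + " FROM tupleTable";
    }
}
```

The resulting string reads: SELECT name, idx, pos, tx.`EXPR$0`, tx.`EXPR$1`
FROM tupleTable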

--
Rong

[1] https://calcite.apache.org/docs/reference.html#identifiers
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/common.html#register-a-datastream-or-dataset-as-table

On Fri, Jun 14, 2019 at 1:55 PM M Singh  wrote:

> Hi:
>
> I am working with Flink Sql and have a table with the following schema:
>
> root
>  |-- name: String
>  |-- idx: Integer
>  |-- pos: String
>  |-- tx: Row(EXPR$0: Integer, EXPR$1: String)
>
> How can I access the attributes tx.EXPR$0 and tx.EXPR$1 ?
>
> I tried the following (the table is registered as 'tupleTable')
>
> Table tuples = myTableFuntionResultTuple.select("select name, idx,
> pos, tx.EXPR$0, tx.EXPR$1 from tupleTable");
>
> but I get the following exception
>
> Exception in thread "main"
> org.apache.flink.table.api.ExpressionParserException: Could not parse
> expression at column 8: `,' expected but `n' found
> select name, idx, pos, tx.EXPR$0, tx.EXPR$1 from tupleTable
>
> Please let me know if there is any documentation or samples for
> accessing the tuple values in a table.
>
> Thanks
>
> Mans
>


Re: How can I improve this Flink application for "Distinct Count of elements" in the data stream?

2019-06-13 Thread Rong Rong
 Rong, I implemented my solution using a ProcessingWindow
>>> with timeWindow and a ReduceFunction with timeWindowAll [1]. So for the
>>> first window I use parallelism and the second window I use to merge
>>> everything on the Reducer. I guess it is the best approach to do
>>> DistinctCount. Would you suggest some improvements?
>>>
>>> [1]
>>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordDistinctCountProcessTimeWindowSocket.java
>>>
>>> Thanks!
>>> *--*
>>> *-- Felipe Gutierrez*
>>>
>>> *-- skype: felipe.o.gutierrez*
>>> *--* *https://felipeogutierrez.blogspot.com
>>> <https://felipeogutierrez.blogspot.com>*
>>>
>>>
>>> On Wed, Jun 12, 2019 at 9:27 AM Felipe Gutierrez <
>>> felipe.o.gutier...@gmail.com> wrote:
>>>
>>>> Hi Rong,
>>>>
>>>> thanks for your answer. If I understood well, the option will be to use
>>>> ProcessFunction [1] since it has the method onTimer(). But not the
>>>> ProcessWindowFunction [2], because it does not have the method onTimer(). I
>>>> will need this method to call Collector out.collect(...) from the
>>>> onTimer() method in order to emit a single value of my Distinct Count
>>>> function.
>>>>
>>>> Is that reasonable what I am saying?
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/datastream/DataStream.html
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/index.html?org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html
>>>>
>>>> Kind Regards,
>>>> Felipe
>>>>
>>>> *--*
>>>> *-- Felipe Gutierrez*
>>>>
>>>> *-- skype: felipe.o.gutierrez*
>>>> *--* *https://felipeogutierrez.blogspot.com
>>>> <https://felipeogutierrez.blogspot.com>*
>>>>
>>>>
>>>> On Wed, Jun 12, 2019 at 3:41 AM Rong Rong  wrote:
>>>>
>>>>> Hi Felipe,
>>>>>
>>>>> there are multiple ways to do DISTINCT COUNT in Table/SQL API. In fact
>>>>> there's already a thread going on recently [1]
>>>>> Based on the description you provided, it seems like it might be a
>>>>> better API level to use.
>>>>>
>>>>> To answer your question,
>>>>> - You should be able to use other TimeCharacteristic. You might want
>>>>> to try ProcessWindowFunction and see if this fits your use case.
>>>>> - Not sure I fully understand the question, your keyBy should be
>>>>> done on your distinct key (or a combo key) and if you do keyBy correctly
>>>>> then yes all msg with same key is processed by the same TM thread.
>>>>>
>>>>> --
>>>>> Rong
>>>>>
>>>>>
>>>>>
>>>>> [1]
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/count-DISTINCT-in-flink-SQL-td28061.html
>>>>>
>>>>> On Tue, Jun 11, 2019 at 1:27 AM Felipe Gutierrez <
>>>>> felipe.o.gutier...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I have implemented a Flink data stream application to compute
>>>>>> distinct count of words. Flink does not have a built-in operator which
>>>>>> does this computation. I used KeyedProcessFunction and I am saving the
>>>>>> state on a ValueState descriptor.
>>>>>> Could someone check if my implementation is the best way of doing it?
>>>>>> Here is my solution:
>>>>>> https://stackoverflow.com/questions/56524962/how-can-i-improve-my-count-distinct-for-data-stream-implementation-in-flink/56539296#56539296
>>>>>>
>>>>>> I have some points that I could not understand better:
>>>>>> - I only could use TimeCharacteristic.IngestionTime.
>>>>>> - I split the words using "Tuple2(0, word)", so I
>>>>>> will have always the same key (0). As I understand, all the events will
>>>>>> be processed on the same TaskManager which will not achieve parallelism
>>>>>> if I am in a cluster.
>>>>>>
>>>>>> Kind Regards,
>>>>>> Felipe
>>>>>> *--*
>>>>>> *-- Felipe Gutierrez*
>>>>>>
>>>>>> *-- skype: felipe.o.gutierrez*
>>>>>> *--* *https://felipeogutierrez.blogspot.com
>>>>>> <https://felipeogutierrez.blogspot.com>*
>>>>>>
>>>>>


Re: How can I improve this Flink application for "Distinct Count of elements" in the data stream?

2019-06-11 Thread Rong Rong
Hi Felipe,

there are multiple ways to do DISTINCT COUNT in the Table/SQL API. In fact
there's already a recent thread on this [1].
Based on the description you provided, it seems like that might be a better
API level to use.

To answer your questions:
- You should be able to use other TimeCharacteristics. You might want to try
ProcessWindowFunction and see if it fits your use case.
- I'm not sure I fully understand the question. Your keyBy should be done on
your distinct key (or a combo key), and if you do the keyBy correctly then
yes, all messages with the same key are processed by the same TM thread.
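
To show why keying by the word itself (instead of a constant key 0) still
gives a correct distinct count while allowing parallelism, here is a sketch
in plain Java. It is only a model: the names are hypothetical, and the hash
routing is a simplification of what keyBy does, not Flink's actual
key-group assignment.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of a two-stage distinct count; not Flink code.
class DistinctCountSketch {
    // Stage 1: route each word to a partition by its hash, mimicking a keyBy on the word.
    static List<Set<String>> partitionByWord(List<String> words, int parallelism) {
        List<Set<String>> partitions = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            partitions.add(new HashSet<>());
        }
        for (String word : words) {
            int index = Math.floorMod(word.hashCode(), parallelism);
            partitions.get(index).add(word); // a set keeps each word once
        }
        return partitions;
    }

    // Stage 2: equal words always share a partition, so the partial counts can be summed.
    static int distinctCount(List<String> words, int parallelism) {
        int total = 0;
        for (Set<String> partition : partitionByWord(words, parallelism)) {
            total += partition.size();
        }
        return total;
    }
}
```

The result is the same for any parallelism, because no word can end up in
two partitions.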

--
Rong



[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/count-DISTINCT-in-flink-SQL-td28061.html

On Tue, Jun 11, 2019 at 1:27 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi all,
>
> I have implemented a Flink data stream application to compute distinct
> count of words. Flink does not have a built-in operator which does this
> computation. I used KeyedProcessFunction and I am saving the state on a
> ValueState descriptor.
> Could someone check if my implementation is the best way of doing it? Here
> is my solution:
> https://stackoverflow.com/questions/56524962/how-can-i-improve-my-count-distinct-for-data-stream-implementation-in-flink/56539296#56539296
>
> I have some points that I could not understand better:
> - I only could use TimeCharacteristic.IngestionTime.
> - I split the words using "Tuple2(0, word)", so I will
> have always the same key (0). As I understand, all the events will be
> processed on the same TaskManager which will not achieve parallelism if I
> am in a cluster.
>
> Kind Regards,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>


Re: [SURVEY] Usage of flink-ml and [DISCUSS] Delete flink-ml

2019-05-23 Thread Rong Rong
+1 for the deletion.

I also think it might be a good idea to update the roadmap with the plan for
removal/development, since we've reached consensus on FLIP-39.

Thanks,
Rong


On Wed, May 22, 2019 at 8:26 AM Shaoxuan Wang  wrote:

> Hi Chesnay,
> Yes, you are right. There is not any active commit planned for the legacy
> Flink-ml package. It does not matter delete it now or later. I will open a
> PR and remove it.
>
> Shaoxuan
>
> On Wed, May 22, 2019 at 7:05 PM Chesnay Schepler 
> wrote:
>
>> I believe we can remove it regardless since users could just use the 1.8
>> version against future releases.
>>
>> Generally speaking, any library/connector that is no longer actively
>> developed can be removed from the project as existing users can always
>> rely on previous versions, which should continue to work by virtue of
>> working against @Stable APIs.
>>
>> On 22/05/2019 12:08, Shaoxuan Wang wrote:
>> > Hi Flink community,
>> >
>> > We plan to delete/deprecate the legacy flink-libraries/flink-ml package
>> in
>> > Flink1.9, and replace it with the new flink-ml interface proposed in
>> FLIP39
>> > (FLINK-12470).
>> > Before we remove this package, I want to reach out to you and ask if
>> there
>> > is any active project still uses this package. Please respond to this
>> > thread and outline how you use flink-libraries/flink-ml.
>> > Depending on the replies of activity and adoption
>> > of flink-libraries/flink-ml, we will decide to either delete this
>> package
>> > in Flink1.9 or deprecate it for now & remove it in the next release
>> after
>> > 1.9.
>> >
>> > Thanks for your attention and help!
>> >
>> > Regards,
>> > Shaoxuan
>> >
>>
>>


Re: Propagating delta from window upon trigger

2019-05-22 Thread Rong Rong
Hi Miki,

Upon trigger, the window is fired with all of the content in its state by
invoking the "emitWindowContents" method, which in turn invokes the window
function you define.
Thus, if your goal is to emit only the delta, one option is to do so in your
window function. One challenge you might face is how to make your window
function stateful; you might want to check out this document [1] for more
details.

If your goal is to limit the content stored in the window state to only the
delta, you can also look at the incremental aggregation state [2] of the
window, but you will have to design your window function in such a way that
it can take the delta into account.
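
As a rough illustration of the first option, here is a plain-Java sketch (no
Flink dependency; DeltaEmitter and onFire are hypothetical names). It assumes
the window contents arrive as a key-to-value map on every firing and that the
snapshot of the previous firing would be kept in per-window state.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of a stateful window function that emits only changes; not Flink API. */
class DeltaEmitter {

    // Snapshot of what was seen at the previous firing (would live in per-window state).
    private final Map<String, Long> lastEmitted = new HashMap<>();

    /** Called on every trigger firing with the full window contents; returns only changed entries. */
    Map<String, Long> onFire(Map<String, Long> windowContents) {
        Map<String, Long> delta = new HashMap<>();
        for (Map.Entry<String, Long> entry : windowContents.entrySet()) {
            Long previous = lastEmitted.get(entry.getKey());
            // New keys (previous == null) and keys with a changed value are part of the delta.
            if (!entry.getValue().equals(previous)) {
                delta.put(entry.getKey(), entry.getValue());
            }
        }
        lastEmitted.clear();
        lastEmitted.putAll(windowContents);
        return delta;
    }
}
```

Note that this still keeps the full contents in the window state; it only
reduces what is sent downstream.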

--
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation


On Sun, May 19, 2019 at 8:00 PM Congxian Qiu  wrote:

> Hi, Nikhil
> Window will emit all state to downstream. Can you clear the state while
> triggering?
>
> Nikhil Goyal wrote on Sun, May 19, 2019 at 1:03 AM:
>
>> I have window of 1 hour and trigger of 5 min. I want to know if every 5
>> min Flink is writing the entire window or only the keys which changed.
>>
>> On Sat, May 18, 2019, 9:40 AM miki haiat  wrote:
>>
>>> Can you elaborate more what  is you use case ?
>>>
>>>
>>> On Sat, May 18, 2019 at 12:47 AM Nikhil Goyal 
>>> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> Is there a way in Flink to only propagate the changes which happened in
>>>> the window's state rather than dumping the contents of the window again and
>>>> again upon trigger?
>>>>
>>>> Thanks
>>>> Nikhil
>>>>
>>> --
> Best,
> Congxian
>


Re: RichAsyncFunction for Scala?

2019-05-17 Thread Rong Rong
Hi Shannon,

I think RichAsyncFunction [1] extends the normal AsyncFunction, so from an
API perspective you should be able to use it.

The problem, I think, is with Scala anonymous functions, which I believe go
through a different code path when the Scala RichAsyncFunction is wrapped
[2].
Is your problem specifically with the rich anonymous async function, or do
you also have problems with a regular function that extends RichAsyncFunction?

--
Rong

[1]
https://github.com/apache/flink/blob/release-1.8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/async/RichAsyncFunction.scala
[2]
https://github.com/apache/flink/blob/release-1.8/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AsyncDataStream.scala#L289

On Thu, May 16, 2019 at 12:26 AM Fabian Hueske  wrote:

> Hi Shannon,
>
> That's a good observation. To be honest, I don't know why the Scala
> AsyncFunction does not implement RichFunction.
> Maybe this was not intentional and just overlooked when porting the
> functionality to Scala.
>
> Would you mind creating a Jira ticket for this?
>
> Thank you,
> Fabian
>
> On Tue, May 14, 2019 at 11:29 PM Shannon Carey <
> sca...@expedia.com> wrote:
>
>> I have some awkward code in a few Flink jobs which is converting a Scala
>> stream into a Java stream in order to pass it to
>> AsyncDataStream.unorderedWait(), and using a Java RichAsyncFunction, due to
>> old versions of Flink not having the ability to do async stuff with a Scala
>> stream.
>>
>>
>>
>> In newer versions of Flink, I see that
>> org.apache.flink.streaming.api.scala.AsyncDataStream is available. However,
>> it accepts only org.apache.flink.streaming.api.scala.async.AsyncFunction,
>> and there does not appear to be an AbstractRichFunction subclass of that
>> trait as I expected. Is there a way to use the Scala interfaces but provide
>> a rich AsyncFunction to AsyncDataStream.unorderedWait()? If not, I will
>> leave the old code as-is.
>>
>>
>>
>> Thanks,
>>
>> Shannon
>>
>


Re: Approach to Auto Scaling Flink Job

2019-05-16 Thread Rong Rong
Hi Anil,

A typical YARN ResourceManager setup consists of 2 RM nodes [1] in an
active/standby configuration.
FYI: We also shared some practical experience with the limitations of
this setup, and with potential redundant fail-safe mechanisms, in our latest
talk [2] at this year's Flink Forward.

Thanks,
Rong

[1]
https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerHA.html
[2]
https://www.ververica.com/resources/flink-forward-san-francisco-2019/-practical-experience-running-flink-in-production

On Thu, May 16, 2019 at 5:08 AM Anil  wrote:

> Thanks for the clarification Rong!
> As per my understanding, the Docker containers monitors the job Flink Job
> which are running in Yarn Cluster. Flink JM's have HA enabled. So there's a
> standby JM in case the JM fails and in case of TM failure, that TM will be
> re-deployed. All good. My concern is what if the Yarn Master node goes
> down.
> Is the Yarn cluster running with Multi-master or in case of failure do you
> migrate your job do a different cluster. If so is this failover to a
> different cluster built into Athenax.
> Regards,
> Anil.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Flink ML Use cases

2019-05-14 Thread Rong Rong
Hi Abhishek,

Based on your description, I think this FLIP proposal [1] fits your use
case well.
You can also check out the GitHub repos by Boris (CCed) for the PMML
implementation [2]. This proposal is still under development [3]; you are
more than welcome to test it out and share your feedback.

Thanks,
Rong

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-23+-+Model+Serving
[2] https://github.com/FlinkML/flink-modelServer /
https://github.com/FlinkML/flink-speculative-modelServer
[3] https://github.com/apache/flink/pull/7446

On Tue, May 14, 2019 at 4:44 PM Sameer Wadkar  wrote:

> If you can save the model as a PMML file you can apply it on a stream
> using one of the java pmml libraries.
>
> Sent from my iPhone
>
> On May 14, 2019, at 4:44 PM, Abhishek Singh  wrote:
>
> I was looking forward to using Flink ML for my project where I think I can
> use SVM.
>
> I have been able to run a bath job using flink ML and trained and tested
> my data.
>
> Now I want to do the following:-
> 1. Applying the above-trained model to a stream of events from Kafka
> (Using Data Streams) :For this, I want to know if Flink ML can be used
> with Data Streams.
>
> 2. Persisting the model: I may want to save the trained model for some
> time future.
>
> Can the above 2 use cases be achieved using Apache Flink?
>
> *Regards,*
> *Abhishek Kumar Singh*
>
> *Search Engineer*
> *Mob :+91 7709735480 *
>
>
> *...*
>
>


Re: Approach to Auto Scaling Flink Job

2019-05-12 Thread Rong Rong
Hi Anil,

The reason we are using Docker is that internally we support Dockerized
containers for microservices.

Ideally, this can be any external service running on something other than
the actual YARN cluster where your Flink application resides. Basically, the
watchdog runs outside of the Flink cluster: it is designed to capture
failures that are not self-recoverable by YARN/Flink alone, for example a
schema evolution in a source/sink, corrupted data that needs to be skipped,
etc. Because of this, it does not make sense to run it on the same YARN
cluster.

We have enabled HA in Flink's JM now, but had not at the time of the
presentation.
I CCed Peter, who might be able to answer this question better.

Thanks,
Rong


On Sat, May 11, 2019 at 10:12 PM Anil  wrote:

> Thanks Rong. FlinkForward talk was insightful.
> One more question, it's mentioned in the talk that the jobs are running on
> Yarn and are monitored by containers running on Docker. Can you explain why
> is Docker needed here. When we deploy job to Yarn, one Yarn container is
> already dedicated for Job Manager which monitors the job. What additional
> functionality does Docker provide here.
> Also when the jobs are deployed on Yarn, the Master Node becomes a Single
> point of failure. Are you using a Multi-Master setup or have taken another
> approach to handle failover.
> Regards,
> Anil.


Re: Approach to Auto Scaling Flink Job

2019-05-08 Thread Rong Rong
Hi Anil,

We have a presentation [1] from Flink Forward 2018 that briefly discusses the
high-level approach (via a watchdog).

We are also restructuring our open-source AthenaX:
our internal implementation has diverged from the open-source version for
too long, and it has become a problem for us to merge back to the
open-source upstream. So we are likely to create a new modularized version
of AthenaX in the future.

Thanks for your interest, and please stay tuned for our next release.

Best,
Rong

[1]
https://www.ververica.com/flink-forward/resources/building-flink-as-a-service-platform-at-uber

On Wed, May 8, 2019 at 11:32 AM Anil  wrote:

> Thanks for the reply Rong. Can you please let me know the design for the
> auto-scaling part, if possible.
> Or guide me in the direction so that I could create this feature myself.


Re: Inconsistent documentation of Table Conditional functions

2019-05-08 Thread Rong Rong
Hi Flavio,

I believe the documentation meant "X" as a placeholder, which you can
replace with numeric values (1, 2, ...) depending on how many "CASE
WHEN" conditions you have.
"resultZ" is the default result in the "ELSE" clause, and thus it is a
literal.

Thanks,
Rong

On Wed, May 8, 2019 at 9:08 AM Flavio Pompermaier 
wrote:

> Hi to all,
> in the documentation of the Table Conditional functions [1] the example is
> inconsistent with the related description (there's no resultX for example).
> Or am I wrong?
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/table/functions.html#conditional-functions
>
>


Re: Approach to Auto Scaling Flink Job

2019-05-08 Thread Rong Rong
Hi Anil,

Thanks for reporting the issue. I went through the code and I believe the
auto-scaling functionality is still in our internal branch and has not been
merged to the open-source branch yet.
I will change the documentation accordingly.

Thanks,
Rong

On Mon, May 6, 2019 at 9:54 PM Anil  wrote:

> I'm using Uber Open Source project Athenax.  As mentioned in it's docs[1]
> it
> supports `Auto scaling for AthenaX jobs`. I went through the source code on
> Github but didn't find the auto scaling  part. Can someone aware of this
> project please point me in the right direction here.
>
> I'm using Flink's Table API (Flink 1.4.2) and submit my jobs
> programatically
> to the Yarn Cluster. All the JM and TM metric are saved in Prometheus. I am
> thinking of using these metric to develop an algo to re-scale jobs. I would
> also appreciate if someone could share how they developed there
> auto-scaling
> part.
>
> [1]  https://athenax.readthedocs.io/en/latest/


Re: Filter push-down not working for a custom BatchTableSource

2019-05-04 Thread Rong Rong
Hi Josh,

I think I found the root cause of this issue (please see my comment in
https://issues.apache.org/jira/browse/FLINK-12399).
As of now, you can try overriding the explainSource() method to let
Calcite know that the table source after calling applyPredicate is different
from the one before calling it.
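
A rough sketch of the idea, in plain Java so that it stands alone: the class
below is a hypothetical stand-in and does not implement the Flink interfaces.
Only the explainSource() method name and its String return type mirror
TableSource; the filter representation (a list of strings) is an assumption
for illustration.

```java
import java.util.List;

/** Hypothetical stand-in for a filterable table source; not the Flink interfaces. */
class FilterAwareSource {

    private final String tableName;
    private final List<String> pushedFilters; // string form of the accepted filters

    FilterAwareSource(String tableName, List<String> pushedFilters) {
        this.tableName = tableName;
        this.pushedFilters = pushedFilters;
    }

    /** Description that changes whenever the pushed-down filters change. */
    public String explainSource() {
        return tableName + "(filters=" + pushedFilters + ")";
    }
}
```

The point is only that the instance returned by applyPredicate should produce
a different description from the original one, so that the two are not
treated as the same source during planning.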

Let me know if this works for you :-)

Thanks,
Rong

On Fri, May 3, 2019 at 1:03 PM Josh Bradt  wrote:

> Hi Fabian,
>
> Thanks for taking a look. I've filed this ticket:
> https://issues.apache.org/jira/browse/FLINK-12399
>
> Thanks,
>
> Josh
>
> On Fri, May 3, 2019 at 3:41 AM Fabian Hueske  wrote:
>
>> Hi Josh,
>>
>> The code looks good to me.
>> This seems to be a bug then.
>> It's strange that it works for ORC.
>>
>> Would you mind opening a Jira ticket and maybe a simple reproducable code
>> example?
>>
>> Thank you,
>> Fabian
>>
>> On Thu, May 2, 2019 at 6:23 PM Josh Bradt <
>> josh.br...@klaviyo.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> Thanks for your reply. My custom table source does not implement
>>> ProjectableTableSource. I believe that isFilterPushedDown is implemented
>>> correctly since it's nearly identical to what's written in the
>>> OrcTableSource. I pasted a slightly simplified version of the
>>> implementation below. If you wouldn't mind reading over it, is there
>>> anything obviously wrong?
>>>
>>> public final class CustomerTableSource
>>>         implements BatchTableSource, FilterableTableSource {
>>>
>>>     // Iterator that gets data from a REST API as POJO instances
>>>     private final AppResourceIterator resourceIterator;
>>>     private final String tableName;
>>>     private final Class modelClass;
>>>     private final AppRequestFilter[] filters;
>>>
>>>     public CustomerTableSource(
>>>             AppResourceIterator resourceIterator,
>>>             String tableName,
>>>             Class modelClass) {
>>>
>>>         this(resourceIterator, tableName, modelClass, null);
>>>     }
>>>
>>>     protected CustomerTableSource(
>>>             AppResourceIterator resourceIterator,
>>>             String tableName,
>>>             Class modelClass,
>>>             AppRequestFilter[] filters) {
>>>
>>>         this.resourceIterator = resourceIterator;
>>>         this.tableName = tableName;
>>>         this.modelClass = modelClass;
>>>         this.filters = filters;
>>>     }
>>>
>>>     @Override
>>>     public TableSource applyPredicate(List predicates) {
>>>         List acceptedPredicates = new ArrayList<>();
>>>         List acceptedFilters = new ArrayList<>();
>>>
>>>         for (final Expression predicate : predicates) {
>>>             buildFilterForPredicate(predicate).ifPresent(filter -> {
>>>                 acceptedFilters.add(filter);
>>>                 acceptedPredicates.add(predicate);
>>>             });
>>>         }
>>>
>>>         predicates.removeAll(acceptedPredicates);
>>>
>>>         return new CustomerTableSource(
>>>                 resourceIterator.withFilters(acceptedFilters),
>>>                 tableName,
>>>                 modelClass,
>>>                 acceptedFilters.toArray(new AppRequestFilter[0])
>>>         );
>>>     }
>>>
>>>     public Optional buildFilterForPredicate(Expression predicate) {
>>>         // Code for translating an Expression into an AppRequestFilter
>>>         // Returns Optional.empty() for predicates we don't want to / can't apply
>>>     }
>>>
>>>     @Override
>>>     public boolean isFilterPushedDown() {
>>>         return filters != null;
>>>     }
>>>
>>>     @Override
>>>     public DataSet getDataSet(ExecutionEnvironment execEnv) {
>>>         return execEnv.fromCollection(resourceIterator, modelClass);
>>>     }
>>>
>>>     @Override
>>>     public TypeInformation getReturnType() {
>>>         return TypeInformation.of(modelClass);
>>>     }
>>>
>>>     @Override
>>>     public TableSchema getTableSchema() {
>>>         return TableSchema.fromTypeInfo(getReturnType());
>>>     }
>>> }
>>>
>>>
>>> Thanks,
>>>
>>> Josh
>>>
>>> On Thu, May 2, 2019 at 3:42 AM Fabian Hueske  wrote:
>>>
>>>> Hi Josh,
>>>>
>>>> Does your TableSource also implement ProjectableTableSource?
>>>> If yes, you need to make sure that the filter information is also
>>>> forwarded if ProjectableTableSource.projectFields() is called after
>>>> FilterableTableSource.applyPredicate().
>>>> Also make sure to correctly implement
>>>> FilterableTableSource.isFilterPushedDown().
>>>>
>>>> Hope this helps,
>>>> Fabian
>>>>
>>>> On Tue, Apr 30, 2019 at 10:29 PM Josh Bradt <
>>>> josh.br...@klaviyo.com> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm trying to implement filter push-down on a custom BatchTableSource
>>>>> that retrieves data from a REST API and returns it as POJO instances. I've
>>>>> implemented FilterableTableSource as described in the docs, returning a
>>>>> new
>>>>> instance of my table source containing the predicates that I've removed
>>>>> from the list of predicates passed into

Re: How to implement custom stream operator over a window? And after the Count-Min Sketch?

2019-04-25 Thread Rong Rong
Hi Felipe,

I am not sure the algorithm requires a new extension of the window
operator. I think you can implement the CountMinSketch object as an
aggregator, e.g.:
1. The aggregate state (ACC) should be the accumulated count-min-sketch
2-D hash array (plus a few other needed fields).
2. The accumulate method simply does the update.
3. getResult simply gets the frequency from the sketch.

Thus you will not need to use a customized ValueStateDescriptor.
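
The three steps above could look roughly like the following plain-Java
accumulator. This is only a sketch under my own assumptions (no Flink
dependency; the class name, the seeded hashing and the merge method are made
up for illustration and are not taken from your CountMinSketch class).

```java
import java.util.Arrays;
import java.util.Random;

/** Count-Min Sketch accumulator in plain Java, shaped like an aggregate accumulator. */
class CountMinSketchAcc {

    private final int depth;
    private final int width;
    private final long[][] table; // the 2-D hash array
    private final int[] seeds;    // one hash seed per row

    CountMinSketchAcc(int depth, int width, long seed) {
        this.depth = depth;
        this.width = width;
        this.table = new long[depth][width];
        this.seeds = new int[depth];
        Random random = new Random(seed);
        for (int row = 0; row < depth; row++) {
            seeds[row] = random.nextInt();
        }
    }

    /** Maps an item to a column of the given row using a cheap seeded bit mix. */
    private int bucket(int row, Object item) {
        int h = item.hashCode() ^ seeds[row];
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return Math.floorMod(h, width);
    }

    /** Step 2 ("accumulate"): update one counter per row. */
    void add(Object item, long count) {
        for (int row = 0; row < depth; row++) {
            table[row][bucket(row, item)] += count;
        }
    }

    /** Step 3 ("getResult"): the estimate is the minimum counter over all rows. */
    long estimate(Object item) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < depth; row++) {
            min = Math.min(min, table[row][bucket(row, item)]);
        }
        return min;
    }

    /** Merges another sketch built with the same depth, width and seed, cell by cell. */
    void merge(CountMinSketchAcc other) {
        if (other.depth != depth || other.width != width || !Arrays.equals(other.seeds, seeds)) {
            throw new IllegalArgumentException("sketches are not compatible");
        }
        for (int row = 0; row < depth; row++) {
            for (int col = 0; col < width; col++) {
                table[row][col] += other.table[row][col];
            }
        }
    }
}
```

With non-negative counts the estimate can overcount because of hash
collisions, but it never undercounts, and the cell-wise merge is what makes
the sketch usable as an orderless, incremental aggregate.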

But I agree that it may be a good idea to support a class of use cases
that require approximate aggregate state (like HyperLogLog?); this would be
a good value-add in my opinion.
I think some further discussion is needed if we are going down that path.
Do you think the original FLINK-2147
<https://issues.apache.org/jira/browse/FLINK-2147> JIRA ticket is a good
place to carry out that discussion? We can probably continue there or
create a new JIRA for discussion.

--
Rong

On Wed, Apr 24, 2019 at 1:32 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi Rong,
>
> thanks for your reply. I guess I already did something regarding what you
> have told to me. I have one example on this application [1], which uses
> this state [2]  and computes a CountMinSketch [3].
>
> I am seeking how to implement my own operator over a window in order to
> have more fine-grained control over it and learn with it. And hopefully,
> building a path to contribute to Flink in the future [4].
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L69
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L182
> [3]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/util/CountMinSketch.java
> [4] https://issues.apache.org/jira/browse/FLINK-2147
>
> Best,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> <https://felipeogutierrez.blogspot.com>*
>
>
> On Wed, Apr 24, 2019 at 2:06 AM Rong Rong  wrote:
>
>> Hi Felipe,
>>
>> In a short glance, the question can depend on how your window is (is
>> there any overlap like sliding window) and how many data you would like to
>> process.
>>
>> In general, you can always buffer all the data into a ListState and apply
>> your window function by iterating through all those buffered elements [1].
>> Provided that the data size is small enough to be hold efficiently in the
>> state-backend.
>> If this algorithm has some sort of pre-aggregation that can simplify the
>> buffering through an incremental, orderless aggregation, you can also think
>> about using [2].
>> With these two approaches, you do not necessarily need to implement your
>> own window operator (extending window operator can be tricky), and you also
>> have access to the internal state [3].
>>
>> Hope these helps your exploration.
>>
>> Thanks,
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
>>
>> On Tue, Apr 23, 2019 at 8:16 AM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I want to implement my own operator that computes the Count-Min Sketch
>>> over a window in Flink. Then, I found this Jira issue [1]
>>> <https://issues.apache.org/jira/browse/FLINK-2147> which is exactly
>>> what I want. I believe that I have to work out my skills to arrive at a
>>> mature solution.
>>>
>>> So, the first thing that comes to my mind is to create my custom
>>> operator like the AggregateApplyWindowFunction [2]
>>> <https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.html>.
>>> Through this I can create the summary of my data over a window.
>>>
>>> Also, I found this custom JoinOperator example by Till Rohrmann [3]
>>> <https://github.com/tillrohrmann/custom-join> which I think I can base
>>> my implementation since it is done over a DataStream.
>>>
>>> What are your suggestio

Re: Apache Flink - Question about dynamically changing window end time at run time

2019-04-24 Thread Rong Rong
Hi Mans,

Sameer is correct. If you would like to control window triggering based on
other elements that do not belong to this window (in a keyed stream
context), then this is probably the best approach.

I think you've also posted in another thread that describes what will be
left after fire-and-purge [1]. As Fabian stated, the only things that
might be left afterwards are the window (that is, the 2 long values that
indicate the start/end) and the trigger object. But you are right that this
might eventually fill up memory.

Another approach is to implement your own operator that handles all of this
internally in your user code. This would require you to replicate much of
the window operator logic, though.

Thanks,
Rong

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Apache-Flink-How-to-destroy-global-window-and-release-it-s-resources-td27191.html#a27212

On Wed, Apr 24, 2019 at 5:02 AM Sameer W  wrote:

> Global Windows is fine for this use case. I have used the same strategy.
> You just define custom evictors and triggers and you are all good. Windows
> are managed by keys, so as such as long as events are evicted from the
> window, that counts towards reclaiming memory for the key+window
> combination. Plus there is just window per key with Global Windows.
>
> On Wed, Apr 24, 2019 at 7:47 AM M Singh  wrote:
>
>> Hi Rong:
>>
>> Thanks for your answer.
>>
>> From what I understand the dynamic gap session windows are also created
>> when the event is encountered.  I need to be able to change the window end
>> time at a later time based on what other events are in that window.  One
>> way to do this is to use GlobalWindows but then these are never deleted.
>>
>> Regarding CEP option - I believe that CEP patterns cannot be changed
>> dynamically once they've been complied which limits it usage.
>>
>> Please feel free to correct me.
>>
>> Thanks for your help and pointers.
>>
>> On Tuesday, April 23, 2019, 8:12:56 PM EDT, Rong Rong <
>> walter...@gmail.com> wrote:
>>
>>
>> Hi Mans,
>>
>> I am not sure what you meant by "dynamically change the end-time of a
>> window. If you are referring to dynamically determines the firing time of
>> the window, then it fits into the description of session window [1]:
>> If you want to handle window end time dynamically, one way of which I can
>> think of is the dynamic gap, session window [1] approach. with which you
>> can specify the end-time of a window based on input elements. Provided that
>> you are maintaining a session window.
>> Another way to look at it is through the Flink-CEP library [2].
>>
>> Thanks,
>> Rong
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#session-windows
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/cep.html#groups-of-patterns
>>
>> On Tue, Apr 23, 2019 at 8:19 AM M Singh  wrote:
>>
>> Hi:
>>
>> I am working on a project and need to change the end time of the window
>> dynamically.  I want to find out if the end time of the window is used
>> internally (for sorting windows/etc) except for handling watermarks that
>> would cause problems if the end time was changed during run time after the
>> window has been created even if no new event has arrived for that window.
>>
>> I don't want to use GlobalWindow since from my understanding it never
>> gets destroyed.
>>
>> If there is any alternate way of dealing with this, please let me know.
>>
>> Thanks
>>
>> Mans
>>
>>


Re: Apache Flink - Question about dynamically changing window end time at run time

2019-04-23 Thread Rong Rong
Hi Mans,

I am not sure what you meant by "dynamically change the end-time of a
window". If you are referring to dynamically determining the firing time of
the window, then it fits the description of a session window [1].
If you want to handle the window end time dynamically, one way I can think
of is the dynamic-gap session window [1] approach, with which you can
specify the end time of a window based on input elements, provided that
you are maintaining a session window.
Another way to look at it is through the Flink CEP library [2].
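
To make the dynamic-gap idea concrete, here is a plain-Java model (no Flink
dependency; the class name and the {timestamp, gap} array encoding are
assumptions for illustration, and the boundary handling is simplified). Each
element proposes a window [timestamp, timestamp + gap), and overlapping
windows are merged, so the end time of a session moves as elements arrive.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of dynamic-gap session windows; not the Flink implementation. */
class DynamicGapSessions {

    /**
     * Each event is {timestamp, gap}; events are assumed to be sorted by timestamp.
     * Returns the merged session windows as {start, end} pairs.
     */
    static List<long[]> assign(List<long[]> events) {
        List<long[]> windows = new ArrayList<>();
        for (long[] event : events) {
            long start = event[0];
            long end = event[0] + event[1]; // the per-element gap decides the proposed end
            long[] last = windows.isEmpty() ? null : windows.get(windows.size() - 1);
            if (last != null && start < last[1]) {
                // The element falls into the open session: extend its end time if needed.
                last[1] = Math.max(last[1], end);
            } else {
                windows.add(new long[] {start, end});
            }
        }
        return windows;
    }
}
```

In Flink itself the per-element gap would be supplied through a gap
extractor given to the session window assigner, as described in [1].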

Thanks,
Rong


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/windows.html#session-windows
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/libs/cep.html#groups-of-patterns

On Tue, Apr 23, 2019 at 8:19 AM M Singh  wrote:

> Hi:
>
> I am working on a project and need to change the end time of the window
> dynamically.  I want to find out if the end time of the window is used
> internally (for sorting windows/etc) except for handling watermarks that
> would cause problems if the end time was changed during run time after the
> window has been created even if no new event has arrived for that window.
>
> I don't want to use GlobalWindow since from my understanding it never gets
> destroyed.
>
> If there is any alternate way of dealing with this, please let me know.
>
> Thanks
>
> Mans
>


Re: How to implement custom stream operator over a window? And after the Count-Min Sketch?

2019-04-23 Thread Rong Rong
Hi Felipe,

At a quick glance, the answer depends on how your window is defined (is
there any overlap, as in a sliding window?) and how much data you would like
to process.

In general, you can always buffer all the data into a ListState and apply
your window function by iterating through all those buffered elements [1],
provided that the data size is small enough to be held efficiently in the
state backend.
If this algorithm has some sort of pre-aggregation that can simplify the
buffering through an incremental, orderless aggregation, you can also think
about using [2].
With these two approaches, you do not necessarily need to implement your
own window operator (extending the window operator can be tricky), and you
also have access to the internal state [3].
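
As a small illustration of the difference between the two approaches, here
is a plain-Java sketch (no Flink dependency; the names are made up) that
computes a window average once from a full buffer and once from a constant
size accumulator that is updated per element.

```java
import java.util.List;

/** Plain-Java contrast of buffering vs. incremental aggregation; not Flink API. */
class WindowAverage {

    /** Approach [1]: keep every element and iterate over them when the window fires. */
    static double fromBuffer(List<Long> buffered) {
        long sum = 0;
        for (long value : buffered) {
            sum += value;
        }
        return buffered.isEmpty() ? 0.0 : (double) sum / buffered.size();
    }

    /** Approach [2]: keep only a small accumulator that is updated per element. */
    static final class Acc {
        long sum;
        long count;

        void add(long value) {
            sum += value;
            count++;
        }

        double result() {
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }
}
```

Both give the same answer, but the second keeps two numbers in state instead
of the whole window.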

Hope this helps your exploration.

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation
[3]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction

On Tue, Apr 23, 2019 at 8:16 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I want to implement my own operator that computes the Count-Min Sketch
> over a window in Flink. Then, I found this Jira issue [1]
>  which is exactly what
> I want. I believe that I have to work out my skills to arrive at a mature
> solution.
>
> So, the first thing that comes to my mind is to create my custom operator
> like the AggregateApplyWindowFunction [2]
> .
> Through this I can create the summary of my data over a window.
>
> Also, I found this custom JoinOperator example by Till Rohrmann [3]
>  which I think I can base my
> implementation since it is done over a DataStream.
>
> What are your suggestions to me in order to start to implement a custom
> stream operator which computes Count-Min Sketch? Do you have any custom
> operator over window/keyBy that I can learn with the source code?
>
> ps.: I have implemented (looking at Blink source code) this a custom
> Combiner [4]
> 
> (map-combiner-reduce) operator.
>
> [1] https://issues.apache.org/jira/browse/FLINK-2147
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/windowing/AggregateApplyWindowFunction.html
> [3] https://github.com/tillrohrmann/custom-join
> [4]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/operator/AbstractRichMapStreamBundleOperator.java
>
> Thanks,
> Felipe
> *--*
> *-- Felipe Gutierrez*
>
> *-- skype: felipe.o.gutierrez*
> *--* *https://felipeogutierrez.blogspot.com
> *
>


Re: Create Dynamic data type

2019-04-19 Thread Rong Rong
Hi Soheil,

If I understand correctly, when you said "according to the number of rows",
you were trying to dynamically determine the RowType based on how long one
row is, correct?
In this case, I do not think this is supported in JDBCInputFormat at the
moment, and it would be hard to support.

Even if we extended JDBCInputFormat to dynamically call
Connection.setSchema() every time you consume a row, this would still be
tricky because:
1. In your "SELECT *", you won't know how long the row is until you actually
execute the statement, but you have to call setSchema before you prepare the
statement.
2. You have to prepare the statement every time the schema changes.

You might be able to set all fields to just GenericTypeInfo<>(Object.class)
and convert them downstream. This will get around the dynamic schema, but
you still need to know the length of your select beforehand.

So, the best I can think of is to change your schema into maps or arrays of
Strings and Ints, or to write your own SourceFunction to consume and
deserialize in your own way.
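
If the number of columns can be determined before the job is built (which is
an assumption on my side), the type layout you described can be generated in
a loop instead of being written out by hand. Below is a plain-Java sketch
using Class objects; DynamicRowLayout is a made-up name. In Flink, the
analogous array would hold TypeInformation constants such as
BasicTypeInfo.INT_TYPE_INFO, STRING_TYPE_INFO and DOUBLE_TYPE_INFO, and would
be passed to RowTypeInfo.

```java
/** Builds the column type layout once the number of columns is known; plain Java only. */
class DynamicRowLayout {

    /**
     * numCols is the number of columns after the leading int column and is assumed
     * to be even: the first half are Strings, the second half are Doubles.
     */
    static Class<?>[] fieldTypes(int numCols) {
        Class<?>[] types = new Class<?>[numCols + 1];
        types[0] = Integer.class;
        for (int i = 1; i <= numCols; i++) {
            types[i] = (i <= numCols / 2) ? String.class : Double.class;
        }
        return types;
    }
}
```

This does not remove the limitation above: the length still has to be known
before the statement is prepared.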

Thanks,
Rong



On Fri, Apr 19, 2019 at 8:19 AM Soheil Pourbafrani 
wrote:

> Hi,
>
> Using JDBCInputFormat I want to read data from database but the problem is
> the table columns are dynamic according to the number of rows. In the
> schema the first column is of type int and in the rest of the column the
> first half is String and the second half is double. So I need a way to
> create the data type dynamically.
>
> I tried the following:
>
> Tuple t = Tuple.getTupleClass(num_col + 1).newInstance();
> t.setField("Integer", 0);
> for(int i = 0; i < num_col; i++) {
> if (i < i / 2) t.setField("String", i);
> if (i > i / 2) t.setField("Double", i);
> }
>
>
> JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
> .setDrivername("com.mysql.jdbc.Driver")
> .setDBUrl("url")
> .setUsername("root")
> .setPassword("pass")
> .setQuery("SELECT * FROM table;")
> .setFetchSize(100)
> .setRowTypeInfo(new RowTypeInfo(TypeExtractor.getForObject(t)))
> .finish();
>
> but I got the following error:
>
> Automatic type extraction is not possible on candidates with null values.
> Please specify the types directly.
>
> Creating the data type using TypeInformation[] fieldTypes I successfully
> can get the data but it needs the static schema and doesn't fit in my case!
>
> Any help will be appreciated!
>


Re: Service discovery on YARN - find out which port was dynamically assigned to the JobManager Web Interface

2019-04-17 Thread Rong Rong
As far as I know, the port is bound randomly.

YARN actually has the ability to translate the proxy link to the right
node/port.
If your goal is to avoid going through the YARN REST proxy, this
could be a problem: there is a chance that the host/port will be changed by
YARN without any notification back to Consul (or maybe there is a way; I am
not sure). This could happen either through YARN's native failure recovery
mechanism (2nd attempt) or through HA.

CCed Till, who might be able to answer this more comprehensively.

Thanks,
Rong

On Wed, Apr 17, 2019 at 7:14 AM Olivier Solliec 
wrote:

> Hello,
>
>
> I want to be able to register a flink cluster into a service discovery
> system (Consul in our case).
>
> This flink cluster is scheduled on YARN.
>
>
> Is there a way to know which port was assigned to the rest interface ?
>
>
> Via the rest API /jobmanager/config, I see a key "jobmanager.rpc.address"
> which is the correct yarn node, but both "rest.port" and "web.port" have
> "0" value.
>
>
> The idea is to launch a cluster, retrieve its app id, use the yarn web ui
> proxy to get the right node address/port, and register this into Consul.
>
>
> Thank you,
>
>
> Olivier
>


Re: Flink JDBC: Disable auto-commit mode

2019-04-15 Thread Rong Rong
+1, Thanks Konstantinos for opening the ticket.
This would definitely be a useful feature.

--
Rong

On Mon, Apr 15, 2019 at 7:34 AM Fabian Hueske  wrote:

> Great, thank you!
>
> On Mon, Apr 15, 2019 at 4:28 PM Papadopoulos, Konstantinos <
> konstantinos.papadopou...@iriworldwide.com> wrote:
>
>> Hi Fabian,
>>
>>
>>
>> I opened the following issue to track the improvement proposed:
>>
>> https://issues.apache.org/jira/browse/FLINK-12198
>>
>>
>>
>> Best,
>>
>> Konstantinos
>>
>>
>>
>> *From:* Papadopoulos, Konstantinos
>> 
>> *Sent:* Monday, April 15, 2019 12:30 PM
>> *To:* Fabian Hueske 
>> *Cc:* Rong Rong ; user 
>> *Subject:* RE: Flink JDBC: Disable auto-commit mode
>>
>>
>>
>> Hi Fabian,
>>
>>
>>
>> Glad to hear that you agree for such an improvement. Of course, I can
>> handle it.
>>
>>
>>
>> Best,
>>
>> Konstantinos
>>
>>
>>
>> *From:* Fabian Hueske 
>> *Sent:* Monday, April 15, 2019 11:56 AM
>> *To:* Papadopoulos, Konstantinos <
>> konstantinos.papadopou...@iriworldwide.com>
>> *Cc:* Rong Rong ; user 
>> *Subject:* Re: Flink JDBC: Disable auto-commit mode
>>
>>
>>
>> Hi Konstantinos,
>>
>>
>>
>> This sounds like a useful extension to me.
>>
>> Would you like to create a Jira issue and contribute the improvement?
>>
>>
>>
>> In the meantime, you can just fork the code of JDBCInputFormat and adjust
>> it to your needs.
>>
>>
>>
>> Best, Fabian
>>
>>
>>
>> On Mon, Apr 15, 2019 at 8:53 AM Papadopoulos, Konstantinos <
>> konstantinos.papadopou...@iriworldwide.com> wrote:
>>
>> Hi Rong,
>>
>>
>>
>> We have already tried to set the fetch size with no success. According to
>> PG documentation we have to set both configuration parameters (i.e.,
>> auto-commit to false and limit fetch) to achieve our purpose.
>>
>>
>>
>> Thanks,
>>
>> Konstantinos
>>
>>
>>
>> *From:* Rong Rong 
>> *Sent:* Friday, April 12, 2019 6:50 PM
>> *To:* Papadopoulos, Konstantinos <
>> konstantinos.papadopou...@iriworldwide.com>
>> *Cc:* user 
>> *Subject:* Re: Flink JDBC: Disable auto-commit mode
>>
>>
>>
>> Hi Konstantinos,
>>
>>
>>
>> Seems like setting for auto commit is not directly possible in the
>> current JDBCInputFormatBuilder.
>>
>> However there's a way to specify the fetch size [1] for your DB
>> round-trip, doesn't that resolve your issue?
>>
>>
>>
>> Similarly in JDBCOutputFormat, a batching mode was also used to stash
>> upload rows before flushing to DB.
>>
>>
>>
>> --
>>
>> Rong
>>
>>
>>
>> [1]
>> https://docs.oracle.com/cd/E18283_01/java.112/e16548/resltset.htm#insertedID4
>>
>>
>>
>> On Fri, Apr 12, 2019 at 6:23 AM Papadopoulos, Konstantinos <
>> konstantinos.papadopou...@iriworldwide.com> wrote:
>>
>> Hi all,
>>
>> We are facing an issue when trying to integrate PostgreSQL with Flink
>> JDBC. When you establish a connection to the PostgreSQL database, it is in
>> auto-commit mode. It means that each SQL statement is treated as a
>> transaction and is automatically committed, but this functionality results
>> in unexpected behavior (e.g., out-of-memory errors) when executed for large
>> result sets. In order to bypass such issues, we must disable the
>> auto-commit mode. To do this, in a simple Java application, we call the
>> setAutoCommit() method of the Connection object.
>>
>> So, my question is: How can we achieve this by using JDBCInputFormat of
>> Flink?
>>
>> Thanks in advance,
>>
>> Konstantinos
>>
>>


Re: Flink JDBC: Disable auto-commit mode

2019-04-12 Thread Rong Rong
Hi Konstantinos,

It seems that setting auto-commit is not directly possible in the current
JDBCInputFormatBuilder.
However, there is a way to specify the fetch size [1] for your DB round-trips;
doesn't that resolve your issue?

Similarly, JDBCOutputFormat uses a batching mode to buffer
rows before flushing them to the DB.
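
For reference, this is roughly what the plain-JDBC side looks like when
auto-commit is disabled and a fetch size is set on the same connection, as
the question describes doing in a simple Java application. It is only a
sketch using standard java.sql calls; the class and method names are
hypothetical, and this is not something JDBCInputFormat exposes today.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

class CursorFetchSketch {

    /**
     * Disables auto-commit and prepares a forward-only, read-only statement
     * that asks the driver to fetch rows in batches of fetchSize.
     */
    static PreparedStatement prepareStreaming(Connection conn, String query, int fetchSize)
            throws SQLException {
        // Must happen before the statement is executed.
        conn.setAutoCommit(false);
        PreparedStatement stmt = conn.prepareStatement(
                query, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        // A hint to the driver: rows per round-trip instead of the whole result set.
        stmt.setFetchSize(fetchSize);
        return stmt;
    }
}
```

The caller would still be responsible for committing or rolling back and for
closing the statement and the connection.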

--
Rong

[1]
https://docs.oracle.com/cd/E18283_01/java.112/e16548/resltset.htm#insertedID4

On Fri, Apr 12, 2019 at 6:23 AM Papadopoulos, Konstantinos <
konstantinos.papadopou...@iriworldwide.com> wrote:

> Hi all,
>
> We are facing an issue when trying to integrate PostgreSQL with Flink
> JDBC. When you establish a connection to the PostgreSQL database, it is
> in auto-commit mode. It means that each SQL statement is treated as a
> transaction and is automatically committed, but this functionality results
> in unexpected behavior (e.g., out-of-memory errors) when executed for large
> result sets. In order to bypass such issues, we must disable the
> auto-commit mode. To do this, in a simple Java application, we call the
> setAutoCommit() method of the Connection object.
>
> So, my question is: How can we achieve this by using JDBCInputFormat of
> Flink?
>
> Thanks in advance,
>
> Konstantinos
>
>


Re: [ANNOUNCE] Apache Flink 1.8.0 released

2019-04-10 Thread Rong Rong
Congrats! Thanks Aljoscha for being the release manager and all for making
the release possible.

--
Rong


On Wed, Apr 10, 2019 at 4:23 AM Stefan Richter 
wrote:

> Congrats and thanks to Aljoscha for managing the release!
>
> Best,
> Stefan
>
> > On 10. Apr 2019, at 13:01, Biao Liu  wrote:
> >
> > Great news! Thanks Aljoscha and all the contributors.
> >
> > On Wed, Apr 10, 2019 at 6:11 PM Till Rohrmann <trohrm...@apache.org> wrote:
> > Thanks a lot to Aljoscha for being our release manager and to the
> > community making this release possible!
> >
> > Cheers,
> > Till
> >
> > On Wed, Apr 10, 2019 at 12:09 PM Hequn Cheng wrote:
> > Thanks a lot for the great release Aljoscha!
> > Also thanks for the work by the whole community. :-)
> >
> > Best, Hequn
> >
> > On Wed, Apr 10, 2019 at 6:03 PM Fabian Hueske wrote:
> > Congrats to everyone!
> >
> > Thanks Aljoscha and all contributors.
> >
> > Cheers, Fabian
> >
> > On Wed, Apr 10, 2019 at 11:54 AM Congxian Qiu <qcx978132...@gmail.com> wrote:
> > Cool!
> >
> > Thanks Aljoscha a lot for being our release manager, and all the others
> > who make this release possible.
> >
> > Best, Congxian
> > On Apr 10, 2019, 17:47 +0800, Jark Wu <imj...@gmail.com>, wrote:
> > > Cheers!
> > >
> > > Thanks Aljoscha and all others who make 1.8.0 possible.
> > >
> > > On Wed, 10 Apr 2019 at 17:33, vino yang wrote:
> > >
> > > > Great news!
> > > >
> > > > Thanks Aljoscha for being the release manager and thanks to all the
> > > > contributors!
> > > >
> > > > Best,
> > > > Vino
> > > >
> > > > On Wed, Apr 10, 2019 at 4:54 PM Driesprong, Fokko wrote:
> > > >
> > > > > Great news! Great effort by the community to make this happen.
> > > > > Thanks all!
> > > > >
> > > > > Cheers, Fokko
> > > > >
> > > > > On Wed, Apr 10, 2019 at 10:50 AM Shaoxuan Wang <wshaox...@gmail.com> wrote:
> > > > >
> > > > > > Thanks Aljoscha and all others who made contributions to FLINK
> > > > > > 1.8.0.
> > > > > > Looking forward to FLINK 1.9.0.
> > > > > >
> > > > > > Regards,
> > > > > > Shaoxuan
> > > > > >
> > > > > > > On Wed, Apr 10, 2019 at 4:31 PM Aljoscha Krettek <aljos...@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > The Apache Flink community is very happy to announce the release of
> > > > > > > > Apache Flink 1.8.0, which is the next major release.
> > > > > > > >
> > > > > > > > Apache Flink® is an open-source stream processing framework for
> > > > > > > > distributed, high-performing, always-available, and accurate data
> > > > > > > > streaming applications.
> > > > > > > >
> > > > > > > > The release is available for download at:
> > > > > > > > https://flink.apache.org/downloads.html
> > > > > > > >
> > > > > > > > Please check out the release blog post for an overview of the
> > > > > > > > improvements for this bugfix release:
> > > > > > > > https://flink.apache.org/news/2019/04/09/release-1.8.0.html
> > > > > > > >
> > > > > > > > The full release notes are available in Jira:
> > > > > > > > https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12344274
> > > > > > > >
> > > > > > > > We would like to thank all contributors of the Apache Flink community
> > > > > > > > who made this release possible!
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Aljoscha
> > > > > >
> > > > >
> > > >
>
>


Re: Source reinterpretAsKeyedStream

2019-03-29 Thread Rong Rong
Hi Adrienne,

I think you should be able to reinterpretAsKeyedStream by passing in a
DataStreamSource based on the ITCase example [1].
Can you share the full code/error logs or the IAE?

--
Rong

[1]
https://github.com/apache/flink/blob/release-1.7.2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java#L98

On Fri, Mar 29, 2019 at 6:09 AM Adrienne Kole 
wrote:

> Dear community,
>
> I have a use-case where sources are keyed.
> For example, there is a source function with parallelism 10, and each
> instance has its own key.
> I used reinterpretAsKeyedStream to convert source DataStream to
> KeyedStream, however, I get an IllegalArgument exception.
> Is reinterpretAsKeyedStream can be used with source operators as well, or
> should the operator to be used be already partitioned (by keyby(..)) ?
>
> Thanks,
> Adrienne
>


Re: Infinitely requesting for Yarn container in Flink 1.5

2019-03-29 Thread Rong Rong
Hi Qi,

I think the problem may be related to another similar problem reported in a
previous JIRA [1]. I think a PR is also in discussion.

Thanks,
Rong

[1] https://issues.apache.org/jira/browse/FLINK-10868

On Fri, Mar 29, 2019 at 5:09 AM qi luo  wrote:

> Hello,
>
> Today we encountered an issue where our Flink job request for Yarn
> container infinitely. In the JM log as below, there were errors when
> starting TMs (caused by underlying HDFS errors). So the allocated container
> failed and the job kept requesting for new containers. The failed
> containers were also not returned the the Yarn, so this job quickly
> exhausted our Yarn resources.
>
> Is there any way we can avoid such behavior? Thank you!
>
> 
> JM log:
>
> INFO  org.apache.flink.yarn.YarnResourceManager - Creating container launch context for TaskManagers
> INFO  org.apache.flink.yarn.YarnResourceManager - Starting TaskManagers
> INFO  org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy - Opening proxy : xxx.yyy
> ERROR org.apache.flink.yarn.YarnResourceManager - Could not start TaskManager in container container_e12345.
> org.apache.hadoop.yarn.exceptions.YarnException: Unauthorized request to start container.
>
> INFO  org.apache.flink.yarn.YarnResourceManager - Requesting new TaskExecutor container with resources  vCores:4>. Number pending requests 19.
> INFO  org.apache.flink.yarn.YarnResourceManager - Received new container: container_e195_1553781735010_27100_01_000136 - Remaining pending container requests: 19
> 
>
> Thanks,
> Qi
>


Re: Calcite SQL Map to Pojo Map

2019-03-29 Thread Rong Rong
I don't think Types.GENERIC(Map.class) is the proper solution, as you
will not be able to do any successful processing with the returned object.
For example, Map['k', 'v'].get('k') will not work.

I think the problem might be, as you suggested, that they are
handled as GenericType instead of Pojo type, so the correct serializer
is not being used.
It would be great if you could share the complete code that generates the
exception.

--
Rong

On Thu, Mar 28, 2019 at 1:56 PM shkob1  wrote:

> Apparently the solution is to force map creating using UDF and to have the
> UDF return Types.GENERIC(Map.class)
> That makes them compatible and treated both as GenericType
>
> Thanks!
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Calcite SQL Map to Pojo Map

2019-03-28 Thread Rong Rong
If your conversion is done using a UDF, you need to override the
getResultType method [1] to explicitly specify the key and value type
information, because generic type erasure will not preserve the
type-parameter part of your code.
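
A small plain-Java illustration of the erasure point, independent of Flink
(the class and method names are made up for this sketch): two maps with
different key and value types are indistinguishable at runtime, which is why
the type information has to be declared explicitly.

```java
import java.util.HashMap;
import java.util.Map;

class ErasureSketch {

    /** True if both maps have the same runtime class; type parameters play no role. */
    static boolean sameRuntimeClass(Map<?, ?> a, Map<?, ?> b) {
        return a.getClass() == b.getClass();
    }

    /** Describes what is left of a map's type at runtime. */
    static String runtimeTypeName(Map<?, ?> map) {
        // Only the raw class name survives; there is no key or value type in it.
        return map.getClass().getName();
    }
}
```

Calling sameRuntimeClass with a HashMap<String, String> and a
HashMap<Long, Double> returns true, since both are just java.util.HashMap
once compiled.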

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#scalar-functions

On Wed, Mar 27, 2019 at 10:14 AM shkob1  wrote:

> Im trying to convert a SQL query that has a select map[..] into a pojo with
> Map (using tableEnv.toRestractedStream )
> It seems to fail when the field requestedTypeInfo is GenericTypeInfo with
> GenericType while the field type itself is MapTypeInfo with
> Map
>
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Result
> field does not match requested type. Requested: GenericType;
> Actual: Map
>
> Any suggestion?
> Shahar
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Map UDF : The Nothing type cannot have a serializer

2019-03-21 Thread Rong Rong
Based on what I saw in the implementation, I think you meant to implement a
ScalarFunction, right? You are only trying to structure a vararg
string into a Map.

If my understanding is correct, I think the Map constructor [1] is
something you might be able to leverage. It doesn't resolve your
nullability issue though.
Otherwise you can use a scalar UDF [2].

--
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/functions.html#value-construction-functions
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/udfs.html#scalar-functions



On Thu, Mar 21, 2019 at 5:02 PM shkob1  wrote:

> Looking further into the RowType it seems like this field is translated as
> a
> CURSOR rather than a map.. not sure why
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Backoff strategies for async IO functions?

2019-03-19 Thread Rong Rong
Thanks for the feedback @Till.

Yes, I agree as well that opening up or changing the AsyncWaitOperator
doesn't seem to be necessary here.
I think creating an "AsyncFunctionBase" and making the current AsyncFunction
an extension of it, with some of the default behaviors Shuyi suggested,
seems to be a good starting point.
To some extent we could also provide some of the strategies discussed as
default building blocks, but I am not sure this is a must once we have the
"AsyncFunctionBase".

I will try to create a POC for the change, gather some feedback, and
see whether the abstract class offers too much or too little flexibility.
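
As a rough illustration of the kind of retry logic such a base class could
encapsulate, here is a sketch in plain Java built on CompletableFuture. It
does not use any Flink API: the class name, method names and parameters are
all hypothetical, and a real version would hand the final result or error
back to Flink from inside asyncInvoke.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class AsyncRetrySketch {

    /**
     * Runs the asynchronous call and retries failed attempts up to maxRetries
     * times, doubling the delay after every failure (base, 2x base, 4x base, ...).
     */
    static <T> CompletableFuture<T> withRetry(
            Supplier<CompletableFuture<T>> call,
            int maxRetries,
            long baseDelayMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, 0, maxRetries, baseDelayMillis, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call,
            int retriesSoFar,
            int maxRetries,
            long baseDelayMillis,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            future = call.get();
        } catch (Throwable t) {
            // Treat a synchronous failure like a failed future.
            future = new CompletableFuture<>();
            future.completeExceptionally(t);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
            } else if (retriesSoFar >= maxRetries) {
                // Retries exhausted: surface the last error to the caller.
                result.completeExceptionally(error);
            } else {
                // Exponential backoff; fine for small retry counts (no overflow guard).
                long delay = baseDelayMillis << retriesSoFar;
                scheduler.schedule(
                        () -> attempt(call, retriesSoFar + 1, maxRetries,
                                baseDelayMillis, scheduler, result),
                        delay, TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

The operator-level timeout would still have to be larger than the sum of all
delays plus the time the individual calls take.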

Best,
Rong

On Tue, Mar 19, 2019 at 10:32 AM Till Rohrmann  wrote:

> Sorry for joining the discussion so late. I agree that we could add some
> more syntactic sugar for handling failure cases. Looking at the existing
> interfaces, I think it should be fairly easy to create an abstract class
> AsyncFunctionWithRetry which extends AsyncFunction and encapsulates the
> retry logic for asynchronous operations. I think it is not strictly
> necessary to change the AsyncWaitOperator to add this functionality.
>
> Cheers,
> Till
>
> On Wed, Mar 13, 2019 at 5:42 PM Rong Rong  wrote:
>
>> Thanks for raising the concern @shuyi and the explanation @konstantin.
>>
>> Upon glancing on the Flink document, it seems like user have full control
>> on the timeout behavior [1]. But unlike AsyncWaitOperator, it is not
>> straightforward to access the internal state of the operator to, for
>> example, put the message back to the async buffer with a retry tag. Thus, I
>> also think that giving a set of common timeout handling seems to be a good
>> idea for Flink users and this could be very useful feature.
>>
>> Regarding the questions and concerns
>> 1. should the "retry counter" to be reset or to continue where it left
>> off?
>> - This is definitely a good point as this counter might need to go into
>> the operator state if we decided to carry over the retry counter.
>> Functionality-wise I think this should be reset because it doesn't
>> represent the same transient state at the time of failure once restart.
>>
>> 2. When should AsyncStream.orderedWait() skip a record?
>> - This should be configurable by user I am assuming, for example we can
>> have additional properties for each strategy described by @shuyi like a
>> combination of:
>>   - (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY)
>>
>> I've also created a JIRA ticket [2] for the discussion, please feel free
>> to share your thoughts and comments.
>>
>> --
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
>> [2] https://issues.apache.org/jira/browse/FLINK-11909
>>
>>
>>
>> On Tue, Mar 12, 2019 at 6:29 AM Konstantin Knauf <
>> konstan...@ververica.com> wrote:
>>
>>> Hi Shuyi,
>>>
>>> I am not sure. You could handle retries in the user code within
>>> org.apache.flink.streaming.api.functions.async.AsyncFunction#asyncInvoke
>>> without using a DLQ as described in my original answer to William.  On the
>>> other hand, I agree that it could easier for the user and it is indeed a
>>> common scenario.
>>>
>>> Two follow up questions come to mind:
>>>
>>>- When a Flink job fails and restarts, would you expect the "retry
>>>counter" to be reset or to continue where it left off?
>>>- With AsyncStream.orderedWait() when would you expect a record to
>>>be skipped? After the final timeout, after the first timeout?
>>>
>>> Would you like to create a JIRA ticket [1] for this improvement with
>>> answers to the questions above and we can continue to discuss it there.
>>>
>>> Best,
>>>
>>> Konstantin
>>>
>>> [1]
>>> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11835?filter=allopenissues
>>>
>>>
>>> On Sun, Mar 10, 2019 at 9:20 AM Shuyi Chen  wrote:
>>>
>>>> Hi Konstantin,
>>>>
>>>> (cc Till since he owns the code)
>>>>
>>>> For async-IO, IO failure and retry is a common & expected pattern. In
>>>> most of the use cases, users will need to deal with IO failure and retry.
>>>> Therefore, I think it's better to address the problem in Flink rather than
>>>> user implementing its custom logic in user code for a better dev
>>>> experience. We do have similar problem in many of ou

Re: Backoff strategies for async IO functions?

2019-03-13 Thread Rong Rong
Thanks for raising the concern @shuyi and the explanation @konstantin.

From a glance at the Flink documentation, it seems that users have full
control over the timeout behavior [1]. But from user code, unlike from
within the AsyncWaitOperator, it is not straightforward to access the
internal state of the operator to, for example, put the message back into
the async buffer with a retry tag. Thus, I also think that providing a set
of common timeout handling strategies is a good idea and could be a very
useful feature for Flink users.

Regarding the questions and concerns
1. Should the "retry counter" be reset or continue where it left off?
- This is definitely a good point, as the counter would need to go into the
operator state if we decide to carry it over.
Functionality-wise I think it should be reset, because after a restart it
no longer represents the same transient state as at the time of failure.

2. When should AsyncStream.orderedWait() skip a record?
- I assume this should be configurable by the user. For example, we could
have additional properties for each strategy described by @shuyi, such as a
combination of:
  - (RETRY_STRATEGY, MAX_RETRY_COUNT, RETRY_FAILURE_POLICY)
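
To make the combination above a bit more tangible, here is a sketch of how
such a setting could be modeled in plain Java. The strategy names are the
ones proposed in this thread; the class itself, its fields and its methods
are hypothetical, and RETRY_FAILURE_POLICY is left out to keep it short.

```java
class RetryPolicySketch {

    /** Strategy names as proposed earlier in this thread. */
    enum RetryStrategy { FAIL_OPERATOR, FIX_INTERVAL_RETRY, EXP_BACKOFF_RETRY }

    private final RetryStrategy strategy;
    private final int maxRetryCount;
    private final long baseDelayMillis;

    RetryPolicySketch(RetryStrategy strategy, int maxRetryCount, long baseDelayMillis) {
        this.strategy = strategy;
        this.maxRetryCount = maxRetryCount;
        this.baseDelayMillis = baseDelayMillis;
    }

    /** Delay before retry number `retry` (0-based), or -1 if no further retry is allowed. */
    long delayMillis(int retry) {
        if (strategy == RetryStrategy.FAIL_OPERATOR || retry >= maxRetryCount) {
            return -1L;
        }
        if (strategy == RetryStrategy.FIX_INTERVAL_RETRY) {
            return baseDelayMillis;
        }
        // EXP_BACKOFF_RETRY: base, 2x base, 4x base, ...
        return baseDelayMillis << retry;
    }

    /** Sum of all retry delays, i.e. a lower bound for the overall async timeout. */
    long totalDelayMillis() {
        long total = 0L;
        for (int retry = 0; retry < maxRetryCount; retry++) {
            long delay = delayMillis(retry);
            if (delay < 0) {
                break;
            }
            total += delay;
        }
        return total;
    }
}
```

With EXP_BACKOFF_RETRY, a base delay of 2s and 4 retries, the delays are
2s + 4s + 8s + 16s = 30s, which matches the example given further down in
this thread.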

I've also created a JIRA ticket [2] for the discussion, please feel free to
share your thoughts and comments.

--
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html#timeout-handling
[2] https://issues.apache.org/jira/browse/FLINK-11909



On Tue, Mar 12, 2019 at 6:29 AM Konstantin Knauf 
wrote:

> Hi Shuyi,
>
> I am not sure. You could handle retries in the user code within
> org.apache.flink.streaming.api.functions.async.AsyncFunction#asyncInvoke
> without using a DLQ as described in my original answer to William.  On the
> other hand, I agree that it could easier for the user and it is indeed a
> common scenario.
>
> Two follow up questions come to mind:
>
>- When a Flink job fails and restarts, would you expect the "retry
>counter" to be reset or to continue where it left off?
>- With AsyncStream.orderedWait() when would you expect a record to be
>skipped? After the final timeout, after the first timeout?
>
> Would you like to create a JIRA ticket [1] for this improvement with
> answers to the questions above and we can continue to discuss it there.
>
> Best,
>
> Konstantin
>
> [1]
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-11835?filter=allopenissues
>
>
> On Sun, Mar 10, 2019 at 9:20 AM Shuyi Chen  wrote:
>
>> Hi Konstantin,
>>
>> (cc Till since he owns the code)
>>
>> For async-IO, IO failure and retry is a common & expected pattern. In
>> most of the use cases, users will need to deal with IO failure and retry.
>> Therefore, I think it's better to address the problem in Flink rather than
>> user implementing its custom logic in user code for a better dev
>> experience. We do have similar problem in many of our use cases. To enable
>> backoff and retry, we need to put the failed message to a DLQ (another
>> Kafka topic) and re-ingest the message from the DLQ topic to retry, which
>> is manual/cumbersome and require setting up extra Kafka topic.
>>
>> Can we add multiple strategies to handle async IO failure in the
>> AsyncWaitOperator? I propose the following strategies:
>>
>>
>>- FAIL_OPERATOR (default & current behavior)
>>- FIX_INTERVAL_RETRY (retry with configurable fixed interval up to N
>>times)
>>- EXP_BACKOFF_RETRY (retry with exponential backoff up to N times)
>>
>> What do you guys think? Thanks a lot.
>>
>> Shuyi
>>
>> On Fri, Mar 8, 2019 at 3:17 PM Konstantin Knauf 
>> wrote:
>>
>>> Hi William,
>>>
>>> the AsyncOperator does not have such a setting. It is "merely" a wrapper
>>> around an asynchronous call, which provides integration with Flink's state
>>> & time management.
>>>
>>> I think, the way to go would be to do the exponential back-off in the
>>> user code and set the timeout of the AsyncOperator to the sum of the
>>> timeouts in the user code (e.g. 2s + 4s + 8s + 16s).
>>>
>>> Cheers,
>>>
>>> Konstantin
>>>
>>>
>>> On Thu, Mar 7, 2019 at 5:20 PM William Saar  wrote:
>>>
>>>> Hi,
>>>> Is there a way to specify an exponential backoff strategy for when
>>>> async function calls fail?
>>>>
>>>> I have an async function that does web requests to a rate-limited API.
>>>> Can you handle that with settings on the async function call?
>>>>
>>>> Thanks,
>>>> William
>>>>
>>>
>>> --
>>>
>>> Konstantin Knauf | Solutions Architect
>>>
>>>
>>
>
> --
>
> Konstantin 

Re: Schema Evolution on Dynamic Schema

2019-03-09 Thread Rong Rong
Hi Shahar,

From my understanding, if you use "group by" with AggregateFunctions, they
save their accumulators to SQL-internal state, which is independent of your
input schema. Based on what you described, I think that's why recovering
from the existing state works.
I think one point of confusion might be the "toRetractStream" syntax. This
actually passes the "retracting" flag to the Flink planner to indicate how
the DataStream operator gets generated from your SQL.

So in my understanding, there's really no "state" associated with the
"retracting stream" itself; the state is associated with the generated
operators. However, I am not an expert in Table/SQL state recovery. I recall
there was an open JIRA [1] that might be related to your question regarding
recovery of SQL/Table-generated operators. Maybe @Fabian can provide more
insight here?

Regarding the rest of the pipeline, both the "filter" and "map" operators
are stateless, and sink state recovery depends on what your sink does.

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-6966

On Fri, Mar 8, 2019 at 12:07 PM shkob1  wrote:

> Thanks Rong,
>
> I have made some quick test changing the SQL select (adding a select field
> in the middle) and reran the job from a savepoint and it worked without any
> errors. I want to make sure i understand how at what point the state is
> stored and how does it work.
>
> Let's simplify the scenario and forget my specific case of dynamically
> generated pojo. let's focus on generic steps of:
> Source->register table->SQL select and group by session->retracted stream
> (Row)->transformToPojo (Custom Map function) ->pushToSink
>
> And let's assume the SQL select is changed (a field is added somewhere in
> the middle of the select field).
> So:
> We had intermediate results that are in the old format that are loaded from
> state to the new Row object in the retracted stream. is that an accurate
> statement? at what operator/format is the state stored in this case? is it
> the SQL result/Row? is it the Pojo? as this scenario does not fail for me
> im
> trying to understand how/where it is handled in Flink?
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Schema Evolution on Dynamic Schema

2019-03-08 Thread Rong Rong
Hi Shahar,

1. Do you mean that the incoming data is published as JSON and that you
have a custom POJO source function / table source that converts it? In that
case it is you who maintains the schema evolution support, am I correct?
For Avro I think you can refer to [1].
2. If you change the SQL, you will have to recompile and rerun your job.
This means the newly compiled SQL will yield the correct logic to run
against your new schema, so I don't foresee this being an issue. As for the
second problem: yes, it is the responsibility of your custom serialization
sink function to convert Row into objects of the output class. I am not sure
whether this is the piece of code you are looking for [2] if you are using
Avro, but you might be able to leverage it.

If you are sticking with your own format of generated/dynamic classes, you
might have to create them in your custom source/sink table.
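
One way to avoid depending on field positions in that conversion is to pair
the values with the column names of the query result at the time the job is
compiled. A minimal sketch in plain Java, assuming the field names are
available from the result table's schema and the values have been read out
of the Row by position; the class and method names are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RowByNameSketch {

    /**
     * Pairs each value with its column name so that later lookups
     * do not depend on the position of a field in the select list.
     */
    static Map<String, Object> toNamedFields(String[] fieldNames, Object[] values) {
        if (fieldNames.length != values.length) {
            throw new IllegalArgumentException(
                    "Expected " + fieldNames.length + " values but got " + values.length);
        }
        Map<String, Object> named = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.length; i++) {
            named.put(fieldNames[i], values[i]);
        }
        return named;
    }
}
```

The generated class could then be populated by field name. Note that this
only helps with rows produced by the current query; it says nothing about
how older state is restored.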

Thanks,
Rong

[1]
https://github.com/apache/flink/tree/release-1.7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro
[2]
https://github.com/apache/flink/blob/release-1.7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroRowSerializationSchema.java#L170

On Thu, Mar 7, 2019 at 11:20 AM Shahar Cizer Kobrinsky <
shahar.kobrin...@gmail.com> wrote:

> Thanks for the response Rong. Would be happy to clarify more.
> So there are two possible changes that could happen:
>
>1. There could be a change in the incoming source schema. Since
>there's a deserialization phase here (JSON -> Pojo) i expect a couple of
>options. Backward compatible changes to the JSON should not have an impact
>(as the Pojo is the same), however we might want to change the Pojo which i
>believe is a state evolving action. I do want to migrate the Pojo to Avro -
>will that suffice for Schema evolution feature to work?
>2. The other possible change is the SQL select fields change, as
>mention someone could add/delete/change-order another field to the SQL
>Select. I do see this as an issue per the way i transform the Row object to
>the dynamically generated class. This is done today through indices of the
>class fields and the ones of the Row object. This seems like an issue for
>when for example a select field is added in the middle and now there's an
>older Row which fields order is not matching the (new) generated Class
>fields order. I'm thinking of how to solve that one - i imagine this is not
>something the schema evolution feature can solve (am i right?). im thinking
>on whether there is a way i can transform the Row object to my generated
>class by maybe the Row's column names corresponding to the generated class
>field names, though i don't see Row object has any notion of column names.
>
> Would love to hear your thoughts. If you want me to paste some code here i
> can do so.
>
> Shahar
>
> On Thu, Mar 7, 2019 at 10:40 AM Rong Rong  wrote:
>
>> Hi Shahar,
>>
>> I wasn't sure which schema are you describing that is going to "evolve"
>> (is it the registered_table? or the output sink?). It will be great if you
>> can clarify more.
>>
>> For the example you provided, IMO it is more considered as logic change
>> instead of schema evolution:
>> - if you are changing max(c) to max(d) in your query. I don't think this
>> qualifies as schema evolution.
>> - if you are adding another column "max(d)" to your query along with your
>> existing "max(c)" that might be considered as a backward compatible change.
>> However, either case you will have to restart your logic, you can also
>> consult how state schema evolution [1], and there are many other problems
>> that can be tricky as well[2,3].
>>
>> Thanks,
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
>> [2]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-operator-schema-evolution-savepoint-deserialization-fail-td23079.html
>> [3]
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepointing-with-Avro-Schema-change-td19290.html#a19293
>>
>>
>> On Wed, Mar 6, 2019 at 12:52 PM shkob1 
>> wrote:
>>
>>> Hey,
>>>
>>> My job is built on SQL that is injected as an input to the job. so lets
>>> take
>>> an example of
>>>
>>> Select a,max(b) as MaxB,max(c) as MaxC FROM registered_table GROUP BY a
>>>
>>> (side note: in order for the state not to grow indefinitely i'm
>>> transforming
>>> to a retracted stream and filtering based on a custom trigger)
>>>
>>>

Re: Schema Evolution on Dynamic Schema

2019-03-07 Thread Rong Rong
Hi Shahar,

I am not sure which schema you expect to "evolve" (is it the
registered_table, or the output sink?). It would be great if you could
clarify.

For the example you provided, IMO it is better described as a logic change
than as schema evolution:
- If you change max(c) to max(d) in your query, I don't think this
qualifies as schema evolution.
- If you add another column "max(d)" to your query alongside your existing
"max(c)", that might be considered a backward-compatible change.
However, in either case you will have to restart your logic (the job). You can
also consult how state schema evolution works [1]; there are many other
problems that can be tricky as well [2,3].

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/schema_evolution.html
[2]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Window-operator-schema-evolution-savepoint-deserialization-fail-td23079.html
[3]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Savepointing-with-Avro-Schema-change-td19290.html#a19293
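
To make the distinction above a bit more concrete, here is a minimal sketch in
plain Java (no Flink dependency). The class and method names are hypothetical
and not part of any Flink API. It only reasons about the output column names of
the query: an "additive" change keeps every old column, while replacing max(c)
with max(d) does not. It also shows one possible way to pair the positional
values of a Row with the names from the SELECT list, assuming the job already
knows that list (as sqlSelectFields in the quoted code suggests).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: reasons about the output columns of a query.
class QueryShapeSketch {

    // True when every old output column still exists in the new query,
    // i.e. the change only adds columns (e.g. adding max(d) next to max(c)).
    static boolean isAdditiveChange(List<String> oldColumns, List<String> newColumns) {
        return newColumns.containsAll(oldColumns);
    }

    // Pairs positional values (as in a Row) with the column names from the SELECT list.
    static Map<String, Object> toNamedRecord(List<String> columns, Object[] values) {
        if (columns.size() != values.length) {
            throw new IllegalArgumentException("column/value count mismatch");
        }
        Map<String, Object> record = new LinkedHashMap<>();
        for (int i = 0; i < values.length; i++) {
            record.put(columns.get(i), values[i]);
        }
        return record;
    }
}
```

Note that this says nothing about whether the operator state of the old query
can be restored for the new one; that is the harder problem discussed in [2,3].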


On Wed, Mar 6, 2019 at 12:52 PM shkob1  wrote:

> Hey,
>
> My job is built on SQL that is injected as an input to the job. so lets
> take
> an example of
>
> Select a,max(b) as MaxB,max(c) as MaxC FROM registered_table GROUP BY a
>
> (side note: in order for the state not to grow indefinitely i'm
> transforming
> to a retracted stream and filtering based on a custom trigger)
>
> In order to get the output as a Json format i basically created a way to
> dynamically generate a class and registering it to the class loader, so
> when
> transforming to the retracted stream im doing something like:
>
> Table result = tableEnv.sqlQuery(sqlExpression);
> tableEnv.toRetractStream(result, Row.class, config)
> .filter(tuple -> tuple.f0)
> .map(new RowToDynamicClassMapper(sqlSelectFields))
> .addSink(..)
>
> This actually works pretty good (though i do need to make sure to register
> the dynamic class to the class loader whenever the state is loaded)
>
> Im now looking into "schema evolution" - which basically means what happens
> when the query is changed (say max(c) is removed, and maybe max(d) is
> added). I dont know if that fits the classic "schema evolution" feature or
> should that be thought about differently. Would be happy to get some
> thoughts.
>
> Thanks!
>
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: [Flink-Question] In Flink parallel computing, how do different windows receive the data of their own partition, that is, how does Window determine which partition number the current Window belongs

2019-03-03 Thread Rong Rong
Hi

I am not sure I understand your question correctly, so I will try to
explain how elements get into window operators.

Flink makes the partition assignment before invoking the operator to
process an element. For the word count example, WindowOperator is invoked by
StreamInputProcessor[1] to "setKeyContextElement".
The actual key is then set by WindowOperator (inherited from
AbstractStreamOperator[2]) and ultimately passed to the KeyedStateBackend[3].

So, by the time WindowOperator processes an element, the KeyedStateBackend
has already been set to the correct key.

Hope this answers your question.

--
Rong


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html
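
As a rough illustration of "the partition is decided before the operator is
invoked", the sketch below models how a key can be mapped to a key group and
then to a parallel subtask. To my understanding Flink does this in
KeyGroupRangeAssignment using a murmur hash of key.hashCode(); the bit mixing
used here is only a stand-in (an assumption) so that the example has no
dependencies, and the class name is hypothetical.

```java
// Simplified model of routing a keyed record to a parallel subtask.
// The hash mixing is a stand-in, NOT Flink's actual murmur hash.
class KeyRoutingSketch {

    // Maps a key to one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        int h = key.hashCode();
        h ^= (h >>> 16); // stand-in mixing step (assumption)
        return Math.floorMod(h, maxParallelism);
    }

    // Maps a key group to the index of the subtask that owns it.
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // The sender evaluates this per record, so every record of a key
    // always reaches the same window operator instance.
    static int subtaskForKey(Object key, int parallelism, int maxParallelism) {
        return subtaskFor(keyGroupFor(key, maxParallelism), parallelism, maxParallelism);
    }
}
```

With parallelism 2 and 128 key groups, key groups 0..63 would go to subtask 0
and 64..127 to subtask 1, so the window operator never has to work out which
partition it belongs to.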

On Sun, Mar 3, 2019 at 5:15 AM 刘 文  wrote:

> 1). Environment: Flink 1.7.2, WordCount, local, stream processing
> 2). At the source, RecordWriter.emit() divides the elements by key into
> different partitions; the partition of each element is determined there,
> and the number of partitions is set by DataStream.setParallelism(2)
> 3). copyFromSerializerToTargetChannel(int targetChannel) writes the data
> to different channels, i.e. it sends the data to the windows corresponding
> to the different partitions (data is sent one by one)


Re: Flink window triggering and timing on connected streams

2019-02-26 Thread Rong Rong
Hi Andrew,

To add to the answers Till and Hequn already provided: WindowOperator
operates on a per-key-group basis, so as long as you only have one open
session per partition key group, you should be able to manage the windowing
using the watermark strategy Hequn mentioned.
As Till mentioned, the watermark is the minimum across the connected streams,
thus you should be able to just use a "session window with long timeout" as
you described.

Another thought: have you looked at Flink CEP[1]? This use case seems to
fit pretty well if you can do the co-stream function as a first stage.

--
Rong

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html
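
The watermark behaviour described in this thread can be modelled in a few lines
of plain Java. This is only a sketch with a hypothetical class name, not Flink
code: a two-input operator remembers the latest watermark per input, forwards
the minimum of the two, and an event-time timer becomes eligible to fire only
once that combined watermark has reached the timer's timestamp.

```java
// Models watermark handling of a two-input (connected streams) operator.
class TwoInputWatermarkSketch {
    private long input1 = Long.MIN_VALUE;
    private long input2 = Long.MIN_VALUE;

    // Watermarks never move backwards, hence the max().
    void onWatermark1(long timestamp) { input1 = Math.max(input1, timestamp); }
    void onWatermark2(long timestamp) { input2 = Math.max(input2, timestamp); }

    // The operator's own watermark is the minimum over both inputs.
    long combined() { return Math.min(input1, input2); }

    // An event-time timer fires once the combined watermark reaches its timestamp,
    // not when an individual element with a later timestamp arrives.
    boolean timerFires(long timerTimestamp) { return combined() >= timerTimestamp; }
}
```

So a timer registered when the "end session" event arrives on stream A would
not fire until stream B's watermark has caught up as well.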

On Tue, Feb 26, 2019 at 2:31 AM Till Rohrmann  wrote:

> Hi Andrew,
>
> if using connected streams (e.g. CoFlatMapFunction or CoMapFunction), then
> the watermarks will be synchronized across both inputs. Concretely, you
> will always emit the minimum of the watermarks arriving on input channel 1
> and 2. Take a look at AbstractStreamOperator.java:773-804.
>
> Cheers,
> Till
>
> On Tue, Feb 26, 2019 at 4:27 AM Andrew Roberts  wrote:
>
>> I’m not sure that approach will work for me, as I have many sessions
>> going at the same time which can overlap. Also, I need to be able to have
>> sessions time out if they never receive an end event. Do you know directly
>> if setting a timer triggers when any timestamp passes that time, or when
>> the watermark passes that time?
>>
>>
>> On Feb 25, 2019, at 9:08 PM, Hequn Cheng  wrote:
>>
>> Hi Andrew,
>>
>> >  I have an “end session” event that I want to cause the window to fire
>> and purge.
>> Do you want to fire the window only by the 'end session' event? I see one
>> option to solve the problem. You can use a tumbling window(say 5s) and set
>> your timestamp to t‘+5s each time receiving an 'end session' event in your
>> user-defined `AssignerWithPeriodicWatermarks`.
>>
>> > My understanding is that this is what the trailing watermark is for,
>> and that in connected streams, the lowest (earliest) watermark of the input
>> streams is what is seen as the watermark downstream.
>> Yes, and we can make use of this to make window fires only on 'end
>> session' event using the solution above.
>>
>> Best, Hequn
>>
>>
>> On Tue, Feb 26, 2019 at 5:54 AM Andrew Roberts  wrote:
>>
>>> Hello,
>>>
>>> I’m trying to implement session windows over a set of connected streams
>>> (event time), with some custom triggering behavior. Essentially, I allow
>>> very long session gaps, but I have an “end session” event that I want to
>>> cause the window to fire and purge. I’m assigning timestamps and watermarks
>>> using BoundedOutOfOrdernessTimestampExtractor, with a 1 minute delay for
>>> the watermark. I have things mostly wired up, but I have some confusion
>>> about how I can ensure that my streams stay “in sync” relative to time.
>>>
>>>  Let’s say I am connecting streams A and B. Stream A is where the “end
>>> session” event always comes from. If I have a session involving events from
>>> time t to t’ in stream A, and then at t’ I get an “end session”, I want to
>>> ensure that the window doesn’t fire until stream B has also processed
>>> events (added events to the window) up to time t’. My understanding is that
>>> this is what the trailing watermark is for, and that in connected streams,
>>> the lowest (earliest) watermark of the input streams is what is seen as the
>>> watermark downstream.
>>>
>>> Currently, I’m setting a timer for the current time + 1 when I see my
>>> “end event”, with the idea that that timer will fire when the WATERMARK
>>> passes that time, i.e., all streams have progressed at least as far as that
>>> end event. However, the implementation of EventTimeTrigger doesn’t really
>>> look like that’s what’s going on.
>>>
>>> Can anyone clear up how these concepts interact? Is there a good model
>>> for this “session end event” concept that I can take a look at?
>>>
>>> Thanks,
>>>
>>> Andrew
>>> --
>>> *Confidentiality Notice: The information contained in this e-mail and any
>>>
>>> attachments may be confidential. If you are not an intended recipient,
>>> you
>>>
>>> are hereby notified that any dissemination, distribution or copying of
>>> this
>>>
>>> e-mail is strictly prohibited. If you have received this e-mail in error,
>>>
>>> please notify the sender and permanently delete the e-mail and any
>>>
>>> attachments immediately. You should not retain, copy or use this e-mail
>>> or
>>>
>>> any attachment for any purpose, nor disclose all or any part of the
>>>
>>> contents to any other person. Thank you.*
>>>
>>
>>

Re: SinkFunction.Context

2019-02-21 Thread Rong Rong
Hi Durga,

1. currentProcessingTime: refers to this operator's (the SinkFunction's)
system time at the moment of the invoke call.
1a. The time you are referring to as "flink window got the message" is the
currentProcessingTime() invoked at the window operator (which is provided by
the WindowContext, similar to this one [1]).
2. currentWatermark: refers to the current watermark [2] received by this
operator (the SinkFunction).
3. timestamp: is actually the input record's event time (this "input"
refers to the input of the SinkFunction, not to the entire Flink
topology).

Hope these help.

--
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#event-time-and-watermarks
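
A small sketch of how the three values differ when read inside a sink. To keep
it runnable without Flink on the classpath, the Context interface below is a
stand-in that mirrors the three methods listed in the SinkFunction.Context
Javadoc; the surrounding class and the describe method are hypothetical.

```java
class SinkContextSketch {

    // Stand-in mirroring the three methods of SinkFunction.Context.
    interface Context {
        long currentProcessingTime(); // wall-clock time at the sink
        long currentWatermark();      // event-time progress seen by the sink
        Long timestamp();             // event time of this record, may be null
    }

    // What a sink could log for each record it receives.
    static String describe(Context ctx) {
        Long eventTime = ctx.timestamp();
        String ts = (eventTime == null) ? "none" : String.valueOf(eventTime);
        return "processingTime=" + ctx.currentProcessingTime()
                + " watermark=" + ctx.currentWatermark()
                + " recordTimestamp=" + ts;
    }
}
```

The null check matters because timestamp() returns a boxed Long that is null
when the record carries no assigned timestamp.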

On Thu, Feb 21, 2019 at 4:59 PM Durga Durga  wrote:

>
> HI Folks,
>
> Was following the documentation for
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.Context.html
>
>
>
> long currentProcessingTime()
>     Returns the current processing time.
> long currentWatermark()
>     Returns the current event-time watermark.
> Long timestamp()
>     Returns the timestamp of the current input record or null if the
>     element does not have an assigned timestamp.
>
> - currentProcessingTime - is this the event time, i.e. the time when
> the event occurred, or the time when the Flink window got the message?
>
> - timestamp - is this the time the record is persisted into the sink,
> or the timestamp of the aggregated data? Say I have 100 records in my time
> window - which time comes into effect?
>
> - currentWatermark - which time is this, the time the event occurred? What
> will this value be when there are 1000 records in my time window?
>
> PS - We want to use some ID (timestamp) and associate it with all the
> records that are persisted (aggregated) in a given time window, i.e. if
> there are 1000 records aggregated and they result in 10 aggregated
> records, we want to give these 10 aggregated records the same ID, and we
> want to use one of the above timestamps.
>
> Thanks much.
>


Re: Metrics for number of "open windows"?

2019-02-21 Thread Rong Rong
Hi Andrew,

I am assuming you are actually using a customized WindowAssigner, trigger and
process function.
I think the best way for you to keep track of in-flight, not-yet-triggered
windows is to emit metrics in these 3 pieces.

Looking at the window operator, I don't think there is a metric (gauge)
that tracks how many windows have not yet fired.
This information is available in the KeyedStateBackend, but I don't think
the KeyedStateBackend emits any metrics related to what you want.

Thanks,
Rong
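
One possible shape for such a metric, sketched in plain Java with hypothetical
names: remember which (key, window) pairs have received an element but have not
yet been purged, and expose the size of that set. In a real job the value would
presumably be published through a gauge registered on the metric group that is
available to the custom trigger or process function; that wiring is an
assumption and is left out here.

```java
import java.util.HashSet;
import java.util.Set;

// Counts windows that have received at least one element and were not yet purged.
// Intended to be used from a single task thread, so no synchronization is done.
class OpenWindowCounterSketch {
    private final Set<String> open = new HashSet<>();

    private static String id(String key, long windowStart, long windowEnd) {
        return key + "@" + windowStart + "-" + windowEnd;
    }

    // Call whenever an element is added to a window (e.g. from the trigger).
    void onElement(String key, long windowStart, long windowEnd) {
        open.add(id(key, windowStart, windowEnd));
    }

    // Call when the window fires and purges, or is cleared.
    void onPurge(String key, long windowStart, long windowEnd) {
        open.remove(id(key, windowStart, windowEnd));
    }

    // The value a gauge would report.
    long openWindows() {
        return open.size();
    }
}
```

Keeping the identifiers on the heap costs memory proportional to the number of
open windows per subtask, so for very large key spaces a plain counter that is
incremented on a window's first element may be the better trade-off.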

On Tue, Feb 19, 2019 at 12:14 PM Andrew Roberts  wrote:

> Hello,
>
> I’m trying to track the number of currently-in-state windows in a keyed,
> windowed stream (stream.keyBy(…).window(…).trigger(…).process(…)) using
> Flink metrics. Are there any built in? Or any good approaches for
> collecting this data?
>
> Thanks,
>
> Andrew
>


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-21 Thread Rong Rong
Hi Stephan,

Yes, I completely agree. Jincheng & Jark gave some very valuable feedback
and suggestions, and I think we can definitely move the conversation forward
to reach a more concrete doc first before we put it into the roadmap. Thanks
for reviewing it and driving the roadmap effort!

--
Rong

On Thu, Feb 21, 2019 at 8:50 AM Stephan Ewen  wrote:

> Hi Rong Rong!
>
> I would add the security / kerberos threads to the roadmap. They seem to
> be advanced enough in the discussions so that there is clarity what will
> come.
>
> For the window operator with slicing, I would personally like to see the
> discussion advance and have some more clarity and consensus on the feature
> before adding it to the roadmap. Not having that in the first version of
> the roadmap does not mean there will be no activity. And when the
> discussion advances well in the next weeks, we can update the roadmap soon.
>
> What do you think?
>
> Best,
> Stephan
>
>
> On Thu, Feb 14, 2019 at 5:46 PM Rong Rong  wrote:
>
>> Hi Stephan,
>>
>> Thanks for the clarification, yes I think these issues has already been
>> discussed in previous mailing list threads [1,2,3].
>>
>> I also agree that updating the "official" roadmap every release is a very
>> good idea to avoid frequent update.
>> One question I might've been a bit confusion is: are we suggesting to
>> keep one roadmap on the documentation site (e.g. [4]) per release, or
>> simply just one most up-to-date roadmap in the main website [5] ?
>> Just like the release notes in every release, the former will probably
>> provide a good tracker for users to look back at previous roadmaps as well
>> I am assuming.
>>
>> Thanks,
>> Rong
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html
>>
>> [4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/
>> [5] https://flink.apache.org/
>>
>> On Thu, Feb 14, 2019 at 2:26 AM Stephan Ewen  wrote:
>>
>>> I think the website is better as well.
>>>
>>> I agree with Fabian that the wiki is not so visible, and visibility is
>>> the main motivation.
>>> This type of roadmap overview would not be updated by everyone - letting
>>> committers update the roadmap means the listed threads are actually
>>> happening at the moment.
>>>
>>>
>>> On Thu, Feb 14, 2019 at 11:14 AM Fabian Hueske 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I like the idea of putting the roadmap on the website because it is
>>>> much more visible (and IMO more credible, obligatory) there.
>>>> However, I share the concerns about frequent updates.
>>>>
>>>> It think it would be great to update the "official" roadmap on the
>>>> website once per release (-bugfix releases), i.e., every three month.
>>>> We can use the wiki to collect and draft the roadmap for the next
>>>> update.
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>> On Thu, Feb 14, 2019 at 11:03 AM Jeff Zhang wrote:
>>>>
>>>>> Hi Stephan,
>>>>>
>>>>> Thanks for this proposal. It is a good idea to track the roadmap. One
>>>>> suggestion is that it might be better to put it into wiki page first.
>>>>> Because it is easier to update the roadmap on wiki compared to on flink
>>>>> web site. And I guess we may need to update the roadmap very often at
>>>>> the beginning as there's so many discussions and proposals in community
>>>>> recently. We can move it into flink web site later when we feel it
>>>>> could be nailed down.
>>>>>
>>>>> On Thu, Feb 14, 2019 at 5:44 PM Stephan Ewen wrote:
>>>>>
>>>>>> Thanks Jincheng and Rong Rong!
>>>>>>
>>>>>> I am not deciding a roadmap and making a call on what features should
>>>>>> be developed or not. I was only collecting broader issues that are
>>>>>> already happening or have an active FLIP/design discussion plus
>>>>>> committer support.
>>>

Re: Impact of occasional big pauses in stream processing

2019-02-14 Thread Rong Rong
Hi Ajay,

Yes, Andrey is right. I was actually missing the first basic but important
point: if your process function is stuck, it will immediately block that
thread.
From your description, it sounds like not all the messages you
consume from Kafka actually trigger the processing logic. There should be
plenty of ways to avoid over-provisioning your job just to satisfy your peak
traffic: for example, what Andrey suggested, using an async RPC call to some
other resource for the heavy computation; or splitting it into a filter (which
removes non-actionable messages) and the actual process (where you can use
a higher parallelism to reduce the chance of getting stuck).

Regarding the second question, I am not an expert, but my understanding is that
there should be isolation between the Flink jobs you run on that session
cluster, e.g. one job's backpressure will not affect other jobs' consumers.
I've CCed Till, who might be able to better answer your question.

--
Rong
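
The two suggestions above (filter first, then hand the heavy part to something
asynchronous) can be sketched without Flink as follows. Everything here is
hypothetical: the "work:" prefix stands in for whatever marks a message as
actionable, heavyWork stands in for the expensive call, and a plain thread pool
plays the role that Async I/O or an external service would play in a real job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FilterThenOffloadSketch {

    // Stage 1: cheap filter that drops messages which need no processing.
    static boolean isActionable(String message) {
        return message.startsWith("work:");
    }

    // Stand-in for the expensive computation.
    static String heavyWork(String message) {
        return message.substring("work:".length()).toUpperCase();
    }

    // Stage 2: submit the heavy part to a pool instead of blocking the caller per record.
    static List<String> run(List<String> messages) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (String m : messages) {
                if (isActionable(m)) {
                    pending.add(CompletableFuture.supplyAsync(() -> heavyWork(m), pool));
                }
            }
            // Collect results in submission order.
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> f : pending) {
                results.add(f.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that this sketch runs the work for different messages concurrently, so it
does not by itself enforce the "serially per tenant" constraint mentioned in the
question; that would need per-tenant ordering on top.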


On Thu, Feb 14, 2019 at 8:24 AM Aggarwal, Ajay 
wrote:

> Thank you Rong and Andrey. The blog and your explanation was very useful.
>
>
>
> In my use case, source stream (kafka based) contains messages that capture
> some “work” that needs to be done for a tenant.  It’s a multi-tenant source
> stream. I need to queue up (and execute) this work per tenant in the order
> in which it was produced. And flink provides this ordered queuing per
> tenant very elegantly. Now the only thing is that executing this “work”
> could be expensive in terms of compute/memory/time.  Furthermore per tenant
> there is a constraint of doing this work serially. Hence this question.  I
> believe if our flink cluster has enough resources, it should work.
>
>
>
> But this leads to another related question. If there are multiple flink
> jobs sharing the same flink cluster and one of those jobs sees the spike
> such that back pressure builds up all the way to the source, will that
> impact other jobs as well? Is a task slot shared by multiple jobs? If not,
> my understanding is that this should not impact other flink jobs. Is that
> correct?
>
>
>
> Thanks.
>
>
>
> Ajay
>
>
>
> *From: *Andrey Zagrebin 
> *Date: *Thursday, February 14, 2019 at 5:09 AM
> *To: *Rong Rong 
> *Cc: *"Aggarwal, Ajay" , "user@flink.apache.org"
> 
> *Subject: *Re: Impact of occasional big pauses in stream processing
>
>
>
> Hi Ajay,
>
>
>
> Technically, it will immediately block the thread of
> MyKeyedProcessFunction subtask scheduled to some slot and basically block
> processing of the key range assigned to this subtask.
> Practically, I agree with Rong's answer. Depending on the topology of your
> inputStream, it can eventually block a lot of stuff.
> In general, I think, it is not recommended to perform blocking operations
> in process record functions. You could consider AsyncIO [1] to unblock the
> task thread.
>
> Best,
>
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>
>
>
> On Thu, Feb 14, 2019 at 6:03 AM Rong Rong  wrote:
>
> Hi Ajay,
>
>
>
> Flink handles "backpressure" in a graceful way so that it doesn't get
> affected when your processing pipeline is occasionally slowed down.
>
> I think the following articles will help [1,2].
>
>
>
> In your specific case: the "KeyBy" operation will re-hash data so they can
> be reshuffled from all input consumers to all your process operators (in
> this case the MyKeyedProcessFunction). If one of the process operator is
> backpressured, it will back track all the way to the source.
>
> So, my understanding is that: since there's the reshuffling, if one of the
> process function is backpressured, it will potentially affect all the
> source operators.
>
>
>
> Thanks,
>
> Rong
>
>
>
> [1] https://www.ververica.com/blog/how-flink-handles-backpressure
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html
>
>
>
> On Wed, Feb 13, 2019 at 8:50 AM Aggarwal, Ajay 
> wrote:
>
> I was wondering what is the impact if one of the stream operator function
> occasionally takes too long to process the event.  Given the following
> simple flink job
>
>
>
>     inputStream
>         .keyBy("tenantId")
>         .process(new MyKeyedProcessFunction())
>
>
>
> , if occasionally MyKeyedProcessFunction takes too long (say ~5-10
> minutes) to process an incoming element, what is the impact on overall
> pipeline? Is the impact limited to
>
>    1. Specific key for which MyKeyedProcessFunction is currently taking
>       too long to process an element, or
>    2. Specific Taskslot, where MyKeyedProcessFunction is currently taking
>       too long to process an element, i.e. impacting multiple keys, or
>    3. Entire inputstream?
>
>
>
> Also what is the built in resiliency in these cases? Is there a concept of
> timeout for each operator function?
>
>
>
> Ajay
>
>


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-14 Thread Rong Rong
Hi Stephan,

Thanks for the clarification. Yes, I think these issues have already been
discussed in previous mailing list threads [1,2,3].

I also agree that updating the "official" roadmap once per release is a very
good idea to avoid frequent updates.
One thing I am a bit confused about: are we suggesting keeping
one roadmap on the documentation site (e.g. [4]) per release, or simply
a single, most up-to-date roadmap on the main website [5]?
Just like the release notes for every release, I assume the former would
provide a good way for users to look back at previous roadmaps as well.

Thanks,
Rong

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html

[4] https://ci.apache.org/projects/flink/flink-docs-release-1.7/
[5] https://flink.apache.org/

On Thu, Feb 14, 2019 at 2:26 AM Stephan Ewen  wrote:

> I think the website is better as well.
>
> I agree with Fabian that the wiki is not so visible, and visibility is the
> main motivation.
> This type of roadmap overview would not be updated by everyone - letting
> committers update the roadmap means the listed threads are actually
> happening at the moment.
>
>
> On Thu, Feb 14, 2019 at 11:14 AM Fabian Hueske  wrote:
>
>> Hi,
>>
>> I like the idea of putting the roadmap on the website because it is much
>> more visible (and IMO more credible, obligatory) there.
>> However, I share the concerns about frequent updates.
>>
>> It think it would be great to update the "official" roadmap on the
>> website once per release (-bugfix releases), i.e., every three month.
>> We can use the wiki to collect and draft the roadmap for the next update.
>>
>> Best, Fabian
>>
>>
>> On Thu, Feb 14, 2019 at 11:03 AM Jeff Zhang wrote:
>>
>>> Hi Stephan,
>>>
>>> Thanks for this proposal. It is a good idea to track the roadmap. One
>>> suggestion is that it might be better to put it into wiki page first.
>>> Because it is easier to update the roadmap on wiki compared to on flink web
>>> site. And I guess we may need to update the roadmap very often at the
>>> beginning as there's so many discussions and proposals in community
>>> recently. We can move it into flink web site later when we feel it could be
>>> nailed down.
>>>
>>> On Thu, Feb 14, 2019 at 5:44 PM Stephan Ewen wrote:
>>>
>>>> Thanks Jincheng and Rong Rong!
>>>>
>>>> I am not deciding a roadmap and making a call on what features should
>>>> be developed or not. I was only collecting broader issues that are already
>>>> happening or have an active FLIP/design discussion plus committer support.
>>>>
>>>> Do we have that for the suggested issues as well? If yes , we can add
>>>> them (can you point me to the issue/mail-thread), if not, let's try and
>>>> move the discussion forward and add them to the roadmap overview then.
>>>>
>>>> Best,
>>>> Stephan
>>>>
>>>>
>>>> On Wed, Feb 13, 2019 at 6:47 PM Rong Rong  wrote:
>>>>
>>>>> Thanks Stephan for the great proposal.
>>>>>
>>>>> This would not only be beneficial for new users but also for
>>>>> contributors to keep track on all upcoming features.
>>>>>
>>>>> I think that better window operator support could also be grouped
>>>>> separately into its own category, as it affects both the future
>>>>> DataStream API and batch/stream unification.
>>>>> Can we also include:
>>>>> - OVER aggregate for DataStream API separately, as @jincheng suggested.
>>>>> - Improving sliding window operator [1]
>>>>>
>>>>> One additional suggestion: can we also include a more extensible
>>>>> security module [2,3] that @shuyi and I are currently working on?
>>>>> This will significantly improve the usability of Flink in corporate
>>>>> environments where proprietary or 3rd-party security integration is
>>>>> needed.
>>>>>
>>>>> Thanks,
>>>>> Rong
>>>>>
>>>>>
>>>>> [1]
>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvem

Re: Impact of occasional big pauses in stream processing

2019-02-13 Thread Rong Rong
Hi Ajay,

Flink handles "backpressure" gracefully, so that the job is not destabilized
when your processing pipeline is occasionally slowed down.
I think the following articles will help [1,2].

In your specific case: the "keyBy" operation re-hashes the data so that it is
reshuffled from all input consumers to all your process operators (in
this case the MyKeyedProcessFunction). If one of the process operators is
backpressured, the backpressure propagates all the way back to the source.
So, my understanding is that, because of the reshuffling, if one of the
process functions is backpressured, it can potentially affect all the
source operators.

Thanks,
Rong

[1] https://www.ververica.com/blog/how-flink-handles-backpressure
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html
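
A toy model of why one stuck process operator can stall the whole input after
a keyBy. This is plain Java with hypothetical names, not Flink code: the source
distributes records over bounded queues (one per consumer, with modulo standing
in for hash partitioning), healthy consumers drain immediately, and the stuck
consumer never does. As soon as the stuck consumer's queue is full the source
cannot emit the next record destined for it and has to stop, even though every
other consumer is idle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

class BackpressureSketch {

    // Returns how many records the source emitted before it had to stop.
    // Pass stuckConsumer = -1 to model a pipeline without a stuck consumer.
    static int emitUntilBlocked(int records, int consumers, int capacity, int stuckConsumer) {
        List<ArrayBlockingQueue<Integer>> queues = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            queues.add(new ArrayBlockingQueue<Integer>(capacity));
        }
        int emitted = 0;
        for (int r = 0; r < records; r++) {
            int target = r % consumers; // stand-in for hash partitioning by key
            ArrayBlockingQueue<Integer> queue = queues.get(target);
            if (!queue.offer(r)) {
                break; // buffer full: a real source would block here
            }
            emitted++;
            if (target != stuckConsumer) {
                queue.poll(); // healthy consumers process the record right away
            }
        }
        return emitted;
    }
}
```

With 4 consumers, a buffer capacity of 2 and consumer 1 stuck, the source stops
after 9 of 100 records: the third record routed to consumer 1 no longer fits.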

On Wed, Feb 13, 2019 at 8:50 AM Aggarwal, Ajay 
wrote:

> I was wondering what is the impact if one of the stream operator function
> occasionally takes too long to process the event.  Given the following
> simple flink job
>
>
>
>     inputStream
>         .keyBy("tenantId")
>         .process(new MyKeyedProcessFunction())
>
>
>
> , if occasionally MyKeyedProcessFunction takes too long (say ~5-10
> minutes) to process an incoming element, what is the impact on overall
> pipeline? Is the impact limited to
>
>    1. Specific key for which MyKeyedProcessFunction is currently taking
>       too long to process an element, or
>    2. Specific Taskslot, where MyKeyedProcessFunction is currently taking
>       too long to process an element, i.e. impacting multiple keys, or
>    3. Entire inputstream?
>
>
>
> Also what is the built in resiliency in these cases? Is there a concept of
> timeout for each operator function?
>
>
>
> Ajay
>


Re: [DISCUSS] Adding a mid-term roadmap to the Flink website

2019-02-13 Thread Rong Rong
Thanks Stephan for the great proposal.

This would be beneficial not only for new users but also for contributors
who want to keep track of all upcoming features.

I think that better window operator support could also be grouped separately
into its own category, as it affects both the future DataStream API and
batch/stream unification.
Can we also include:
- OVER aggregate for DataStream API separately, as @jincheng suggested.
- Improving sliding window operator [1]

One additional suggestion: can we also include a more extensible
security module [2,3] that @shuyi and I are currently working on?
This will significantly improve the usability of Flink in corporate
environments where proprietary or 3rd-party security integration is needed.

Thanks,
Rong


[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improvement-to-Flink-Window-Operator-with-Slicing-td25750.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-security-improvements-td21068.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-Kerberos-Improvement-td25983.html




On Wed, Feb 13, 2019 at 3:39 AM jincheng sun 
wrote:

> Very excited and thank you for launching such a great discussion, Stephan !
>
> Here only a little suggestion that in the Batch Streaming Unification
> section, do we need to add an item:
>
> - Same window operators on bounded/unbounded Table API and DataStream API
> (currently OVER window only exists in SQL/TableAPI, DataStream API does
> not yet support)
>
> Best,
> Jincheng
>
> On Wed, Feb 13, 2019 at 7:21 PM Stephan Ewen wrote:
>
>> Hi all!
>>
>> Recently several contributors, committers, and users asked about making
>> it more visible in which way the project is currently going.
>>
>> Users and developers can track the direction by following the discussion
>> threads and JIRA, but due to the mass of discussions and open issues, it is
>> very hard to get a good overall picture.
>> Especially for new users and contributors, it is very hard to get a quick
>> overview of the project direction.
>>
>> To fix this, I suggest adding a brief roadmap summary to the homepage. It
>> is a bit of a commitment to keep that roadmap up to date, but I think the
>> benefit for users justifies that.
>> The Apache Beam project has added such a roadmap [1], which was received
>> very well by the community. I would suggest following a similar structure
>> here.
>>
>> If the community is in favor of this, I would volunteer to write a first
>> version of such a roadmap. The points I would include are below.
>>
>> Best,
>> Stephan
>>
>> [1] https://beam.apache.org/roadmap/
>>
>> 
>>
>> Disclaimer: Apache Flink is not governed or steered by any one single
>> entity, but by its community and Project Management Committee (PMC). This
>> is not an authoritative roadmap in the sense of a plan with a specific
>> timeline. Instead, we share our vision for the future and major initiatives
>> that are receiving attention and give users and contributors an
>> understanding what they can look forward to.
>>
>> *Future Role of Table API and DataStream API*
>>   - Table API becomes first class citizen
>>   - Table API becomes primary API for analytics use cases
>>       * Declarative, automatic optimizations
>>       * No manual control over state and timers
>>   - DataStream API becomes primary API for applications and data pipeline
>>     use cases
>>       * Physical, user controls data types, no magic or optimizer
>>       * Explicit control over state and time
>>
>> *Batch Streaming Unification*
>>   - Table API unification (environments) (FLIP-32)
>>   - New unified source interface (FLIP-27)
>>   - Runtime operator unification & code reuse between DataStream / Table
>>   - Extending Table API to make it convenient API for all analytical use
>> cases (easier mix in of UDFs)
>>   - Same join operators on bounded/unbounded Table API and DataStream API
>>
>> *Faster Batch (Bounded Streams)*
>>   - Much of this comes via Blink contribution/merging
>>   - Fine-grained Fault Tolerance on bounded data (Table API)
>>   - Batch Scheduling on bounded data (Table API)
>>   - External Shuffle Services Support on bounded streams
>>   - Caching of intermediate results on bounded data (Table API)
>>   - Extending DataStream API to explicitly model bounded streams (API
>> breaking)
>>   - Add fine fault tolerance, scheduling, caching also to DataStream API
>>
>> *Streaming State Evolution*
>>   - Let all built-in serializers support stable evolution
>>   - First class support for other evolvable formats (Protobuf, Thrift)
>>   - Savepoint input/output format to modify / adjust savepoints
>>
>> *Simpler Event Time Handling*
>>   - Event Time Alignment in Sources
>>   - Simpler out-of-the box support in sources
>>
>> *Checkpointing*
>>   - Consistency of Side Effects: suspend / end with savepoint (FLIP-34)
>>   - Failed 

Re: Is there a windowing strategy that allows a different offset per key?

2019-02-11 Thread Rong Rong
Hi Stephen,

Yes, we had a discussion regarding dynamic offsets and keys [1]. The
motivation was the same: we don't have many complex operators after the
window operator, so a huge spike of traffic hits downstream whenever the
windows fire on the window boundary. The best idea to come out of that
discussion was to override the assignment with a special *WindowAssigner*,
in which you can customize the offset strategy.

The only catch is that the *KeySelector* you use before windowing has to be
stateless (i.e. every invocation of *getKey(IN value)* with the same input
value should return an identical result). In your case, if the id field
is used to determine the offset, you can always do that by extracting id
from the key tuple of (id, path).
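
To make the idea concrete, here is a minimal sketch in plain Java (no Flink
dependency) of the arithmetic such a custom assigner could use. The class
name PerIdWindowOffset and the helper offsetForId are hypothetical, and
windowStartWithOffset repeats the formula of
TimeWindow.getWindowStartWithOffset as I understand it. The offset is
derived from the id only, so every (id, path) key with the same id ends up
on the same window boundaries.

```java
import java.time.Duration;

class PerIdWindowOffset {

    /** Same arithmetic as TimeWindow.getWindowStartWithOffset (assumed). */
    public static long windowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** Hypothetical helper: a stable offset in [0, windowSize) derived from the id only. */
    public static long offsetForId(String id, long windowSize) {
        return Math.floorMod((long) id.hashCode(), windowSize);
    }

    /** Start of the window that contains the timestamp for the given id. */
    public static long windowStart(String id, long timestamp, long windowSize) {
        return windowStartWithOffset(timestamp, offsetForId(id, windowSize), windowSize);
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(15).toMillis();
        long timestamp = 1_549_800_000_000L; // an arbitrary event time in epoch millis
        for (String id : new String[] {"ca:fe:ba:be", "de:ad:be:ef"}) {
            long start = windowStart(id, timestamp, size);
            System.out.println(id + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Inside a real WindowAssigner, assignWindows would extract the id from the
element, compute the offset this way, and return a single
TimeWindow(start, start + size).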

Hope this helps.

Thanks,
Rong


[1]
https://docs.google.com/document/d/1fEhbcRgxxX8zFYD_iMBG1DCbHmTcTRfRQFXelPhMFiY/edit?usp=sharing

On Mon, Feb 11, 2019 at 2:20 AM Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

>
>
> On Mon, 11 Feb 2019 at 09:54, Fabian Hueske  wrote:
>
>> Hi Stephen,
>>
>> First of all, yes, windows computing and emitting at the same time can
>> cause pressure on the downstream system.
>>
>> There are a few ways how you can achieve this:
>> * use a custom window assigner. A window assigner decides into which
>> window a record is assigned. This is the approach you suggested.
>>
>
> Thanks for the link. Yes I think the custom window assigner is most
> certainly the way to go for my use case. Even more specifically because the
> offsets I want to use are going to be based on a subset of the assigned key
> not the full assigned key (if you see my other mails this week, the key I
> window is a composite key of (id,path) but I want to have all the offsets
> for any specific id be the same, irrespective of the path, so the
> theoretical need of access to the full key that was driving Rong's original
> idea for an RFE to the WindowAssignerContext is not even necessary for my
> case)
>
>
>> * use a regular window and add an operator that buffers the window
>> results and releases them with randomized delay.
>> * use a ProcessFunction which allows you to control the timing of
>> computations yourself.
>>
>> A few months ago, there was a similar discussion on the dev mailing list
>> [1] (didn't read the thread) started by Rong (in CC).
>> Maybe, he can share some ideas / experiences as well.
>>
>
> Would be awesome if Rong can share any learnings he has encountered since
>
>
>>
>> Cheers,
>> Fabian
>>
>> [1]
>> https://lists.apache.org/thread.html/0d1e41302b89378f88693bf4fdb52c23d4b240160b5a10c163d9c46c@%3Cdev.flink.apache.org%3E
>>
>>
>> On Sun, Feb 10, 2019 at 21:03, Stephen Connolly <
>> stephen.alan.conno...@gmail.com>:
>>
>>> Looking into the code in TumblingEventTimeWindows:
>>>
>>> @Override
>>> public Collection<TimeWindow> assignWindows(
>>>         Object element, long timestamp, WindowAssignerContext context) {
>>>     if (timestamp > Long.MIN_VALUE) {
>>>         // Long.MIN_VALUE is currently assigned when no timestamp is present
>>>         long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
>>>         return Collections.singletonList(new TimeWindow(start, start + size));
>>>     } else {
>>>         throw new RuntimeException(
>>>                 "Record has Long.MIN_VALUE timestamp (= no timestamp marker). "
>>>                         + "Is the time characteristic set to 'ProcessingTime', "
>>>                         + "or did you forget to call "
>>>                         + "'DataStream.assignTimestampsAndWatermarks(...)'?");
>>>     }
>>> }
>>>
>>> So I think I can just write my own where the offset is derived from
>>> hashing the element using my hash function.
>>>
>>> Good plan or bad plan?
>>>
>>>
>>> On Sun, 10 Feb 2019 at 19:55, Stephen Connolly <
>>> stephen.alan.conno...@gmail.com> wrote:
>>>
>>>> I would like to process a stream of data from different customers,
>>>> producing output say once every 15 minutes. The results will then be loaded
>>>> into another system for storage and querying.
>>>>
>>>> I have been using TumblingEventTimeWindows in my prototype, but I am
>>>> concerned that all the windows will start and stop at the same time and
>>>> cause batch load effects on the back-end data store.
>>>>
>>>> What I think I would like is that the windows could have a different
>>>> start offset for each key (using a hash function that I would supply).
>>>>
>>>> Thus, deterministically, key "ca:fe:ba:be" would always start based on
>>>> an initial offset of 00:07 UTC while, say, key "de:ad:be:ef" would always
>>>> start based on an initial offset of, say, 00:02 UTC.
>>>>
>>>> Is this possible? Or do I just have to find some way of queuing up my
>>>> writes using back-pressure?
>>>>
>>>> Thanks in advance
>>>>
>>>> -stephenc
>>>>
>>>> P.S. I can trade assistance with Flink for assistance with Maven or
>>>> Jenkins if my questions are too wearisome!
>>>>
>>>


Re: Can an Aggregate the key from a WindowedStream.aggregate()

2019-02-11 Thread Rong Rong
Hi Stephen,

Chesnay was right: you will have to use a more complex version of the
window processing function.
Perhaps your goal can be achieved with a ProcessWindowFunction combined
with incremental aggregation [1]. If not, you can always use the regular
ProcessWindowFunction [2].
Both of these approaches give you access to the KEY information you require.

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/windows.html#processwindowfunction



On Sun, Feb 10, 2019 at 11:29 AM Stephen Connolly <
stephen.alan.conno...@gmail.com> wrote:

>
>
> On Sun, 10 Feb 2019 at 09:48, Chesnay Schepler  wrote:
>
>> There are also versions of WindowedStream#aggregate that accept an
>> additional WindowFunction/ProcessWindowFunction, which do have access to
>> the key via apply()/process() respectively. These functions are called
>> post aggregation.
>>
>
> Cool I'll chase those down
>
>
>>
>> On 08.02.2019 18:24, stephen.alan.conno...@gmail.com wrote:
>> > If I write my aggregation logic as a WindowFunction then I get access
>> to the key as the first parameter in WindowFunction.apply(...) however the
>> Javadocs for calling WindowedStream.apply(WindowFunction) state:
>> >
>> >> Note that this function requires that all data in the windows is
>> buffered until the window
>> >> is evaluated, as the function provides no means of incremental
>> aggregation.
>> > Which sounds bad.
>> >
>> > It seems the recommended alternative is to use one of the
>> WindowFunction.aggregate(AggregateFunction) however I cannot see how to get
>> access to the key...
>> >
>> > Is my only solution to transform my data into a Tuple if I need access
>> to the key post aggregation?
>> >
>> > Thanks in advance
>> >
>> > -stephenc
>> >
>>
>>


Re: Get nested Rows from Json string

2019-02-08 Thread Rong Rong
Hi François,

I just did some research, and it seems this is in fact a stringify issue.
If you run one of the tests in AvroRowDeSerializationSchemaTest [1],
you will find that only MAP and ARRAY fields are stringified with
delimiters (maps are wrapped in "{}" and arrays in "[]").
Nested records, however, are not wrapped in "()".
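
As an illustration only, here is a minimal sketch with a stand-in class.
SimpleRow is hypothetical and is NOT Flink's Row; it simply assumes a
toString that joins the fields with commas and adds no delimiters around
nested rows, which is the behaviour described above. The nested structure is
intact, yet the printed form looks flat.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical stand-in for a row type; this is not Flink's Row class. */
class SimpleRow {
    private final Object[] fields;

    SimpleRow(Object... fields) {
        this.fields = fields;
    }

    public int getArity() {
        return fields.length;
    }

    public Object getField(int pos) {
        return fields[pos];
    }

    /** Joins the fields with commas; nested rows get no surrounding delimiters. */
    @Override
    public String toString() {
        return Arrays.stream(fields).map(String::valueOf).collect(Collectors.joining(","));
    }
}

class RowToStringDemo {
    public static void main(String[] args) {
        SimpleRow row = new SimpleRow("b", "d", new SimpleRow("g"));
        System.out.println(row);                                  // prints b,d,g
        System.out.println(row.getArity());                       // prints 3
        System.out.println(row.getField(2) instanceof SimpleRow); // prints true
    }
}
```

So checking the arity and the type of the nested field is a more reliable
test than reading the printed output.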

I wasn't sure whether this should be considered a bug in the toString
method of the Row type, so I just filed a JIRA [2] for it. Feel free to
comment on the discussion there.

--
Rong

[1]
https://github.com/apache/flink/blob/release-1.7/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroRowDeSerializationSchemaTest.java
[2] https://issues.apache.org/jira/browse/FLINK-11569

On Fri, Feb 8, 2019 at 8:51 AM françois lacombe <
francois.laco...@dcbrain.com> wrote:

> Hi Rong,
>
> Thank you for this answer.
> I've changed Rows to Maps, which eases the conversion process.
>
> Nevertheless, I'm interested in any explanation of why row1.setField(i,
> row2) appends row2 at the end of row1.
>
> All the best
>
> François
>
> On Wed, Feb 6, 2019 at 19:33, Rong Rong  wrote:
>
>> Hi François,
>>
>> I wasn't exactly sure this is a JSON object or JSON string you are trying
>> to process.
>> For a JSON string this [1] article might help.
>> For a JSON object, I am assuming you are trying to convert it into a
>> TableSource and processing using Table/SQL API, you could probably use the
>> example here [2]
>>
>> BTW, a very remote hunch, this might be just a stringify issue how you
>> print the row out.
>>
>> --
>> Rong
>>
>> [1]:
>> https://stackoverflow.com/questions/49380778/how-to-stream-a-json-using-flink
>> [2]:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sourceSinks.html#table-sources-sinks
>>
>> On Wed, Feb 6, 2019 at 3:06 AM françois lacombe <
>> francois.laco...@dcbrain.com> wrote:
>>
>>> Hi all,
>>>
>>> I currently get a json string from my pgsql source with nested objects
>>> to be converted into Flink's Row.
>>> Nested json objects should go in nested Rows.
>>> An avro schema rules the structure my source should conform to.
>>>
>>> According to this json :
>>> {
>>>   "a":"b",
>>>   "c":"d",
>>>   "e":{
>>>"f":"g"
>>>}
>>> }
>>>
>>> ("b", "d", Row("g")) is expected as a result according to my avro schema.
>>>
>>> I wrote a recursive method which iterate over json objects and put
>>> nested Rows at right indices in their parent but here is what outputs :
>>> ("b", "d", "g")
>>> Child Row is appended to the parent. I don't understand why.
>>> Obviously, process is crashing arguing the top level Row arity doesn't
>>> match serializers.
>>>
>>> Is there some native methods in Flink to achieve that?
>>> I don't feel so comfortable to have written my own json processor for
>>> this job.
>>>
>>> Do you have any hint which can help please ?
>>>
>>> All the best
>>>
>>> François
>>>
>>>
>>>
>>>
>>
>
>


Re: Using custom evictor and trigger on Table API

2019-02-06 Thread Rong Rong
Hi Dongwon,

There was a previous thread regarding this [1]; unfortunately, this is not
supported yet.

However, there are some recent development proposals [2,3] to enhance the
Table API which might be able to support your use case.

--
Rong

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Table-API-custom-window-td22199.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Enhancing-the-functionality-and-productivity-of-Table-API-td24963i20.html

On Wed, Feb 6, 2019 at 4:14 AM eastcirclek  wrote:

> Hi all,
>
> I’m looking into Table API for my new project.
> It looks like a sweet spot between DataStream API/SQL.
>
> However, it doesn’t seem like the expressivity of Table API equals to that
> of DataStream API.
>
> My previous Flink projects were building simple pipelines using DataStream
> API with custom evictor (FF Berlin 17) and custom trigger (FF Berlin 18).
> I believe these pipelines can be expressed with Table API/UDF/UDAF except
> the custom windowing components.
>
> Do I have no way but to change the table into DataStream to use the custom
> components at the moment?
>
> Best,
> - Dongwon
>
>


Re: Get nested Rows from Json string

2019-02-06 Thread Rong Rong
Hi François,

I wasn't sure whether it is a JSON object or a JSON string you are trying
to process.
For a JSON string, this article [1] might help.
For a JSON object, I am assuming you are trying to convert it into a
TableSource and process it using the Table/SQL API; you could probably use
the example here [2].

BTW, a very remote hunch: this might be just a stringify issue in how you
print the row out.

--
Rong

[1]:
https://stackoverflow.com/questions/49380778/how-to-stream-a-json-using-flink
[2]:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sourceSinks.html#table-sources-sinks

On Wed, Feb 6, 2019 at 3:06 AM françois lacombe <
francois.laco...@dcbrain.com> wrote:

> Hi all,
>
> I currently get a json string from my pgsql source with nested objects to
> be converted into Flink's Row.
> Nested json objects should go in nested Rows.
> An avro schema rules the structure my source should conform to.
>
> According to this json :
> {
>   "a":"b",
>   "c":"d",
>   "e":{
>"f":"g"
>}
> }
>
> ("b", "d", Row("g")) is expected as a result according to my avro schema.
>
> I wrote a recursive method which iterate over json objects and put nested
> Rows at right indices in their parent but here is what outputs : ("b", "d",
> "g")
> Child Row is appended to the parent. I don't understand why.
> Obviously, process is crashing arguing the top level Row arity doesn't
> match serializers.
>
> Is there some native methods in Flink to achieve that?
> I don't feel so comfortable to have written my own json processor for this
> job.
>
> Do you have any hint which can help please ?
>
> All the best
>
> François
>
>
>
>


Re: TimeZone shift problem in Flink SQL

2019-01-25 Thread Rong Rong
Hi Henry,

Unix epoch time values are always relative to UTC (GMT). For example:
- 1548162182001 <=> GMT: Tuesday, January 22, 2019 1:03:02.001 PM, or CST:
Tuesday, January 22, 2019 9:03:02.001 PM.
- 1548190982001 <=> GMT: Tuesday, January 22, 2019 9:03:02.001 PM, or CST:
Wednesday, January 23, 2019 5:03:02.001 AM.

Several things are needed here:
1. Your "unix_timestamp" UDF should return the actual Unix epoch time [1].
2. As Bowen mentioned, you will have to pass the desired timezone as an
argument to your "from_unixtime" UDF.
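
To double check the two values above, here is a small sketch using
java.time. The class name EpochZoneDemo and the output pattern are my own
choices, and it assumes that "CST" in this thread means China Standard Time
(Asia/Shanghai, UTC+8). The same epoch value is rendered differently
depending only on the zone passed to the formatter.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class EpochZoneDemo {
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Renders an epoch-millisecond value as wall-clock time in the given zone. */
    public static String format(long epochMillis, ZoneId zone) {
        return FMT.withZone(zone).format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        ZoneId cst = ZoneId.of("Asia/Shanghai"); // assumption: CST = China Standard Time
        for (long ts : new long[] {1548162182001L, 1548190982001L}) {
            System.out.println(ts
                    + " UTC=" + format(ts, ZoneOffset.UTC)
                    + " CST=" + format(ts, cst));
        }
    }
}
```

The two epoch values differ by exactly 28,800,000 ms (8 hours), which
suggests the local wall-clock time was interpreted as UTC somewhere between
the two UDFs.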

--
Rong

[1]: https://en.wikipedia.org/wiki/Unix_time

On Thu, Jan 24, 2019 at 4:43 PM Bowen Li  wrote:

> Hi,
>
> Did you consider timezone in conversion in your UDF?
>
>
> On Tue, Jan 22, 2019 at 5:29 AM 徐涛  wrote:
>
>> Hi Experts,
>> I have the following two UDFs,
>> unix_timestamp:   transform from string to Timestamp, with the
>> arguments (value:String, format:String), return Timestamp
>>from_unixtime:transform from Timestamp to String, with the
>> arguments (ts:Long, format:String), return String
>>
>>
>> select
>>  number,
>>  ts,
>>  from_unixtime(unix_timestamp(LAST_UPDATE_TIME, 'EEE MMM dd
>> HH:mm:Ss z '),'-MM-dd')  as dt
>>   from
>>  test;
>>
>>  when the LAST_UPDATE_TIME value is "Tue Jan 22 21:03:12 CST 2019”,
>> the unix_timestamp return a Timestamp with value 1548162182001.
>>   but when from_unixtime is invoked, the timestamp with
>> value 1548190982001 is passed in, there are 8 hours shift between them.
>>   May I know why there are 8 hours shift between them, and how can I
>> get the timestamp that are passed out originally from the first UDF without
>> changing the code?
>>   Thanks very much.
>>
>> Best
>> Henry
>>
>


Re: ElasticSearch RestClient throws NoSuchMethodError due to shade mechanism

2019-01-15 Thread Rong Rong
Hi Henry,

I am not sure whether this is the recommended way, but from what I
understand of the pom file in elasticsearch5, you can change the sub-version
of org.elasticsearch.client by manually overriding it with

-Delasticsearch.version=5.x.x

during the Maven build if you are using a different sub-version.
This way you don't need to include two jars of the Elasticsearch client.
Does this resolve your problem?

--
Rong

On Tue, Jan 15, 2019 at 2:39 AM 徐涛  wrote:

> Hi All,
> I use the following code try to build a RestClient
> org.elasticsearch.client.RestClient.builder(  new HttpHost(xxx,
> xxx,"http")  ).build()
> but when in running time, a NoSuchMethodError throws out, I think the
> reason is:
> There are two RestClient classes, one is in the jar I include, the other
> one is in flink-connector-elasticsearch5, but the argument of build method
> in flink-connector-elasticsearch5 is
> org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.apache.http.HttpHost.
> So I want to know why org.elasticsearch.client.RestClientBuilder is not
> shaded, so runtime class conflict could be avoided?
>
>* public static RestClientBuilder
> builder(org.apache.flink.streaming.connectors.elasticsearch5.shaded.org.apache.http.HttpHost...
> hosts) {*
> *return new RestClientBuilder(hosts);*
> *}*
>
> Best
> Henry
>


Re: Is the eval method invoked in a thread safe manner in Fink UDF

2019-01-13 Thread Rong Rong
According to the codegen result, I think the UDF call for each field is
invoked sequentially.

However, if you maintain internal state within your UDF, it is your
responsibility to keep that state consistent.
Are you invoking an external RPC in your "GetName" UDF method, and does it
have to be async?

--
Rong

On Sat, Jan 12, 2019 at 11:42 PM Anil  wrote:

> Thanks Hequn!. Is it also thread safe when the same UDF is called multiple
> times in the same record.
> Is the UDF called sequentially for each fields a single record,  I have a
> query like  -
>  select GetName(data.id, 'city'), GetName(data.id, 'zone') from ..
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Java Exapmle of Stochastic Outlier Selection

2019-01-12 Thread Rong Rong
Hi James,

Flink ML is usually tightly integrated with Scala. I did poke around and
try to make the example work in Java; it requires a significant amount of
effort, but you can try the following:

First, the implicit type parameters need to be passed explicitly to the
execution environment to generate the DataSet appropriately. The link [1]
might be useful depending on your use case.
Next, you need to extract the TransformDataSetOperation from
StochasticOutlierSelection, since it is also an implicit argument
to the transform method.

--
Rong

[1]:
https://www.programcreek.com/java-api-examples/index.php?api=scala.reflect.ClassTag


On Sat, Jan 12, 2019 at 12:39 AM James.Y.Yang (g-mis.cncd02.Newegg) 42035 <
james.y.y...@newegg.com> wrote:

> Hi,
>
>
>
> I want to use Stochastic Outlier Selection in ML Library. But after I read
> the document [1] , I find there is not Java example. Sorry I am not
> familiar with Scala
>
> So I appreciate that someone can share a Java example.
>
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/libs/ml/sos.html
>
>
>
> Best Regards,
>
> James Yang
>


Re: Questions about UDTF in flink SQL

2018-11-30 Thread Rong Rong
Hi Wangsan,

If your requirement is essentially what Jark describes, we already have a
proposal following up on [FLINK-9294] in its related/parent task,
[FLINK-9484]. We are already implementing some of this internally and have
one PR ready for review for FLINK-9294.

Please take a look and let us know if there are any additional features you
would like to comment on or suggest.

Thanks,
Rong

On Fri, Nov 30, 2018 at 1:54 AM Jark Wu  wrote:

> Hi Wangsan,
>
> If I understand correctly, you want the return type of UDTF is determined
> by the actual arguments, not a fixed result type. For example:
>
> udtf("int, string, long", inputField)returns  a composite type with
> [f0: INT, f1: VARCHAR, f2: BIGINT]
> udtf("int", inputField)returns  an atomic type with [f0: INT]
>
> This is an interesting and useful feature IMO. But it maybe need some
> modification for the current API of TableFunction to
> provide an additional `TypeInformation[T] getResultType(Object[]
> arguments, Class[] argTypes)` interface. Which means need
> more discussion in the community.
>
> But you can create an issue if this is what you want and we can discuss
> how to support it.
>
> Best,
> Jark
>
>
>
> On Thu, 29 Nov 2018 at 19:14, Timo Walther  wrote:
>
>> Hi Wangsan,
>>
>> currently, UDFs have very strict result type assumptions. This is
>> necessary to determine the serializers for the cluster. There were
>> multiple requests for more flexible handling of types in UDFs.
>>
>> Please have a look at:
>> - [FLINK-7358] Add implicitly converts support for User-defined function
>> - [FLINK-9294] [table] Improve type inference for UDFs with composite
>> parameter and/or result type
>> - [FLINK-10958] [table] Add overload support for user defined function
>>
>> If you think those issues do not represent what you need, you can open a
>> new issue with a little example of what feature you think is missing.
>>
>> Regards,
>> Timo
>>
>>
>> On 28.11.18 at 09:59, wangsan wrote:
>> > Hi all,
>> >
>> > When using user-defined table function in Flink SQL, it seems that the
>> result type of a table function must be deterministic.
>> >
>> > If I want a UDTF whose result type is determined by its input
>> parameters, what should I do?
>> >
>> > What I want to do is like this:
>> >
>> > ```
>> > SELECT input, f1, f2 length FROM MyTable, LATERAL
>> TABLE(unnest_udtf(input, v1, v2)) as T(f1, f2), LATERAL
>> TABLE(unnest_udtf(input, v3, v4, v5)) as T(f3, f4, f5)
>> > ```
>> >
>> > I can surely register the same UDTF with different name and
>> configuration, but I guess that’s not a good idea :(.
>> >
>> > If we can not make this in Flink SQL for now , may be we should
>> consider this feature in future?
>> >
>> > Best,
>> > wangsan
>>
>>
>>


Re: Reset kafka offets to latest on restart

2018-11-21 Thread Rong Rong
Hi Vishal,

You can probably try using an offset configuration similar to that of a
service consumer.
The documentation at [1] might be useful to look at.

Thanks,
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

On Wed, Nov 21, 2018 at 1:32 PM Jamie Grier  wrote:

> Hi Vishal,
>
> No, there is no way to do this currently.
>
>
> On Wed, Nov 21, 2018 at 10:22 AM Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Any one ?
>>
>> On Tue, Nov 20, 2018 at 12:48 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Is it possible to have checkpointing but reset the kafka offsets to
>>> latest on restart on failure ?
>>>
>>


Re: [DISCUSS] Integrate Flink SQL well with Hive ecosystem

2018-10-11 Thread Rong Rong
Hi Xuefu,

Thanks for putting together the overview. I would like to add some more on
top of Timo's comments.
1,2. I agree with Timo that a proper catalog support should also address
the metadata compatibility issues. I was actually wondering if you are
referring to something like utilizing table stats for plan optimization?
4. If the key is to have users integrate Hive UDF without code changes to
Flink UDF, it shouldn't be a problem as Timo mentioned. Is your concern
mostly on the support of Hive UDFs that should be implemented in
Flink-table natively?
7,8. Correct me if I am wrong, but I feel like some of the related
components might have already been discussed in the longer term road map of
FLIP-24 [1]?
9. Per Jörn's comment, we should steer clear of a tight dependency on Hive
and treat it as one "connector" system. Should we also consider treating the
JDBC/ODBC driver as a component of the connector system instead
of having Flink provide it?

Thanks,
Rong

[1]. https://cwiki.apache.org/confluence/display/FLINK/FLIP-24+-+SQL+Client

On Thu, Oct 11, 2018 at 12:46 AM Timo Walther  wrote:

> Hi Xuefu,
>
> thanks for your proposal, it is a nice summary. Here are my thoughts to
> your list:
>
> 1. I think this is also on our current mid-term roadmap. Flink has lacked
> proper catalog support for a very long time. Before we can connect
> catalogs we need to define how to map all the information from a catalog
> to Flink's representation. This is why the work on the unified connector
> API [1] is going on for quite some time as it is the first approach to
> discuss and represent the pure characteristics of connectors.
> 2. It would be helpful to figure out what is missing in [1] to ensure
> this point. I guess we will need a new design document just for a proper
> Hive catalog integration.
> 3. This is already work in progress. ORC has been merged, Parquet is on
> its way [2].
> 4. This should be easy. There was a PR in past that I reviewed but was
> not maintained anymore.
> 5. The type system of Flink SQL is very flexible. Only UNION type is
> missing.
> 6. A Flink SQL DDL is on the roadmap soon once we are done with [1].
> Support for Hive syntax also needs cooperation with Apache Calcite.
> 7-11. Long-term goals.
>
> I would also propose to start with a smaller scope where also current
> Flink SQL users can profit: 1, 2, 5, 3. This would allow to grow the
> Flink SQL ecosystem. After that we can aim to be fully compatible
> including syntax and UDFs (4, 6 etc.). Once the core is ready, we can
> work on the tooling (7, 8, 9) and performance (10, 11).
>
> @Jörn: Yes, we should not have a tight dependency on Hive. It should be
> treated as one "connector" system out of many.
>
> Thanks,
> Timo
>
> [1]
>
> https://docs.google.com/document/d/1Yaxp1UJUFW-peGLt8EIidwKIZEWrrA-pznWLuvaH39Y/edit?ts=5bb62df4#
> [2] https://github.com/apache/flink/pull/6483
>
> On 11.10.18 at 07:54, Jörn Franke wrote:
> > Would it maybe make sense to provide Flink as an engine on Hive
> („flink-on-Hive“)? Eg to address 4,5,6,8,9,10. this could be more loosely
> coupled than integrating hive in all possible flink core modules and thus
> introducing a very tight dependency to Hive in the core.
> > 1,2,3 could be achieved via a connector based on the Flink Table API.
> > Just as a proposal to start this Endeavour as independent projects (hive
> engine, connector) to avoid too tight coupling with Flink. Maybe in a more
> distant future if the Hive integration is heavily demanded one could then
> integrate it more tightly if needed.
> >
> > What is meant by 11?
> >> On 11.10.2018 at 05:01, Zhang, Xuefu wrote:
> >>
> >> Hi Fabian/Vno,
> >>
> >> Thank you very much for your encouragement inquiry. Sorry that I didn't
> see Fabian's email until I read Vino's response just now. (Somehow Fabian's
> went to the spam folder.)
> >>
> >> My proposal contains long-term and short-terms goals. Nevertheless, the
> effort will focus on the following areas, including Fabian's list:
> >>
> >> 1. Hive metastore connectivity - This covers both read/write access,
> which means Flink can make full use of Hive's metastore as its catalog (at
> least for the batch but can extend for streaming as well).
> >> 2. Metadata compatibility - Objects (databases, tables, partitions,
> etc) created by Hive can be understood by Flink and the reverse direction
> is true also.
> >> 3. Data compatibility - Similar to #2, data produced by Hive can be
> consumed by Flink and vice versa.
> >> 4. Support Hive UDFs - For all Hive's native udfs, Flink either
> provides its own implementation or make Hive's implementation work in
> Flink. Further, for user created UDFs in Hive, Flink SQL should provide a
> mechanism allowing user to import them into Flink without any code change
> required.
> >> 5. Data types -  Flink SQL should support all data types that are
> available in Hive.
> >> 6. SQL Language - Flink SQL should support SQL standard (such as
> SQL2003) with extension 

Re: Does Flink SQL "in" operation has length limit?

2018-10-01 Thread Rong Rong
Hi Fabian. Yes, I think that was what I missed. I hadn't looked into the
code and was just inferring from the translated plan pasted by Henry.

Let me try to take a look and put in a fix for this.

Thanks,
Rong

On Mon, Oct 1, 2018, 7:28 AM Fabian Hueske  wrote:

> Hi,
>
> I had a look into the code. From what I saw, we are translating the values
> into Rows.
> The problem here is that the IN clause is translated into a join and that
> the join results contains a time attribute field. This is a safety
> restriction to ensure that time attributes do not lose their watermark
> alignment because joins can return their results in random order. This
> should be related to or same as [1].
>
> Anyway, we should not translate IN clauses to joins for incrementally
> evaluated queries (aka. streaming queries).
> The main problem here is that the join materializes both inputs which is
> fine for the VALUES input but not for the "stream".
> I created FLINK-10474 to fix the problem.
>
> A workaround for the problem could be a user-defined scalar function that
> replaces the IN clause.
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-10211
> [2] https://issues.apache.org/jira/browse/FLINK-10474
>
> On Mon, Oct 1, 2018 at 10:01, Timo Walther wrote:
>
>> Hi,
>>
>> tuple should not be used anywhere in flink-table. @Rong can you point us
>> to the corresponding code? I haven't looked into the code but we should
>> definitely support this query. @Henry feel free to open an issue for it.
>>
>> Regards,
>> Timo
>>
>>
>> On 28.09.18 at 19:14, Rong Rong wrote:
>>
>> Yes.
>>
>> Thanks for bringing this up Hequn! :-) I think Tuple would not be the
>> best container to use.
>>
>> However, in search for alternative, shouldn't Collection / List be a more
>> suitable solution? Row seems to not fit in the context (as there can be
>> Rows with elements of different type).
>> I vaguely recall there was similar JIRA but might not be related to IN
>> clause. Let me try to dig it up.
>>
>> --
>> Rong
>>
>> On Fri, Sep 28, 2018 at 9:32 AM Hequn Cheng  wrote:
>>
>>> Hi,
>>>
>>> I haven't look into the code. If this is limited by Tuple, would it
>>> better to implement it with Row?
>>>
>>> Best, Hequn
>>>
>>> On Fri, Sep 28, 2018 at 9:27 PM Rong Rong  wrote:
>>>
>>>> Hi Henry, Vino.
>>>>
>>>> I think IN operator was translated into either a RexSubQuery or a
>>>> SqlStdOperatorTable.IN operator.
>>>> I think Vino was referring to the first case.
>>>> For the second case (I think that's what you are facing here), they are
>>>> converted into tuples and the maximum we currently have in Flink was
>>>> Tuple25.java, I was wondering if that was the issue you are facing.
>>>> You can probably split the IN into many IN combining with OR.
>>>>
>>>> --
>>>> Rong
>>>>
>>>> On Fri, Sep 28, 2018 at 2:33 AM vino yang 
>>>> wrote:
>>>>
>>>>> Hi Henry,
>>>>>
>>>>> Maybe the number of elements in your IN clause is out of range? Its
>>>>> default value is 20, you can modify it with this configuration item:
>>>>>
>>>>> *withInSubQueryThreshold(XXX)*
>>>>>
>>>>> This API comes from Calcite.
>>>>>
>>>>> Thanks, vino.
>>>>>
>>>>> On Fri, Sep 28, 2018 at 4:23 PM, 徐涛 wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> When I execute the following SQL in Flink 1.6.1, an error is thrown
>>>>>> saying that the query uses an unsupported feature. But when I reduce
>>>>>> the number of integers in the “in” clause, for example,
>>>>>>
>>>>>> trackId in (124427150,71648998), Flink does not complain. So I wonder:
>>>>>> is there a length limit on the “in” operation?
>>>>>>
>>>>>> Thanks a lot.
>>>>>>
>>>>>> SELECT
>>>>>> trackId as id,track_title as description, count(*) as cnt
>>>>>> FROM
>>>>>> play
>>>>>> WHERE
>>>>>> appName='play.statistics.trace' and
>>>>>> trackId in 
>>>>>> (124427150,71648998,124493327,524043,27300837,30300481,27300809,124744768,45982512,124526566,124556427,124804208,74302

Re: Does Flink SQL "in" operation has length limit?

2018-09-28 Thread Rong Rong
Yes.

Thanks for bringing this up, Hequn! :-) I think Tuple would not be the best
container to use.

However, in searching for an alternative, wouldn't Collection / List be a more
suitable solution? Row does not seem to fit this context (as a Row can hold
elements of different types).
I vaguely recall there was a similar JIRA, but it might not be related to the
IN clause. Let me try to dig it up.

--
Rong

On Fri, Sep 28, 2018 at 9:32 AM Hequn Cheng  wrote:

> Hi,
>
> I haven't looked into the code. If this is limited by Tuple, would it be
> better to implement it with Row?
>
> Best, Hequn
>
> On Fri, Sep 28, 2018 at 9:27 PM Rong Rong  wrote:
>
>> Hi Henry, Vino.
>>
>> I think the IN operator is translated into either a RexSubQuery or a
>> SqlStdOperatorTable.IN operator.
>> I think Vino was referring to the first case.
>> For the second case (I think that's what you are facing here), the values
>> are converted into tuples, and the largest tuple class we currently have in
>> Flink is Tuple25.java. I was wondering if that is the issue you are facing.
>> You can probably split the IN into several smaller IN lists combined with OR.
>>
>> --
>> Rong
>>
>> On Fri, Sep 28, 2018 at 2:33 AM vino yang  wrote:
>>
>>> Hi Henry,
>>>
>>> Maybe the number of elements in your IN clause exceeds the threshold? The
>>> default value is 20; you can change it with this configuration item:
>>>
>>> *withInSubQueryThreshold(XXX)*
>>>
>>> This API comes from Calcite.
>>>
>>> Thanks, vino.
>>>
>>> On Fri, Sep 28, 2018 at 4:23 PM 徐涛 wrote:
>>>
>>>> Hi,
>>>>
>>>> When I execute the following SQL in Flink 1.6.1, an error is thrown
>>>> saying that the query uses an unsupported feature. But when I reduce the
>>>> number of integers in the “in” clause, for example,
>>>>
>>>> trackId in (124427150,71648998), Flink does not complain. So I wonder: is
>>>> there a length limit on the “in” operation?
>>>>
>>>> Thanks a lot.
>>>>
>>>> SELECT
>>>> trackId as id,track_title as description, count(*) as cnt
>>>> FROM
>>>> play
>>>> WHERE
>>>> appName='play.statistics.trace' and
>>>> trackId in 
>>>> (124427150,71648998,124493327,524043,27300837,30300481,27300809,124744768,45982512,124526566,124556427,124804208,74302264,119588973,30496269,27300288,124098818,125071530,120918746,124171456,30413034,124888075,125270551,125434224,27300195,45982342,45982468,45982355,65349883,124705962,65349905,124298305,124889583,45982338,20506255,18556415,122161128,27299018,122850375,124862362,45982336,59613202,122991190,124590280,124867563,45982332,124515944,20506257,122572115,92083574)
>>>> GROUP BY
>>>> HOP(started_at_ts, INTERVAL '5' SECOND, INTERVAL '5' 
>>>> MINUTE),trackId,track_title;
>>>>
>>>>
>>>>
>>>> FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
>>>>   FlinkLogicalCalc(expr#0..3=[{inputs}], started_at_ts=[$t2],
>>>> trackId=[$t0], track_title=[$t1])
>>>> FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
>>>>   FlinkLogicalCalc(expr#0..4=[{inputs}],
>>>> expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)],
>>>> trackId=[$t1], track_title=[$t2], started_at_ts=[$t4], $condition=[$t6])
>>>> FlinkLogicalNativeTableScan(table=[[play]])
>>>>   FlinkLogicalValues(tuples=[[{ 124427150 }, { 71648998 }, {
>>>> 124493327 }, { 524043 }, { 27300837 }, { 30300481 }, { 27300809 }, {
>>>> 124744768 }, { 45982512 }, { 124526566 }, { 124556427 }, { 124804208 }, {
>>>> 74302264 }, { 119588973 }, { 30496269 }, { 27300288 }, { 124098818 }, {
>>>> 125071530 }, { 120918746 }, { 124171456 }, { 30413034 }, { 124888075 }, {
>>>> 125270551 }, { 125434224 }, { 27300195 }, { 45982342 }, { 45982468 }, {
>>>> 45982355 }, { 65349883 }, { 124705962 }, { 65349905 }, { 124298305 }, {
>>>> 124889583 }, { 45982338 }, { 20506255 }, { 18556415 }, { 122161128 }, {
>>>> 27299018 }, { 122850375 }, { 124862362 }, { 45982336 }, { 59613202 }, {
>>>> 122991190 }, { 124590280 }, { 124867563 }, { 45982332 }, { 124515944 }, {
>>>> 20506257 }, { 122572115 }, { 92083574 }]])
>>>>
>>>> This exception indicates that the query uses an unsupported SQL feature.
>>>> Please check the documentation for the set of currently supported SQL
>>>> features.

Re: Does Flink SQL "in" operation has length limit?

2018-09-28 Thread Rong Rong
Hi Henry, Vino.

I think the IN operator is translated into either a RexSubQuery or a
SqlStdOperatorTable.IN operator.
I think Vino was referring to the first case.
For the second case (I think that's what you are facing here), the values are
converted into tuples, and the largest tuple class we currently have in Flink
is Tuple25.java. I was wondering if that is the issue you are facing. You can
probably split the IN into several smaller IN lists combined with OR.
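
A minimal sketch of that workaround, in plain Java. The class and method names
(InListSplitter, splitIn) are made up for illustration and are not part of
Flink or Calcite, and the chunk size is an assumption: pick a value below
whatever limit turns out to apply in your setup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

/** Hypothetical helper, not part of Flink or Calcite. */
final class InListSplitter {

    private InListSplitter() {}

    /**
     * Rewrites "column IN (v1, ..., vn)" as several IN lists of at most
     * chunkSize values each, joined with OR and wrapped in parentheses.
     * The values are numeric, so they are written into the SQL text directly.
     */
    static String splitIn(String column, List<Long> values, int chunkSize) {
        if (values.isEmpty() || chunkSize < 1) {
            throw new IllegalArgumentException("need at least one value and chunkSize >= 1");
        }
        List<String> predicates = new ArrayList<>();
        for (int from = 0; from < values.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, values.size());
            StringJoiner inList = new StringJoiner(",", column + " IN (", ")");
            for (Long value : values.subList(from, to)) {
                inList.add(String.valueOf(value));
            }
            predicates.add(inList.toString());
        }
        // The outer parentheses keep the ORs from mixing with a surrounding AND.
        return "(" + String.join(" OR ", predicates) + ")";
    }

    public static void main(String[] args) {
        List<Long> ids = Arrays.asList(124427150L, 71648998L, 124493327L, 524043L, 27300837L);
        // Chunk size 2 is only for a short demo output.
        System.out.println(splitIn("trackId", ids, 2));
    }
}
```

The returned string can replace the single "trackId in (...)" predicate in the
WHERE clause; whether this avoids the error still needs to be checked against
the actual planner.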

--
Rong

On Fri, Sep 28, 2018 at 2:33 AM vino yang  wrote:

> Hi Henry,
>
> Maybe the number of elements in your IN clause exceeds the threshold? The
> default value is 20; you can change it with this configuration item:
>
> *withInSubQueryThreshold(XXX)*
>
> This API comes from Calcite.
>
> Thanks, vino.
>
> On Fri, Sep 28, 2018 at 4:23 PM 徐涛 wrote:
>
>> Hi,
>>
>> When I execute the following SQL in Flink 1.6.1, an error is thrown saying
>> that the query uses an unsupported feature. But when I reduce the number of
>> integers in the “in” clause, for example,
>>
>> trackId in (124427150,71648998), Flink does not complain. So I wonder: is
>> there a length limit on the “in” operation?
>>
>> Thanks a lot.
>>
>> SELECT
>> trackId as id,track_title as description, count(*) as cnt
>> FROM
>> play
>> WHERE
>> appName='play.statistics.trace' and
>> trackId in 
>> (124427150,71648998,124493327,524043,27300837,30300481,27300809,124744768,45982512,124526566,124556427,124804208,74302264,119588973,30496269,27300288,124098818,125071530,120918746,124171456,30413034,124888075,125270551,125434224,27300195,45982342,45982468,45982355,65349883,124705962,65349905,124298305,124889583,45982338,20506255,18556415,122161128,27299018,122850375,124862362,45982336,59613202,122991190,124590280,124867563,45982332,124515944,20506257,122572115,92083574)
>> GROUP BY
>> HOP(started_at_ts, INTERVAL '5' SECOND, INTERVAL '5' 
>> MINUTE),trackId,track_title;
>>
>>
>>
>> FlinkLogicalWindowAggregate(group=[{1, 2}], cnt=[COUNT()])
>>   FlinkLogicalCalc(expr#0..3=[{inputs}], started_at_ts=[$t2],
>> trackId=[$t0], track_title=[$t1])
>> FlinkLogicalJoin(condition=[=($0, $3)], joinType=[inner])
>>   FlinkLogicalCalc(expr#0..4=[{inputs}],
>> expr#5=[_UTF-16LE'play.statistics.trace'], expr#6=[=($t0, $t5)],
>> trackId=[$t1], track_title=[$t2], started_at_ts=[$t4], $condition=[$t6])
>> FlinkLogicalNativeTableScan(table=[[play]])
>>   FlinkLogicalValues(tuples=[[{ 124427150 }, { 71648998 }, {
>> 124493327 }, { 524043 }, { 27300837 }, { 30300481 }, { 27300809 }, {
>> 124744768 }, { 45982512 }, { 124526566 }, { 124556427 }, { 124804208 }, {
>> 74302264 }, { 119588973 }, { 30496269 }, { 27300288 }, { 124098818 }, {
>> 125071530 }, { 120918746 }, { 124171456 }, { 30413034 }, { 124888075 }, {
>> 125270551 }, { 125434224 }, { 27300195 }, { 45982342 }, { 45982468 }, {
>> 45982355 }, { 65349883 }, { 124705962 }, { 65349905 }, { 124298305 }, {
>> 124889583 }, { 45982338 }, { 20506255 }, { 18556415 }, { 122161128 }, {
>> 27299018 }, { 122850375 }, { 124862362 }, { 45982336 }, { 59613202 }, {
>> 122991190 }, { 124590280 }, { 124867563 }, { 45982332 }, { 124515944 }, {
>> 20506257 }, { 122572115 }, { 92083574 }]])
>>
>> This exception indicates that the query uses an unsupported SQL feature.
>> Please check the documentation for the set of currently supported SQL
>> features.
>> at
>> org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:275)
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:845)
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:892)
>> at
>> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:344)
>> at
>> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:786)
>> at
>> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:723)
>> at
>> org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:683)
>> at
>> com.ximalaya.flink.dsl.application.FlinkApplication$$anonfun$main$5.apply(FlinkApplication.scala:141)
>> at
>> com.ximalaya.flink.dsl.application.FlinkApplication$$anonfun$main$5.apply(FlinkApplication.scala:139)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> at
>> com.ximalaya.flink.dsl.application.FlinkApplication$.main(FlinkApplication.scala:139)
>> at
>> com.ximalaya.flink.dsl.web.test.DslTestUtils$.executeDslFile(DslTestUtils.scala:69)
>> at
>> com.ximalaya.flink.dsl.web.test.PlayCountTest$.main(PlayCountTest.scala:5)
>> at com.ximalaya.flink.dsl.web.test.PlayCountTest.main(PlayCountTest.scala)
>>
>> Best
>> Henry
>>
>


Re: How to get the location of keytab when using flink on yarn

2018-09-24 Thread Rong Rong
Hi

Just a quick thought on this:
You might be able to use a delegation token to access HBase [1]. That might be
more secure than distributing your keytab to all the YARN nodes.

Hope this helps.

--
Rong

[1] https://wiki.apache.org/hadoop/Hbase/HBaseTokenAuthentication

On Mon, Sep 24, 2018 at 7:51 PM sanmutongzi  wrote:

> Hi Aljoscha,
> Sorry for my late response. In my experience, if flink-conf.yaml sets
> "security.kerberos.login.keytab" and "security.kerberos.login.contexts"
> with a Kerberos file, then YARN will ship the keytab file to the
> TaskManager.
> I can also find a log entry like:
>  " INFO  org.apache.flink.configuration.GlobalConfiguration-
> Loading configuration property: security.kerberos.login.keytab,
>
> /data1/yarn/nm/usercache/hadoop/appcache/application_1537515506704_0007/container_e28_1537515506704_0007_01_01/krb5.keytab"
> in the TaskManager log.
> My problem is: in user code such as a map or sink function, how can I get
> the security.kerberos.login.keytab value for login?
>
> THANKS
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Accessing Global State when processing KeyedStreams

2018-09-19 Thread Rong Rong
Hi Scott,

Your use case seems to be a perfect fit for the Broadcast state pattern
[1].

--
Rong

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html


On Wed, Sep 19, 2018 at 7:11 AM Scott Sue  wrote:

> Hi,
>
> In our application, we receive Orders and Prices via two KafkaSources.
> What
> I want to do is to perform calculations for a given Order against a stream
> of Prices for the same securityId, i.e. same identifier between the Order
> and stream of Prices.  Naturally this is a perfect fit for a KeyedStream
> against the securityId for both Orders and Prices.
>
> I have currently connected these two streams together and then processing
> by
> ordersKeyStream.connect(pricesKeyStream).process(MyCoProcessFunction) and
> this fits for 95% of the time.  However part of my requirement is for
> certain Orders, I need to be able to connect prices from a different
> securityId (aka different key) to perform more calculations.  From what I
> can see, by the time I get to my CoProcessFunction, I am only able to see
> the Orders and Prices for a single securityId, I won't be able to cross
> over
> to another KeyedStream of Prices for me to perform this extra calculation.
> In terms of this extra calculation, it is not a hard requirement to be able
> to cross over to another KeyedStream of Prices, this is more ideal.
>
> Things that I have thought about to get around this as it would be
> acceptable to have a slightly older price for the securityId I require so:
> 1) I could connect to an external source of information to get this Price,
> or
> 2) Periodically broadcast out a price that the ProcessFunction could
> consume
> to perform this extra calculation.
>
> This seems like something Flink should be easily able to handle, I just
> feel
> as though I'm missing something here to allow this.
>
> As a non-functional requirement: the number of prices I receive can reach
> tens of thousands per second, so that is also something I am very wary of.
>
> Is there anything that could be suggested to help me out on this?
>
>
> Thanks in advance!
> Scott
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Question about Window Tigger

2018-09-19 Thread Rong Rong
Hi Chang,

There was some previous discussion on how to debug watermarks and window
triggers [1].
Basically, if there is no data for some partitions, there is no way to advance
the watermark, because the system cannot tell whether this is due to a network
failure or because no data is actually arriving at the source.
I think your use case is better off using SlidingProcessingTimeWindows.
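
To illustrate the point about partitions without data, here is a small sketch
in plain Java (not the Flink API; WatermarkSketch and its methods are
hypothetical names). It assumes each partition's watermark is its largest
timestamp seen so far minus the allowed out-of-orderness, that the combined
watermark is the minimum over all partitions, and that an event-time window
fires once the watermark reaches the window end minus one millisecond.

```java
/** Hypothetical illustration, not Flink code. */
final class WatermarkSketch {

    /** Marker for a partition that has not delivered any event yet. */
    static final long NO_DATA = Long.MIN_VALUE;

    private WatermarkSketch() {}

    /** Combined watermark: the minimum of the per-partition watermarks. */
    static long combinedWatermark(long[] maxTimestamps, long maxOutOfOrderness) {
        if (maxTimestamps.length == 0) {
            throw new IllegalArgumentException("need at least one partition");
        }
        long combined = Long.MAX_VALUE;
        for (long maxTimestamp : maxTimestamps) {
            // A partition without data contributes the lowest possible watermark.
            long partitionWatermark =
                    maxTimestamp == NO_DATA ? NO_DATA : maxTimestamp - maxOutOfOrderness;
            combined = Math.min(combined, partitionWatermark);
        }
        return combined;
    }

    /** An event-time window [start, end) fires once the watermark reaches end - 1. */
    static boolean eventTimeWindowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 60_000L;
        long allActive = combinedWatermark(new long[] {61_000L, 62_000L}, 1_000L);
        long oneIdle = combinedWatermark(new long[] {61_000L, NO_DATA}, 1_000L);
        System.out.println("all partitions active: " + eventTimeWindowFires(windowEnd, allActive));
        System.out.println("one partition idle:    " + eventTimeWindowFires(windowEnd, oneIdle));
    }
}
```

A processing-time window does not consult the watermark at all; it fires based
on the wall clock, which is why it keeps firing when the source goes quiet.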

Thanks,
Rong

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/watermark-does-not-progress-td22315.html

On Wed, Sep 19, 2018 at 1:48 AM Chang Liu  wrote:

> Dear All,
>
> I have a question about the Window Trigger: let's say I would like to use
> SlidingEventTimeWindows (60-second window size + 1-second window slide) to
> count the number of records per window. I am using event time with periodic
> watermarking and a certain maxOutOfOrderness.
>
> Sometimes there are no incoming events for a while, so the watermark that
> would trigger the window does not arrive. The last several records then
> just stay in the window, which fires only when it sees a watermark that
> triggers it.
>
> What I would like to achieve is: if no watermark arrives within a certain
> time (maybe measured by the system clock?), I can still trigger the window
> to fire regardless of whether a new event arrives. Then I can still get the
> count for this window without waiting for the next event, which could
> arrive much later.
>
> Do you have any idea how I can do this? Many thanks :)
>
> Best regards,
>
> Chang Liu 刘畅
>
>
>


Re: why same Sliding ProcessTime TimeWindow triggered twice

2018-09-17 Thread Rong Rong
I haven't dug too deep into the content, but it seems like this line is the
reason:

.keyBy(s => s.endsWith("FRI"))

Essentially you are creating two key partitions (true, false), and I believe
each of them has its own sliding window. Can you print out the key for each
of the windows?
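
A rough sketch of why one window could be printed once per key, assuming both
key values actually occur in the stream. This is plain Java, not the Flink
API, and the names (KeyedSlidingWindowSketch, windowStarts, panes) are made up
for illustration. It assumes zero window offset and that window state is kept
per (key, window) pair.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration, not Flink code. */
final class KeyedSlidingWindowSketch {

    private KeyedSlidingWindowSketch() {}

    /** Start times of all sliding windows [start, start + size) containing the timestamp. */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Assumes zero offset and a non-negative timestamp.
        long lastStart = timestamp - (timestamp % slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** One entry per (key, window) pair; each such pair fires on its own. */
    static Set<String> panes(boolean[] keys, long[] timestamps, long size, long slide) {
        Set<String> panes = new LinkedHashSet<>();
        for (int i = 0; i < keys.length; i++) {
            for (long start : windowStarts(timestamps[i], size, slide)) {
                panes.add(keys[i] + "@[" + start + "," + (start + size) + ")");
            }
        }
        return panes;
    }

    public static void main(String[] args) {
        long ts = 1537081904000L; // an element one second before the window end in the posted output
        Set<String> all = panes(new boolean[] {true, false}, new long[] {ts, ts}, 60_000L, 5_000L);
        for (String pane : all) {
            if (pane.endsWith("[1537081845000,1537081905000)")) {
                System.out.println(pane); // the same window appears once per key
            }
        }
    }
}
```

With a 1-minute size and a 5-second slide, every element falls into 12
windows, so two keys give 24 (key, window) pairs for the same timestamp.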

--
Rong

On Sun, Sep 16, 2018 at 1:23 AM 远远  wrote:

>
>
> -- Forwarded message -
> From: 远远 
> Date: Sun, Sep 16, 2018 at 4:08 PM
> Subject: Re: why same Sliding ProcessTime TimeWindow triggered twice
> To: 
>
>
> Hi, the Flink version I tested is 1.4.2. I just ran the test code in a
> local environment in IDEA, and all the settings are in the test code.
> My OS is deepin (a Debian-based Linux) 15.7.
>
> I tried again, and the output is as follows:
> now   ===> 2018-09-16 16:06:09
> start ===> 2018-09-16 16:05:10
> end   ===> 2018-09-16 16:06:10
> max   ===> 2018-09-16 16:06:09
> TimeWindow{start=1537085110000, end=1537085170000}
> aggreation
> now   ===> 2018-09-16 16:06:09
> start ===> 2018-09-16 16:05:10
> end   ===> 2018-09-16 16:06:10
> max   ===> 2018-09-16 16:06:09
> TimeWindow{start=1537085110000, end=1537085170000}
> aggreation
> now   ===> 2018-09-16 16:06:16
> start ===> 2018-09-16 16:05:15
> end   ===> 2018-09-16 16:06:15
> max   ===> 2018-09-16 16:06:14
> TimeWindow{start=1537085115000, end=1537085175000}
> aggreation
> now   ===> 2018-09-16 16:06:19
> start ===> 2018-09-16 16:05:20
> end   ===> 2018-09-16 16:06:20
> max   ===> 2018-09-16 16:06:19
> TimeWindow{start=1537085120000, end=1537085180000}
> aggreation
> now   ===> 2018-09-16 16:06:20
> start ===> 2018-09-16 16:05:20
> end   ===> 2018-09-16 16:06:20
> max   ===> 2018-09-16 16:06:19
> TimeWindow{start=1537085120000, end=1537085180000}
> aggreation
> now   ===> 2018-09-16 16:06:24
> start ===> 2018-09-16 16:05:25
> end   ===> 2018-09-16 16:06:25
> max   ===> 2018-09-16 16:06:24
> TimeWindow{start=1537085125000, end=1537085185000}
> aggreation
> now   ===> 2018-09-16 16:06:24
> start ===> 2018-09-16 16:05:25
> end   ===> 2018-09-16 16:06:25
> max   ===> 2018-09-16 16:06:24
> TimeWindow{start=1537085125000, end=1537085185000}
> aggreation
> now   ===> 2018-09-16 16:06:25
> start ===> 2018-09-16 16:05:25
> end   ===> 2018-09-16 16:06:25
> max   ===> 2018-09-16 16:06:24
> TimeWindow{start=1537085125000, end=1537085185000}
> aggreation
> now   ===> 2018-09-16 16:06:29
> start ===> 2018-09-16 16:05:30
> end   ===> 2018-09-16 16:06:30
> max   ===> 2018-09-16 16:06:29
> TimeWindow{start=1537085130000, end=1537085190000}
> aggreation
> now   ===> 2018-09-16 16:06:29
> start ===> 2018-09-16 16:05:30
> end   ===> 2018-09-16 16:06:30
> max   ===> 2018-09-16 16:06:29
> TimeWindow{start=1537085130000, end=1537085190000}
> aggreation
> now   ===> 2018-09-16 16:06:30
> start ===> 2018-09-16 16:05:30
> end   ===> 2018-09-16 16:06:30
> max   ===> 2018-09-16 16:06:29
> TimeWindow{start=1537085130000, end=1537085190000}
> aggreation
> now   ===> 2018-09-16 16:06:36
> start ===> 2018-09-16 16:05:35
> end   ===> 2018-09-16 16:06:35
> max   ===> 2018-09-16 16:06:34
> TimeWindow{start=1537085135000, end=1537085195000}
>
>
> On Sun, Sep 16, 2018 at 3:55 PM Xingcan Cui wrote:
>
>> Hi,
>>
>> I’ve tested your code in my local environment and everything worked fine.
>> It’s a little weird to see your output like that. I wonder if you could
>> give more information about your environment, e.g., your flink version and
>> execution settings.
>>
>> Thanks,
>> Xingcan
>>
>> On Sep 16, 2018, at 3:19 PM, 远远  wrote:
>>
>> Hi everyone,
>> Today I tested a sliding processing-time window and printed some details
>> about it. I found that the same sliding window is printed twice, as follows:
>>
>> now   ===> 2018-09-16 15:11:44
>> start ===> 2018-09-16 15:10:45
>> end   ===> 2018-09-16 15:11:45
>> max   ===> 2018-09-16 15:11:44
>> TimeWindow{start=1537081845000, end=1537081905000}
>> aggreation
>> now   ===> 2018-09-16 15:11:45
>> start ===> 2018-09-16 15:10:45
>> end   ===> 2018-09-16 15:11:45
>> max   ===> 2018-09-16 15:11:44
>> TimeWindow{start=1537081845000, end=1537081905000}
>> aggreation
>>
>> But when I apply a sum operator, this does not happen. I would like to
>> know why. Thanks.
>>
>> my test code is:
>>
>> object SlidingProcessTimeWindowTest {
>>
>> def main(args: Array[String]): Unit = {
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.addSource((context: SourceContext[String]) => {while(true) 
>> context.collect(new Random().nextInt(100) + ":FRI")})
>> .keyBy(s => s.endsWith("FRI"))
>> .timeWindow(Time.minutes(1), Time.seconds(5))
>> .apply((e, w, iter, coll: Collector[String]) => {
>> println("now   ===> " + 
>> convert(DateTime.now().getMillis))
>> println("start ===> " + convert(w.getStart))
>> println("end   ===> " + convert(w.getEnd))
>> println("max   ===> " + convert(w.maxTimestamp()))
>> println(w)
>> //

Re: Potential bug in Flink SQL HOP and TUMBLE operators

2018-09-17 Thread Rong Rong
This is indeed very strange behavior.

To add to the discussion: when you mentioned "raw Flink (windowed or not)
nor when using Flink CEP", how were the comparisons done?
Also, were you able to get correct results without the additional GROUP BY
term of "foo" or "userId"?

--
Rong

On Mon, Sep 17, 2018 at 12:30 PM Fabian Hueske  wrote:

> Hmm, that's interesting.
> HOP and TUMBLE window aggregations are directly translated into their
> corresponding DataStream counterparts (Sliding, Tumble).
> There should be no filtering of records.
>
> I assume you tried a simple query like "SELECT * FROM MyEventTable" and
> received all expected data?
>
> Fabian
>
> 2018-09-17 18:56 GMT+02:00 elliotst...@gmail.com :
>
>> Yes, I am certain events are being ignored or dropped during the first
>> five seconds.  Further investigation on my part reveals that the "ignore"
>> period is exactly the first five seconds of the stream - regardless of the
>> size of the window.
>>
>> Situation
>>
>> I have a script which pushes an event into Kafka once every second
>> structured as:
>>
>> {"userId": "use...@email.com", "timestamp": > producer>}
>>
>> My stream uses this Kafka queue as its source.  JSON schema and table
>> schema are as follows:
>>
>> final Json jsonFormat = new Json()
>> .failOnMissingField(false)
>> .jsonSchema("{"
>> + "  type: 'object',"
>> + "  properties: {"
>> + "userId: { type: 'string' },"
>> + "timestamp: { type: 'integer' }"
>> + "  }"
>> + "}");
>>
>> final Schema tableSchema = new Schema()
>> .field("userId", Types.STRING())
>> .field("timestamp", TypeInformation.of(BigDecimal.class))
>> .field("proctime", Types.SQL_TIMESTAMP())
>> .proctime();
>>
>> StreamTableEnvironment is configured to be in append mode, and table
>> source is named "MyEventTable".  The stream is using the following SQL
>> query:
>>
>> final String sql =
>>     " SELECT userId, `timestamp` "
>>     + " FROM MyEventTable "
>>     + " GROUP BY HOP(proctime, INTERVAL '1' SECOND, INTERVAL '10' SECOND),"
>>     + " userId, `timestamp` ";
>> final Table resultTable = tableEnvironment.sqlQuery(sql);
>>
>> Code which I'm using to verify that events are being dropped:
>>
>> streamTableEnvironment.toAppendStream(sqlResultTable, Row.class)
>>     .map((MapFunction<Row, String>) row -> {
>>       final String userId = row.getField(0).toString();
>>       final BigDecimal timestamp = (BigDecimal) row.getField(1);
>>
>>       return String.format("(%s, %s)", userId, timestamp.toString());
>>     })
>>     .print();
>>
>>
>> No events produced during the first five seconds following a cold start
>> of Flink are ever printed to the console.  Any and all events produced
>> after the first five seconds following a cold start of Flink are always
>> printed to the console.  All processes are running on the same system.
>>
>> This issue does not occur when using raw Flink (windowed or not) nor when
>> using Flink CEP.  Again, have not tried Table API.
>>
>>
>


Re: ListState - elements order

2018-09-13 Thread Rong Rong
To the best of my knowledge, ordering is not guaranteed by the internal
implementation.
I agree with Aljoscha: if there is no clear definition of ordering, it should
be assumed not to be preserved.

--
Rong

On Thu, Sep 13, 2018 at 7:30 PM vino yang  wrote:

> Hi Aljoscha,
>
> Regarding window merging, as you said, it's not clear, because Flink does
> some internal work.
> But if it's just for the user, isn't it clear without any internal
> operations? I think if the user explicitly uses it, it should conform to
> the basic List semantics. Otherwise why define it instead of using
> SetListState directly?
>
> Thanks, vino.
>
> On Thu, Sep 13, 2018 at 10:42 PM Aljoscha Krettek wrote:
>
>> Hi,
>>
>> this is not clearly defined anywhere, and I was always working under the
>> assumption that the order is not preserved. This potentially allows more
>> optimizations by the system, and for example in case of merging windows we
>> don't know the order of elements in a ListState after a merge.
>>
>> Best,
>> Aljoscha
>>
>> On 6. Sep 2018, at 08:19, vino yang  wrote:
>>
>> Hi Alexey,
>>
>> The answer is Yes, which preserves the semantics of the List's order of
>> elements.
>>
>> Thanks, vino.
>>
>> On Thu, Sep 6, 2018 at 10:55 AM Alexey Trenikhun wrote:
>>
>>> Hello,
>>> Does keyed managed ListState preserve elements order, for example if
>>> I call listState.add(e1); listState.add(e2); listState.add(e3); , does
>>> ListState guarantee that listState.get() will return elements in order
>>> they were added (e1, e2, e3)
>>>
>>> Alexey
>>>
>>>
>>>
>>

