Re: Auto Scaling in Flink

2019-12-03 Thread vino yang
Hi Akash,

The key difference between Pravega and Kafka is that Kafka is a messaging
system, while Pravega is a streaming system.[1] The official documentation
also states their differences on its FAQ page.[2]

[1]:
https://siliconangle.com/2017/04/17/dell-emc-takes-on-streaming-storage-with-open-source-solution-pravega-ffsf17/
[2]: http://www.pravega.io/faq.html

Best,
Vino

Akash Goel wrote on Wed, Dec 4, 2019 at 12:00 PM:

> Hi,
>
> If my application is already running on Kafka, then do I need to replace
> with Pravega or can Pravega read directly from Kafka?
>
> I have also reached out to the Pravega community but am just checking here.
>
> Thanks,
> Akash Goel
>
> On Fri, Nov 29, 2019 at 11:14 AM Caizhi Weng  wrote:
>
>> Hi Akash,
>>
>> Flink core doesn't currently support auto scaling; it may be supported
>> in the future, once the new scheduling architecture is implemented:
>> https://issues.apache.org/jira/browse/FLINK-10407 .
>>
>> You can do it externally by cancelling the job with a savepoint, updating the
>> parallelism, and restarting the job according to the rate of data, like what
>> Pravega suggests in its docs:
>> http://pravega.io/docs/latest/key-features/#auto-scaling.
>>
>> vino yang wrote on Fri, Nov 29, 2019 at 11:12 AM:
>>
>>> Hi Akash,
>>>
>>> You can use the Pravega connector to integrate with Flink; the source code
>>> is here[1].
>>>
>>> In short, relying on its rescalable state feature[2], Flink supports
>>> scalable streaming jobs.
>>>
>>> Currently, the mainstream solution for auto-scaling is Flink + K8s; I
>>> can share some resources with you[3].
>>>
>>> Best,
>>> Vino
>>>
>>> [1]: https://github.com/pravega/flink-connectors
>>> [2]:
>>> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>>> [3]:
>>> https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2019-scaling-a-realtime-streaming-warehouse-with-apache-flink-parquet-and-kubernetes-aditi-verma-ramesh-shanmugam
>>>
>>> Akash Goel wrote on Fri, Nov 29, 2019 at 9:52 AM:
>>>
 Hi,

 Does Flink support auto scaling? I read that it is supported using
 Pravega. Is it incorporated in any version?

 Thanks,
 Akash Goel

>>>


Flink authentication hbase use kerberos

2019-12-03 Thread venn
Hi guys,

I am wondering: does it work to have Flink on YARN deployed on a Hadoop
cluster without authentication, while accessing HBase deployed on a
Kerberos-secured Hadoop cluster? If it can work, what do I need to do? I have
already configured the flink-conf.yaml properties "security.kerberos.login.keytab"
and "security.kerberos.login.principal".

 

 

And I found the following paragraph on the Flink official website:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/security-kerberos.html

 

Hadoop Security Module

This module uses the Hadoop UserGroupInformation (UGI) class to establish a
process-wide login user context. The login user is then used for all
interactions with Hadoop, including HDFS, HBase, and YARN.

If Hadoop security is enabled (in core-site.xml), the login user will have
whatever Kerberos credential is configured. Otherwise, the login user
conveys only the user identity of the OS account that launched the cluster.

 



 

Thanks a lot!



Re: Building with Hadoop 3

2019-12-03 Thread vino yang
cc @Chesnay Schepler  to answer this question.

Foster, Craig wrote on Wed, Dec 4, 2019 at 1:22 AM:

> Hi:
>
> I don’t see a JIRA for Hadoop 3 support. I see a comment on a JIRA here
> from a year ago that no one is looking into Hadoop 3 support [1]. Is there
> a document or JIRA that now exists which would point to what needs to be
> done to support Hadoop 3? Right now builds with Hadoop 3 don’t work
> obviously because there’s no flink-shaded-hadoop-3 artifacts.
>
>
>
> Thanks!
>
> Craig
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-11086
>
>
>


Re: [DISCUSS] Drop RequiredParameters and OptionType

2019-12-03 Thread vino yang
+1.

One concern: these two classes are marked with the `@PublicEvolving`
annotation.
Shall we mark them with the `@Deprecated` annotation first?

Best,
Vino

Dian Fu wrote on Tue, Dec 3, 2019 at 8:56 PM:

> +1 to remove them. It seems that we should also drop the class Option as
> it's currently only used in RequiredParameters.
>
> On Dec 3, 2019, at 8:34 PM, Robert Metzger wrote:
>
> +1 on removing it.
>
> On Tue, Dec 3, 2019 at 12:31 PM Stephan Ewen  wrote:
>
>> I just stumbled across these classes recently and was looking for sample
>> uses.
>> No examples and other tests in the code base seem to
>> use RequiredParameters and OptionType.
>>
>> They also seem quite redundant with how ParameterTool itself works
>> (tool.getRequired()).
>>
>> Should we drop them, in an attempt to reduce unnecessary code and
>> confusion for users (multiple ways to do the same thing)? There are also
>> many better command line parsing libraries out there, this seems like
>> something we don't need to solve in Flink.
>>
>> Best,
>> Stephan
>>
>
>


How to explain the latency at different source injection rate?

2019-12-03 Thread Wang, Rui2
Hi there,

I am confused by an issue with end-to-end latency statistics. I run a
benchmark (similar to the Yahoo streaming benchmark, but without Redis; data is
generated by Flink and sinks to Kafka) with the key parameters set as below:
  task manager (tm) number:  3
  slots per tm:  3
  parallelism:  9
  buffer timeout:  100 ms

The plan was shown in an attached image (not reproduced here).

When I set the injection rate to 10k, the measured latency is 108 ms, but when
I set the injection rate to 1k, the latency is 560 ms.
(The latency measurements were shown in an attached image, not reproduced here.)

IMO, a lower injection rate should give lower latency, but the results don't
match that expectation. Could somebody explain this? Thanks.

Best Regards & Thanks

Rui



Re: [ANNOUNCE] Weekly Community Update 2019/48

2019-12-03 Thread Dongwon Kim
Dear community,

As Konstantin mentioned, there will be the second Apache Flink meetup in
Seoul [1], and we just finalized the list of speakers. Here I share the talk
lineup with brief introductions.

1. "Mastering Flink's interval join" by Jihye Yeom (Kakao corp.)
: Kakao corp. recently adopted Apache Flink for processing marketing and
advertising data. In this talk, Jihye shares how she came to make
full use of Flink's interval join over high volumes of real-time data.

2. "Adopting Apache Flink to T map service" by Seungcheol Oh (T map, SK
telecom)
: This talk covers T map team's journey adopting and operating a real-time
driving score service [2] which is a mission-critical application written
in Flink DataStream API.

3. "Do Flink On Web with FLOW" by Dongwon Kim (Data Labs, SK telecom)
: Dongwon introduces a web service for building data stream processing
pipelines in a GUI, based on the Flink Table API. It was presented at Flink
Forward Europe 2019 [3].

4. "From source to sink", Hangu Kang (Kakao mobility)
: Hangu shares his experience of building and operating Flink applications
for various mobility services in Kakao mobility. This talk covers Kafka
connectors, event time processing, checkpoint/savepoint, state management,
custom windowing, etc.

[1] https://www.meetup.com/ko-KR/Seoul-Apache-Flink-Meetup/events/266824815/
[2] https://www.youtube.com/watch?v=wPQWFy5JENw
[3] https://www.youtube.com/watch?v=dxMn7cIqxG4

Best,

Dongwon

On Mon, Dec 2, 2019 at 1:47 AM Konstantin Knauf 
wrote:

> Dear community,
>
> happy to share a short community update this week. With one week to go to
> the planned feature freeze for Flink 1.10 and Flink Forward Asia in Beijing,
> the dev@ mailing list has been pretty quiet these days.
>
> Flink Development
> ==
>
> * [releases] Hequn has started the vote on RC1 for Flink 1.8.3, which
> unfortunately has already received a -1 due to wrong/missing license
> information. Expecting a new RC soon. [1]
>
> * [sql] In the past timestamp fields in Flink SQL were internally
> represented as longs and it was recommended to use longs directly in
> user-defined functions. With the introduction of a new TimestampType the
> situation has changed and conversion between long and TIMESTAMP will be
> disabled. [2]
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-1-8-3-release-candidate-1-tp35401p35407.html
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Disable-conversion-between-TIMESTAMP-and-Long-in-parameters-and-results-of-UDXs-tp35269p35271.html
>
> Notable Bugs
> ==
>
> * [FLINK-14930] [1.9.1] The OSS filesystem did not allow the configuration
> of additional credentials providers due to a shading-related bug. Resolved
> for 1.9.2. [3]
>
> [3] https://issues.apache.org/jira/browse/FLINK-14930
>
> Events, Blog Posts, Misc
> ===
>
> * Flink Forward Asia took place this week at the National Congress Center
> in Beijing organized by Alibaba. Talks by Ververica, Tencent, Baidu,
> Alibaba, Dell, Lyft, Netflix, Cloudera and many other members of the
> Chinese Apache Flink community, and more than 1500 attendees as far as I
> heard. Impressive! [4]
>
> * At Flink Forward Asia Alibaba announced it has open sourced Alink, a
> machine learning library on top of Apache Flink[5,6]
>
> * Upcoming Meetups
> * The first meetup of the Apache Flink Meetup Chicago on 5th of
> December comes with four talks(!) highlighting different deployment methods
> of Apache Flink (AWS EMR, AWS Kinesis Analytics, Ververica Platform, IBM
> Kubernetes). Talks by *Trevor Grant*, *Seth Wiesman*, *Joe Olson* and 
> *Margarita
> Uk*. [7]
> * On December 17th there will be the second Apache Flink meetup in
> Seoul. Maybe Dongwon can share the list of speakers in this thread, my
> Korean is a bit rusty. [8]
>
> [4] https://m.aliyun.com/markets/aliyun/developer/ffa2019
> [5]
> https://technode.com/2019/11/28/alibaba-cloud-machine-learning-platform-open-source/
> [6] https://github.com/alibaba/Alink/blob/master/README.en-US.md
> [7]
> https://www.meetup.com/Chicago-Apache-Flink-Meetup-CHAF/events/266609828/
> [8] https://www.meetup.com/Seoul-Apache-Flink-Meetup/events/266824815/
>
> Cheers,
>
> Konstantin (@snntrable)
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
>
> Follow us @VervericaData Ververica 
>
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Tony) Cheng
>


Re: Flink 'Job Cluster' mode Ui Access

2019-12-03 Thread Jatin Banger
Hi,

I am using flink binary directly.

I am using this command to deploy the job:

"$FLINK_HOME/bin/standalone-job.sh" start-foreground --job-classname ${ARGS_FOR_JOB}

where ARGS_FOR_JOB contains the job class name and all other necessary details
needed by the job.

Best regards,
Jatin


On Fri, Nov 29, 2019 at 4:18 PM Chesnay Schepler  wrote:

> To clarify, you ran "mvn package -pl flink-dist -am" to build Fink?
>
> If so, could you run that again and provide us with the maven output?
>
> On 29/11/2019 11:23, Jatin Banger wrote:
>
> Hi,
>
> @vino yang   I am using flink 1.8.1
>
> I am using the following procedure for the deployment:
>
> https://github.com/apache/flink/blob/master/flink-container/docker/README.md
>
> And I tried accessing the path you mentioned:
>
> # curl :4081/#/overview
> {"errors":["Not found."]}
>
> Best Regards,
> Jatin
>
> On Thu, Nov 28, 2019 at 10:21 PM Chesnay Schepler 
> wrote:
>
>> Could you try accessing :/#/overview ?
>>
>> The REST API is obviously accessible, and hence the WebUI should be too.
>>
>> How did you setup the session cluster? Are you using some custom Flink
>> build or something, which potentially excluded flink-runtime-web from the
>> classpath?
>>
>> On 28/11/2019 10:02, Jatin Banger wrote:
>>
>> Hi,
>>
>> I checked the log file; there is no error.
>> And I checked the pods internal ports by using rest api.
>>
>> # curl : 4081
>> {"errors":["Not found."]}
>> 4081 is the Ui port
>>
>> # curl :4081/config
>> {"refresh-interval":3000,"timezone-name":"Coordinated Universal
>> Time","timezone-offset":0,"flink-version":"","flink-revision":"ceba8af
>> @ 11.02.2019 @ 22:17:09 CST"}
>>
>> # curl :4081/jobs
>> {"jobs":[{"id":"___job_Id_","status":"RUNNING"}]}
>>
>> Which shows the state of the job as running.
>>
>> What else can we do ?
>>
>> Best regards,
>> Jatin
>>
>> On Thu, Nov 28, 2019 at 1:28 PM vino yang  wrote:
>>
>>> Hi Jatin,
>>>
>>> Flink web UI does not depend on any deployment mode.
>>>
>>> You should check if there are error logs in the log file and the job
>>> status is running state.
>>>
>>> Best,
>>> Vino
>>>
>>> Jatin Banger wrote on Thu, Nov 28, 2019 at 3:43 PM:
>>>
 Hi,

 It seems there is a web UI for the Flink session cluster, but for the Flink job
 cluster it is showing

 {"errors":["Not found."]}

 Is it the expected behavior for Flink Job Cluster Mode ?

 Best Regards,
 Jatin

>>>
>>
>


Re: Auto Scaling in Flink

2019-12-03 Thread Akash Goel
Hi,

If my application is already running on Kafka, do I need to replace it
with Pravega, or can Pravega read directly from Kafka?

I have also reached out to the Pravega community but am just checking here.

Thanks,
Akash Goel
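
For reference, the manual rescale flow Caizhi describes in the quoted reply
below can be sketched with the Flink CLI (the job ID, savepoint location, and
jar path are placeholders):

# take a savepoint and cancel the running job
bin/flink cancel -s s3://my-bucket/savepoints <jobId>

# resubmit from the savepoint with a new parallelism
bin/flink run -s s3://my-bucket/savepoints/savepoint-xxxx -p 8 my-job.jar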

On Fri, Nov 29, 2019 at 11:14 AM Caizhi Weng  wrote:

> Hi Akash,
>
> Flink core doesn't currently support auto scaling; it may be supported
> in the future, once the new scheduling architecture is implemented:
> https://issues.apache.org/jira/browse/FLINK-10407 .
>
> You can do it externally by cancelling the job with a savepoint, updating the
> parallelism, and restarting the job according to the rate of data, like what
> Pravega suggests in its docs:
> http://pravega.io/docs/latest/key-features/#auto-scaling.
>
> vino yang wrote on Fri, Nov 29, 2019 at 11:12 AM:
>
>> Hi Akash,
>>
>> You can use the Pravega connector to integrate with Flink; the source code is
>> here[1].
>>
>> In short, relying on its rescalable state feature[2], Flink supports
>> scalable streaming jobs.
>>
>> Currently, the mainstream solution for auto-scaling is Flink + K8s; I
>> can share some resources with you[3].
>>
>> Best,
>> Vino
>>
>> [1]: https://github.com/pravega/flink-connectors
>> [2]:
>> https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html
>> [3]:
>> https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2019-scaling-a-realtime-streaming-warehouse-with-apache-flink-parquet-and-kubernetes-aditi-verma-ramesh-shanmugam
>>
>> Akash Goel wrote on Fri, Nov 29, 2019 at 9:52 AM:
>>
>>> Hi,
>>>
>>> Does Flink support auto scaling? I read that it is supported using
>>> Pravega. Is it incorporated in any version?
>>>
>>> Thanks,
>>> Akash Goel
>>>
>>


Re: Add time attribute column to POJO stream

2019-12-03 Thread Jingsong Lee
Hi Chris,

First, FxRate is not a POJO; a POJO must have a constructor without
arguments. Once that is fixed, you can read from a POJO DataStream directly.

Second, if you want to get a field from the POJO, please use the get function,
like fx.get('currency'); if you have a nested POJO field, you can use the same
approach to get the nested field from the POJO.
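
For reference, a minimal sketch of the first point (untested; the fields stay
public as in your example):

// Adding a no-argument constructor so Flink can treat FxRate as a POJO
public static class FxRate {
  public String currency;
  public double rate;

  public FxRate() {}  // required for POJO type extraction

  public FxRate(String currency, double rate) {
    this.currency = currency;
    this.rate = rate;
  }
}

// With a proper POJO, the fields can then be referenced by name:
Table pojoTable = tableEnv.fromDataStream(pojoStream, "currency, rate");
pojoTable.printSchema();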

Best,
Jingsong Lee


On Wed, Dec 4, 2019 at 12:33 AM Chris Miller  wrote:

> I'm having trouble dealing with a DataStream of POJOs. In particular, when
> I perform SQL operations on it I can't figure out the syntax for referring
> to individual fields within the POJO.
>
> Below is an example that illustrates the problem and the various
> approaches I've tried. Can anyone please point me in the right direction?
>
> import java.util.Arrays;
> import java.util.List;
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
>
> public class PojoTest {
>   public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(streamEnv);
>
> // Using tuples, this works as expected
> List<Tuple2<String, Double>> tupleData = Arrays.asList(
> new Tuple2<>("USD", 1.0),
> new Tuple2<>("GBP", 1.3),
> new Tuple2<>("EUR", 1.11));
> DataStreamSource<Tuple2<String, Double>> tupleStream =
> streamEnv.fromCollection(tupleData);
> tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();
>
> // Using a DataStream of POJOs, how do I obtain an equivalent table to 
> the above?
> List<FxRate> pojoData = Arrays.asList(
> new FxRate("USD", 1.0),
> new FxRate("GBP", 1.3),
> new FxRate("EUR", 1.11));
> DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);
>
> Table pojoTable = tableEnv.fromDataStream(pojoStream, "fx");
> pojoTable.printSchema();
>
> // This fails with "ValidationException: Cannot resolve field [currency], 
> input field list:[fx]"
> pojoTable.select("currency, rate").printSchema();
>
> // This fails with "ValidationException: Undefined function: currency"
> pojoTable.select("fx.currency AS currency, fx.rate AS 
> rate").printSchema();
>
> // This fails with "ValidationException: Too many fields referenced from 
> an atomic type"
> tableEnv.fromDataStream(pojoStream, "currency, rate").printSchema();
>
> // This fails with "ValidationException: Field reference expression 
> expected"
> tableEnv.fromDataStream(pojoStream, "fx.currency, fx.rate").printSchema();
>
> streamEnv.execute();
>   }
>
>   public static class FxRate {
> public String currency;
> public double rate;
>
> public FxRate(String currency, double rate) {
>   this.currency = currency;
>   this.rate = rate;
> }
>
> @Override
> public String toString() {
>   return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
> }
>   }
> }
>
>

-- 
Best, Jingsong Lee


Re: A problem of open in AggregateFunction

2019-12-03 Thread Jingsong Li
Hi Guobao,

Looks like this is from the Table/SQL API.
You can override public void open(FunctionContext context); a sketch follows
below. It should work. If it doesn't, can you provide more information? Like:
- version
- which planner
- what problem, e.g. is the open method never being invoked?
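
For illustration, a minimal sketch of an aggregate UDF that registers a metric
in open (the metric name and accumulator are made up; adapt to your function):

import org.apache.flink.metrics.Counter;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionContext;

public class CountWithMetric extends AggregateFunction<Long, CountWithMetric.CountAcc> {

  public static class CountAcc {
    public long count;
  }

  private transient Counter accumulateCalls;

  @Override
  public void open(FunctionContext context) throws Exception {
    // FunctionContext exposes the metric group of this UDF
    accumulateCalls = context.getMetricGroup().counter("accumulateCalls");
  }

  @Override
  public CountAcc createAccumulator() {
    return new CountAcc();
  }

  public void accumulate(CountAcc acc, String value) {
    accumulateCalls.inc();
    acc.count++;
  }

  @Override
  public Long getValue(CountAcc acc) {
    return acc.count;
  }
}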

Best,
Jingsong Lee

On Wed, Dec 4, 2019 at 11:09 AM Biao Liu  wrote:

> Hi Guobao,
>
> Are you using table API? I'm not familiar with table API, but for data
> stream API, generally speaking user could do some initialization through
> "open" method of "Rich" function, like "RichAggregateFunction".
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Tue, 3 Dec 2019 at 22:44, Guobao Li  wrote:
>
>> Hi community,
>>
>>
>>
>> I am trying to register a metric in an aggregate UDF by overriding the
>> *open* function. According to the documentation, the *open* function can
>> be override in order to retrieve the metric group to do the metric
>> registration. But it works only on ScalarFunction not on AggregateFunction.
>> Since the *open* function is not invoked by AggregateFunction. Could
>> anyone help me out of it?
>>
>>
>>
>> Thanks,
>>
>> Guobao
>>
>>
>>
>

-- 
Best, Jingsong Lee


Re: A problem of open in AggregateFunction

2019-12-03 Thread Biao Liu
Hi Guobao,

Are you using the Table API? I'm not familiar with the Table API, but for the
DataStream API, generally speaking users can do some initialization through the
"open" method of a "Rich" function, like "RichAggregateFunction".

Thanks,
Biao /'bɪ.aʊ/



On Tue, 3 Dec 2019 at 22:44, Guobao Li  wrote:

> Hi community,
>
>
>
> I am trying to register a metric in an aggregate UDF by overriding the
> *open* function. According to the documentation, the *open* function can
> be override in order to retrieve the metric group to do the metric
> registration. But it works only on ScalarFunction not on AggregateFunction.
> Since the *open* function is not invoked by AggregateFunction. Could
> anyone help me out of it?
>
>
>
> Thanks,
>
> Guobao
>
>
>


How does Flink handle backpressure in EMR

2019-12-03 Thread Nguyen, Michael
Hello all,

How does Flink handle backpressure (caused by an increase in traffic) in a
Flink job when it's being hosted in an EMR cluster? Does Flink detect the
backpressure and auto-scale the EMR cluster to handle the workload and relieve
the backpressure? And once the backpressure is gone, would the EMR cluster
scale back down?

Thanks,
Michael




Localenvironment jobcluster ha High availability

2019-12-03 Thread Eric HOFFMANN
Hi, I use a job cluster (1 manager and 1 worker) in Kubernetes for a streaming
application, and I would like the lightest possible solution. Is it possible to
use a LocalEnvironment (manager and worker embedded) and still have HA with
ZooKeeper in this mode? I mean that Kubernetes will restart the job; in the case
of a job cluster, metadata is retrieved from ZooKeeper and data from S3 or HDFS.
Is this pattern the same with a LocalEnvironment?
Thx
Eric



Re: Event Timestamp corrupted by timezone

2019-12-03 Thread Lasse Nedergaard
Hi,

We have the same challenges. I asked at Flink Forward and it’s a known problem.
We input in UTC but Flink outputs in local machine time. We have created a
function that converts it back to UTC before collecting to the downstream
operators.
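
For reference, a minimal sketch of such a conversion, assuming the shift equals
the local time zone offset (please verify against your own data before relying
on it):

import java.sql.Timestamp;
import java.util.TimeZone;

public final class TimestampUtc {

  private TimestampUtc() {}

  // Recover UTC epoch millis from a Timestamp that was rendered in local time.
  public static long toUtcMillis(Timestamp localRendered) {
    long millis = localRendered.getTime();
    // add back the offset that was subtracted during the internal conversion
    return millis + TimeZone.getDefault().getOffset(millis);
  }
}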

Med venlig hilsen / Best regards
Lasse Nedergaard


> On Dec 3, 2019, at 3:16 PM, Wojciech Indyk wrote:
> 
> Hi!
> I use Flink 1.8 with Scala. I think I've found a problem with event 
> timestamps in TableAPI. When I mark my timestamp: Long as .rowtime and then 
> save it back to stream as sql.Timestamp I will get wrong .getTime result. The 
> gist for reproduction is here: 
> https://gist.github.com/woj-i/b1dfbb71590b7f1c0c58be1f9e41c610
> When I change my timezone from GMT+1 to GMT, everything works ok.
> I've found this post from March 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/event-time-timezone-is-not-correct-tt26457.html
>  but it's not resolved. The most relevant ticket I've found 
> https://issues.apache.org/jira/browse/FLINK-8353 seems to not include the 
> problem I described.
> 
> 1. Can you confirm it's a bug?
> 2. Should I post this bug somewhere to be at least planned to solve?
> 3. Can you recommend me a workaround for the described problem? 
> 
> --
> Kind regards/ Pozdrawiam,
> Wojciech Indyk


Building with Hadoop 3

2019-12-03 Thread Foster, Craig
Hi:
I don’t see a JIRA for Hadoop 3 support. I see a comment on a JIRA here from a 
year ago that no one is looking into Hadoop 3 support [1]. Is there a document 
or JIRA that now exists which would point to what needs to be done to support 
Hadoop 3? Right now builds with Hadoop 3 don’t work obviously because there’s 
no flink-shaded-hadoop-3 artifacts.

Thanks!
Craig

[1] https://issues.apache.org/jira/browse/FLINK-11086



Add time attribute column to POJO stream

2019-12-03 Thread Chris Miller
I'm having trouble dealing with a DataStream of POJOs. In particular, 
when I perform SQL operations on it I can't figure out the syntax for 
referring to individual fields within the POJO.


Below is an example that illustrates the problem and the various 
approaches I've tried. Can anyone please point me in the right 
direction?


import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;

public class PojoTest {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

    // Using tuples, this works as expected
    List<Tuple2<String, Double>> tupleData = Arrays.asList(
        new Tuple2<>("USD", 1.0),
        new Tuple2<>("GBP", 1.3),
        new Tuple2<>("EUR", 1.11));
    DataStreamSource<Tuple2<String, Double>> tupleStream = streamEnv.fromCollection(tupleData);
    tableEnv.fromDataStream(tupleStream, "currency, rate").printSchema();

    // Using a DataStream of POJOs, how do I obtain an equivalent table to the above?
    List<FxRate> pojoData = Arrays.asList(
        new FxRate("USD", 1.0),
        new FxRate("GBP", 1.3),
        new FxRate("EUR", 1.11));
    DataStreamSource<FxRate> pojoStream = streamEnv.fromCollection(pojoData);

    Table pojoTable = tableEnv.fromDataStream(pojoStream, "fx");
    pojoTable.printSchema();

    // This fails with "ValidationException: Cannot resolve field [currency], input field list:[fx]"
    pojoTable.select("currency, rate").printSchema();

    // This fails with "ValidationException: Undefined function: currency"
    pojoTable.select("fx.currency AS currency, fx.rate AS rate").printSchema();

    // This fails with "ValidationException: Too many fields referenced from an atomic type"
    tableEnv.fromDataStream(pojoStream, "currency, rate").printSchema();

    // This fails with "ValidationException: Field reference expression expected"
    tableEnv.fromDataStream(pojoStream, "fx.currency, fx.rate").printSchema();

    streamEnv.execute();
  }

  public static class FxRate {
    public String currency;
    public double rate;

    public FxRate(String currency, double rate) {
      this.currency = currency;
      this.rate = rate;
    }

    @Override
    public String toString() {
      return "FxRate{currency='" + currency + '\'' + ", rate=" + rate + '}';
    }
  }
}

A problem of open in AggregateFunction

2019-12-03 Thread Guobao Li
Hi community,

I am trying to register a metric in an aggregate UDF by overriding the open
function. According to the documentation, the open function can be overridden in
order to retrieve the metric group and do the metric registration. But it works
only on ScalarFunction, not on AggregateFunction, since the open function is not
invoked by AggregateFunction. Could anyone help me out with it?

Thanks,
Guobao



Event Timestamp corrupted by timezone

2019-12-03 Thread Wojciech Indyk
Hi!
I use Flink 1.8 with Scala. I think I've found a problem with event
timestamps in the Table API. When I mark my timestamp: Long as .rowtime and then
save it back to a stream as sql.Timestamp, I get a wrong .getTime result.
The gist for reproduction is here:
https://gist.github.com/woj-i/b1dfbb71590b7f1c0c58be1f9e41c610
When I change my timezone from GMT+1 to GMT, everything works ok.
I've found this post from March
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/event-time-timezone-is-not-correct-tt26457.html
but it's not resolved. The most relevant ticket I've found
https://issues.apache.org/jira/browse/FLINK-8353 seems to not include the
problem I described.

1. Can you confirm it's a bug?
2. Should I post this bug somewhere to be at least planned to solve?
3. Can you recommend me a workaround for the described problem?

--
Kind regards/ Pozdrawiam,
Wojciech Indyk


Re: [DISCUSS] Drop RequiredParameters and OptionType

2019-12-03 Thread Dian Fu
+1 to remove them. It seems that we should also drop the class Option as it's 
currently only used in RequiredParameters.

> On Dec 3, 2019, at 8:34 PM, Robert Metzger wrote:
> 
> +1 on removing it.
> 
> On Tue, Dec 3, 2019 at 12:31 PM Stephan Ewen wrote:
> I just stumbled across these classes recently and was looking for sample uses.
> No examples and other tests in the code base seem to use RequiredParameters 
> and OptionType.
> 
> They also seem quite redundant with how ParameterTool itself works 
> (tool.getRequired()).
> 
> Should we drop them, in an attempt to reduce unnecessary code and confusion 
> for users (multiple ways to do the same thing)? There are also many better 
> command line parsing libraries out there, this seems like something we don't 
> need to solve in Flink.
> 
> Best,
> Stephan



Re: [DISCUSS] Drop RequiredParameters and OptionType

2019-12-03 Thread Robert Metzger
+1 on removing it.

On Tue, Dec 3, 2019 at 12:31 PM Stephan Ewen  wrote:

> I just stumbled across these classes recently and was looking for sample
> uses.
> No examples and other tests in the code base seem to
> use RequiredParameters and OptionType.
>
> They also seem quite redundant with how ParameterTool itself works
> (tool.getRequired()).
>
> Should we drop them, in an attempt to reduce unnecessary code and
> confusion for users (multiple ways to do the same thing)? There are also
> many better command line parsing libraries out there, this seems like
> something we don't need to solve in Flink.
>
> Best,
> Stephan
>


Re: CoGroup SortMerger performance degradation from 1.6.4 - 1.9.1?

2019-12-03 Thread Piotr Nowojski
Hi,

Yes, it is only related to **batch** jobs, but not necessarily only to DataSet
API jobs. If you are using, for example, the Blink SQL/Table API to process some
bounded data streams (tables), it could also be visible/affected there. If not,
I would suggest starting a new user mailing list thread and posting the
details (what you are running, job manager/task manager logs, …).

Piotrek

> On 2 Dec 2019, at 10:51, Victor Wong  wrote:
> 
> Hi,
> 
> We encountered similar issues where the task manager kept being killed by YARN.
> 
> - flink 1.9.1
> - heap usage is low.
> 
> But our job is a **streaming** job, so I want to ask if this issue is only 
> related to **batch** job or not? Thanks!
> 
> Best,
> Victor
> 
> 
> yingjie <yjclove...@gmail.com> wrote on Thu, Nov 28, 2019 at 11:43 AM:
> Piotr is right; it depends on the data size you are reading and the memory
> pressure. The memory occupied by mmapped regions can be recycled and used
> by other processes if memory pressure is high; that is, other processes or
> services on the same node won't be affected because the OS will recycle the
> mmapped pages if needed. But currently, you can't assume a bound on the
> memory used, because it will use more memory as long as there is free
> space and you have more new data to read.
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 
> 
> 
> -- 
> 
> Best,
> Victor



Re: Table/SQL API to read and parse JSON, Java.

2019-12-03 Thread Zhenghua Gao
The Kafka connector jar is missing from your classpath.
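
For illustration, a sketch based on the pom.xml quoted below: the connector and
format jars must end up on the cluster classpath (or be bundled into the job's
fat jar), and the Scala suffix should match the other dependencies (the pom
mixes flink-connector-kafka_2.12 with _2.11 artifacts):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
</dependency>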

*Best Regards,*
*Zhenghua Gao*


On Mon, Dec 2, 2019 at 2:14 PM srikanth flink  wrote:

> Hi there,
>
> I'm following the link
> 
> to read JSON data from Kafka and convert it to a table programmatically. I
> tried it declaratively using the SQL Client and succeeded.
>
> My JSON data is nested, like: {a:1, b:2, c:{x:1, y:2}}.
> Code:
>
>> String schema = "{type: 'object', properties: {'message': {type:
>> 'string'},'@timestamp': {type: 'string'}}}";
>> final StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> env.enableCheckpointing(6, CheckpointingMode.EXACTLY_ONCE);
>> env.getCheckpointConfig().getCheckpointTimeout();
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>>
>> tableEnv.connect(new
>> Kafka().version("universal").topic("recon-data").startFromEarliest()
>> .property("zookeeper.connect", "localhost:2181")
>> .property("bootstrap.servers", "localhost:9092"))
>> .withFormat(new
>> Json().failOnMissingField(false).jsonSchema(schema).deriveSchema())
>> .withSchema(new Schema().field("message",
>> Types.STRING()).field("@timestamp", Types.LOCAL_DATE_TIME()))
>> .inAppendMode().registerTableSource("reconTableS");
>>
>> Table t = tableEnv.sqlQuery("select * from reconTableS");
>> DataStream<Row> out = tableEnv.toAppendStream(t, Row.class);
>> out.print();
>>
>> try {
>> env.execute("Flink Example Json");
>> } catch (Exception e) {
>> e.printStackTrace();
>> }
>> }
>>
>
> pom.xml:
>
>> <properties>
>>   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
>>   <flink.version>1.9.0</flink.version>
>>   <java.version>1.8</java.version>
>>   <scala.binary.version>2.11</scala.binary.version>
>>   <maven.compiler.source>${java.version}</maven.compiler.source>
>>   <maven.compiler.target>${java.version}</maven.compiler.target>
>> </properties>
>>
>> <dependencies>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-streaming-scala_2.11</artifactId>
>>     <version>${flink.version}</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-table-common</artifactId>
>>     <version>${flink.version}</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-table-planner_2.11</artifactId>
>>     <version>${flink.version}</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-table-api-java-bridge_2.11</artifactId>
>>     <version>${flink.version}</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-java</artifactId>
>>     <version>${flink.version}</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
>>     <version>${flink.version}</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-connector-kafka_2.12</artifactId>
>>     <version>${flink.version}</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-json</artifactId>
>>     <version>${flink.version}</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>org.apache.flink</groupId>
>>     <artifactId>flink-cep_2.11</artifactId>
>>     <version>${flink.version}</version>
>>   </dependency>
>>   <dependency>
>>     <groupId>mysql</groupId>
>>     <artifactId>mysql-connector-java</artifactId>
>>     <version>5.1.39</version>
>>   </dependency>
>> </dependencies>
>>
>
> The code threw the following error:
>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error: findAndCreateTableSource failed.
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
>> at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>> at
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>> at
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>> at
>> org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>> at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>> at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>> at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>> Caused by: org.apache.flink.table.api.TableException:
>> findAndCreateTableSource failed.
>> at
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
>> at
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
>> at
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
>> at
>> kafka.flink.stream.list.match.ExampleJsonParser.main(ExampleJsonParser.java:31)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>> ... 12 more
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>> Could not find a suitable t

[DISCUSS] Drop RequiredParameters and OptionType

2019-12-03 Thread Stephan Ewen
I just stumbled across these classes recently and was looking for sample
uses.
No examples and other tests in the code base seem to use RequiredParameters
and OptionType.

They also seem quite redundant with how ParameterTool itself works
(tool.getRequired()).
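
For illustration, a minimal sketch of that ParameterTool usage (the argument
names are made up):

import org.apache.flink.api.java.utils.ParameterTool;

public class ParamsExample {
  public static void main(String[] args) {
    ParameterTool params = ParameterTool.fromArgs(args);
    String host = params.getRequired("host"); // throws if the argument is missing
    int port = params.getInt("port", 8080);   // optional, with a default value
    System.out.println(host + ":" + port);
  }
}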

Should we drop them, in an attempt to reduce unnecessary code and confusion
for users (multiple ways to do the same thing)? There are also many better
command line parsing libraries out there, this seems like something we
don't need to solve in Flink.

Best,
Stephan


Re: [DISCUSSION] Using `DataStreamUtils.reinterpretAsKeyedStream` on a pre-partitioned Kafka topic

2019-12-03 Thread Robin Cassan
Thanks for your answers. We will have a look at adapting the Kafka source
to assign the input partitions depending on the assigned key groups. If
anyone has already done such a thing, I'd love your advice!
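
For reference, a minimal sketch of how the producer side could pick a Kafka
partition per SessionKey using Flink's own assignment logic (assuming the topic
has exactly as many partitions as the job's parallelism and that partition i is
read by source subtask i, which, as Gyula notes in the quoted reply below, the
default Kafka source does not guarantee):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class SessionKeyPartitioner {

  private final int maxParallelism; // must match the Flink job's max parallelism
  private final int parallelism;    // must match the Flink job's parallelism

  public SessionKeyPartitioner(int maxParallelism, int parallelism) {
    this.maxParallelism = maxParallelism;
    this.parallelism = parallelism;
  }

  // Kafka partition to write a record with this SessionKey to.
  public int partitionFor(String sessionKey) {
    return KeyGroupRangeAssignment.assignKeyToParallelOperator(
        sessionKey, maxParallelism, parallelism);
  }
}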

Cheers

Robin

On Mon, Dec 2, 2019 at 8:48 AM, Gyula Fóra wrote:

> Hi!
>
> As far as I know,  even if you prepartition the data exactly the same way
> in kafka using the key groups, you have  no guarantee that the kafka
> consumer source would pick up the right partitions.
>
> Maybe if you have exactly as many kafka partitions as keygroups/max
> parallelism, partitioned correctly , but even then you might have to use a
> custom source to have the correct partition assignment for the sub tasks.
>
> Long story short, I believe the built in Kafka source doesnt support what
> you want. But it should be possible to adapt it to do so.
>
> Cheers
> Gyula
>
> On Mon, Dec 2, 2019, 03:49 Congxian Qiu  wrote:
>
>> Hi
>>
>> From the doc[1], the DataStream MUST already be pre-partitioned in
>> EXACTLY the same way Flink’s keyBy would partition the data in a shuffle
>> w.r.t. key-group assignment.
>> you should make sure that the key locates in the right key-group, and the
>> key-group locates in the right parallelism. you can ref
>> KeyGroupRangeAssignment[2] for more information.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.html
>> Best,
>> Congxian
>>
>>
>> Robin Cassan wrote on Sat, Nov 30, 2019 at 12:17 AM:
>>
>>> Hi all!
>>>
>>> We are trying to build a Flink job that consumes a Kafka topic, groups
>>> the incoming events in Session Windows according to a String that can
>>> be generated by parsing the message (we call it `SessionKey`) and does
>>> some processing on the windows before sending them to another Kafka
>>> topic.
>>> Our first implementation used a `keyBy` operator on the incoming
>>> messages before creating the window, but we realized that we could
>>> pre-partition our data by `SessionKey` when we insert it into the input
>>> Kafka topic with a custom component. This would avoid having to
>>> shuffle data around in Flink, since, for a given `SessionKey`, we would
>>> ensure that all messages with this key will end-up in the same Kafka
>>> partition and thus be read by the same subtask, on a single
>>> TaskManager. This means that we should be able to create a keyed-stream
>>> from the incoming data without having to transfer data between
>>> TaskManagers.
>>>
>>> To achieve that, we have used the `reinterpretAsKeyedStream` method
>>> instead of the previous `keyBy`. This got rid of the shuffling step,
>>> but we are wondering if this is the right way of using this feature and
>>> whether Flink can manage to match the distribution of Keys from Kafka
>>> with the ones assigned to each TaskManager?
>>> We have observed that, while attempting to trigger a savepoint, we
>>> would encounter exceptions that seem to indicate that the TaskManagers
>>> received data whose `SessionKey` didn't match their assigned Keys.
>>> Here is one of the stacktrace we saw while savepointing:
>>>
>>> ```
>>> java.lang.IllegalArgumentException: Key group 0 is not in
>>> KeyGroupRange{startKeyGroup=54, endKeyGroup=55}.
>>> at
>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.computeKeyGroupIndex(KeyGroupRangeOffsets.java:142)
>>> at
>>> org.apache.flink.runtime.state.KeyGroupRangeOffsets.setKeyGroupOffset(KeyGroupRangeOffsets.java:104)
>>> at
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeKVStateData(RocksFullSnapshotStrategy.java:316)
>>> at
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.writeSnapshotToOutputStream(RocksFullSnapshotStrategy.java:258)
>>> at
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:223)
>>> at
>>> org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy$SnapshotAsynchronousPartCallable.callInternal(RocksFullSnapshotStrategy.java:176)
>>> at
>>> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:392)
>>> ```
>>> We are currently using Flink 1.8.2 on Kubernetes, savepointing to
>>> Amazon S3.
>>>
>>> Is our observation about Flink not being able to match the Kafka
>>> partitioning with the TaskManager's assigned KeyGroups correct?
>>> And if so, do you have any pointers on how we could pre-partition our
>>> data in Kafka so that Flink can avoid shuffling data before creating
>>> the Session Windows?
>>>
>>> Cheers,
>>>
>>> Robin
>>>
>>> --
>>>
>>>
>>>