Flink Kafka Connector Source Parallelism

2020-05-28 Thread Chen, Mason
Hi all,

I’m currently trying to understand Flink’s Kafka Connector and how parallelism 
affects it. So, I am running the flink playground click count job and the 
parallelism is set to 2 by default.


However, I don’t see the 2nd subtask of the Kafka Connector sending any 
records: https://imgur.com/cA5ucSg. Do I need to rebalance after reading from 
kafka?

```
clicks = clicks
   .keyBy(ClickEvent::getPage)
   .map(new BackpressureMap())
   .name("Backpressure");
```

`clicks` is the kafka click stream. From my reading in the operator docs, it 
seems counterintuitive to do a `rebalance()` when I am already doing a 
`keyBy()`.

So, my questions:

1. How do I make use of the 2nd subtask?
2. Does the number of partitions have some sort of correspondence with the 
parallelism of the source operator? If so, is there a general statement to be 
made about parallelism across all source operators?

Thanks,
Mason


Re: Webinar: Unlocking the Power of Apache Beam with Apache Flink

2020-05-28 Thread Marta Paes Moreira
Thanks for sharing, Aizhamal - it was a great webinar!

Marta

On Wed, 27 May 2020 at 23:17, Aizhamal Nurmamat kyzy 
wrote:

> Thank you all for attending today's session! Here is the YT recording:
> https://www.youtube.com/watch?v=ZCV9aRDd30U
> And link to the slides:
> https://github.com/aijamalnk/beam-learning-month/blob/master/Unlocking%20the%20Power%20of%20Apache%20Beam%20with%20Apache%20Flink.pdf
>
> On Tue, May 26, 2020 at 8:32 AM Aizhamal Nurmamat kyzy <
> aizha...@apache.org> wrote:
>
>> Hi all,
>>
>> Please join our webinar this Wednesday at 10am PST/5:00pm GMT/1:00pm EST
>> where Max Michels - PMC member for Apache Beam and Apache Flink, will
>> deliver a talk about leveraging Apache Beam for large-scale stream and
>> batch analytics with Apache Flink.
>>
>> You can register via this link:
>> https://learn.xnextcon.com/event/eventdetails/W20052710
>>
>> Here is the short description of the talk:
>> ---
>> Apache Beam is a framework for writing stream and batch processing
>> pipelines using multiple languages such as Java, Python, SQL, or Go. Apache
>> Beam does not come with an execution engine of its own. Instead, it defers
>> the execution to its Runners which translate Beam pipelines for any
>> supported execution engine. Thus, users have complete control over the
>> language and the execution engine they use, without having to rewrite their
>> code.
>> In this talk, we will look at running Apache Beam pipelines with Apache
>> Flink. We will explain the concepts behind Apache Beams portability
>> framework for multi-language support, and then show how to get started
>> running Java, Python, and SQL pipelines.
>> 
>> Links to the slides and recordings of this and previous webinars you can
>> find here: https://github.com/aijamalnk/beam-learning-month
>>
>> Hope y'all are safe,
>> Aizhamal
>>
>


Re: Flink Kafka Connector Source Parallelism

2020-05-28 Thread Chen, Mason
I think I may have just answered my own question. There’s only one Kafka 
partition, so the maximum parallelism is one and it doesn’t really make sense 
to make another kafka consumer under the same group id. What threw me off is 
that there’s a 2nd subtask for the kafka source created even though it’s not 
actually doing anything. So, it seems a general statement can be made that (# 
kafka partitions) >= (# parallelism of flink kafka source)…well I guess you 
could have more parallelism than kafka partitions, but the extra subtasks will 
not doing anything.

From: "Chen, Mason" 
Date: Wednesday, May 27, 2020 at 11:09 PM
To: "user@flink.apache.org" 
Subject: Flink Kafka Connector Source Parallelism

Hi all,

I’m currently trying to understand Flink’s Kafka Connector and how parallelism 
affects it. So, I am running the flink playground click count job and the 
parallelism is set to 2 by default.



However, I don’t see the 2nd subtask of the Kafka Connector sending any 
records: https://imgur.com/cA5ucSg. Do I need to rebalance after reading from 
kafka?

```
clicks = clicks
   .keyBy(ClickEvent::getPage)
   .map(new BackpressureMap())
   .name("Backpressure");
```

`clicks` is the kafka click stream. From my reading in the operator docs, it 
seems counterintuitive to do a `rebalance()` when I am already doing a 
`keyBy()`.

So, my questions:

1. How do I make use of the 2nd subtask?
2. Does the number of partitions have some sort of correspondence with the 
parallelism of the source operator? If so, is there a general statement to be 
made about parallelism across all source operators?

Thanks,
Mason


Re: Webinar: Unlocking the Power of Apache Beam with Apache Flink

2020-05-28 Thread Maximilian Michels
Thanks to everyone who joined and asked questions. Really enjoyed this
new format!

-Max

On 28.05.20 08:09, Marta Paes Moreira wrote:
> Thanks for sharing, Aizhamal - it was a great webinar!
> 
> Marta
> 
> On Wed, 27 May 2020 at 23:17, Aizhamal Nurmamat kyzy
> mailto:aizha...@apache.org>> wrote:
> 
> Thank you all for attending today's session! Here is the YT
> recording: https://www.youtube.com/watch?v=ZCV9aRDd30U
> And link to the
> slides: 
> https://github.com/aijamalnk/beam-learning-month/blob/master/Unlocking%20the%20Power%20of%20Apache%20Beam%20with%20Apache%20Flink.pdf
> 
> On Tue, May 26, 2020 at 8:32 AM Aizhamal Nurmamat kyzy
> mailto:aizha...@apache.org>> wrote:
> 
> Hi all,
> 
> Please join our webinar this Wednesday at 10am PST/5:00pm
> GMT/1:00pm EST where Max Michels - PMC member for Apache Beam
> and Apache Flink, will deliver a talk about leveraging Apache
> Beam for large-scale stream and batch analytics with Apache Flink. 
> 
> You can register via this
> link: https://learn.xnextcon.com/event/eventdetails/W20052710
> 
> Here is the short description of the talk:
> ---
> Apache Beam is a framework for writing stream and batch
> processing pipelines using multiple languages such as Java,
> Python, SQL, or Go. Apache Beam does not come with an execution
> engine of its own. Instead, it defers the execution to its
> Runners which translate Beam pipelines for any supported
> execution engine. Thus, users have complete control over the
> language and the execution engine they use, without having to
> rewrite their code.
> In this talk, we will look at running Apache Beam pipelines with
> Apache Flink. We will explain the concepts behind Apache Beams
> portability framework for multi-language support, and then show
> how to get started running Java, Python, and SQL pipelines.
> 
> Links to the slides and recordings of this and previous webinars
> you can find here: https://github.com/aijamalnk/beam-learning-month
> 
> Hope y'all are safe,
> Aizhamal
> 


Re: Flink TTL for MapStates and Sideoutputs implementations

2020-05-28 Thread Jaswin Shah
Thanks for responding Alexander.
We have solved the problem now with ValueState now. Basically, here we are 
implementing outer join logic with custom keyedCoprocessFunction 
implementations.


From: Alexander Fedulov 
Sent: 28 May 2020 17:24
To: Jaswin Shah 
Cc: user@flink.apache.org 
Subject: Re: Flink TTL for MapStates and Sideoutputs implementations

Hi Jaswin,

I would like to clarify something first - what do you key your streams by, when 
joining them?
It seems that what you want to do is to match each CartMessage with a 
corresponding Payment that has the same orderId+mid. If this is the case, you 
probably do not need the MapState in the first place.

Best,

--

Alexander Fedulov | Solutions Architect


[https://lh6.googleusercontent.com/BAYfe7E1EKlpcT1zGwlMWJEsZuwEv9KelOYQzIst9quO5oFdNebAja2EAsrJFipxig9u9ErB_5Tg2SQGSdLJo8lD3udSPG-uKope43NFO8lRMix-oMJSqwJLz9gOK8YtADdFSvR7]


Follow us @VervericaData

--

Join Flink Forward - The Apache Flink Conference

Stream Processing | Event Driven | Real Time


On Fri, May 22, 2020 at 8:57 AM Jaswin Shah 
mailto:jaswin.s...@outlook.com>> wrote:

public class CartPGCoprocessFunction extends 
KeyedCoProcessFunction {

private static final Logger logger = 
LoggerFactory.getLogger(CartPGCoprocessFunction.class);

/**
 * Map state for cart messages, orderId+mid is key and cartMessage is value.
 */
private static MapState cartState = null;

/**
 * Map state for pg messages, orderId+mid is key and pgMessage is value.
 */
private static MapState pgState = null;

/**
 * Intializations for cart and pg mapStates
 *
 * @param config
 */
@Override
public void open(Configuration config) {
MapStateDescriptor cartStateDescriptor = new 
MapStateDescriptor<> (
Constants.CART_DATA,
TypeInformation.of(String.class),
TypeInformation.of(CartMessage.class)
);
cartState = getRuntimeContext().getMapState(cartStateDescriptor);

MapStateDescriptor 
pgStateDescriptor = new MapStateDescriptor<>(
Constants.PG_DATA,
TypeInformation.of(String.class),
TypeInformation.of(PaymentNotifyRequestWrapper.class)
);
pgState = getRuntimeContext().getMapState(pgStateDescriptor);
}

/**
 * 1. Get orderId+mid from cartMessage and check in PGMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
pgMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
cartMapState.
 * @param cartMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement1(CartMessage cartMessage, Context context, 
Collector collector) throws Exception {
String searchKey = cartMessage.createJoinStringCondition();
PaymentNotifyRequestWrapper paymentNotifyObject = 
pgState.get(searchKey);
if(Objects.nonNull(paymentNotifyObject)) {
generateResultMessage(cartMessage,paymentNotifyObject,collector);
pgState.remove(searchKey);
} else {
cartState.put(searchKey,cartMessage);
}
}

/**
 * 1. Get orderId+mid from pgMessage and check in cartMapState if an entry 
is present.
 * 2. If present, match, checkDescripancy, process and delete entry from 
cartMapState.
 * 3. If not present, add orderId+mid as key and cart object as value in 
pgMapState.
 * @param pgMessage
 * @param context
 * @param collector
 * @throws Exception
 */
@Override
public void processElement2(PaymentNotifyRequestWrapper pgMessage, Context 
context, Collector collector) throws Exception {
String searchKey = pgMessage.createJoinStringCondition();
CartMessage cartMessage = cartState.get(searchKey);
if(Objects.nonNull(cartMessage)) {
generateResultMessage(cartMessage,pgMessage,collector);
cartState.remove(searchKey);
} else {
pgState.put(searchKey,pgMessage);
}
}

/**
 * Create ResultMessage from cart and pg messages.
 *
 * @param cartMessage
 * @param pgMessage
 * @return
 */
private void generateResultMessage(CartMessage cartMessage, 
PaymentNotifyRequestWrapper pgMessage,Collector collector) {
ResultMessage resultMessage = new ResultMessage();
Payment payment = null;

//Logic should be in cart: check
for (Payment pay : cartMessage.getPayments()) {
if (StringUtils.equals(Constants.FORWARD_PAYMENT, 
pay.mapToPaymentTypeInPG()) && StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, 
pay.getProvider())) {
payment = pay;
break;
}
}
if(Objects.isNull(payment)) {
return;
}


How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Guodong Wang
Hi !

I want to use Flink SQL to process some json events. It is quite
challenging to define a schema for the Flink SQL table.

My data source's format is some json like this
{
"top_level_key1": "some value",
"nested_object": {
"nested_key1": "abc",
"nested_key2": 123,
"nested_key3": ["element1", "element2", "element3"]
}
}

The big challenges for me to define a schema for the data source are
1. the keys in nested_object are flexible, there might be 3 unique keys or
more unique keys. If I enumerate all the keys in the schema, I think my
code is fragile, how to handle event which contains more  nested_keys in
nested_object ?
2. I know table api support Map type, but I am not sure if I can put
generic object as the value of the map. Because the values in nested_object
are of different types, some of them are int, some of them are string or
array.

So. how to expose this kind of json data as table in Flink SQL without
enumerating all the nested_keys?

Thanks.

Guodong


Re: Multiple Sinks for a Single Soure

2020-05-28 Thread Alexander Fedulov
Hi Prasanna,

if the set of all possible sinks is known in advance, side outputs will be
generic enough to express your requirements. Side output produces a stream.
Create all of the side output tags, associate each of them with one sink,
add conditional logic around `ctx.output(outputTag, ... *)*;`  to decide
where to dispatch the messages  (see [1]), collect to none or many side
outputs, depending on your logic.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

--

Alexander Fedulov | Solutions Architect



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time


On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <
prasannakumarram...@gmail.com> wrote:

> Piotr,
>
> There is an event and subscriber registry as JSON file which has the table
> event mapping and event-subscriber mapping as mentioned below.
>
> Based on the set JSON , we need to job to go through the table updates and
> create events and for each event there is a way set how to sink them.
>
> The sink streams have to be added based on this JSON. Thats what i
> mentioned as no predefined sink in code earlier.
>
> You could see that each event has different set of sinks.
>
> Just checking how much generic could Side-output streams be ?.
>
> Source -> generate events -> (find out sinks dynamically in code ) ->
> write to the respective sinks.
>
> {
>   " tablename ": "source.table1",
>   "events": [
> {
>   "operation": "update",
>   "eventstobecreated": [
> {
>   "eventname": "USERUPDATE",
>   "Columnoperation": "and",
>   "ColumnChanges": [
> {
>   "columnname": "name"
> },
> {
>   "columnname": "loginenabled",
>   "value": "Y"
> }
>   ],
>   "Subscribers": [
> {
>   "customername": "c1",
>   "method": "Kafka",
>   "methodparams": {
> "topicname": "USERTOPIC"
>   }
> },
> {
>   "customername": "c2",
>   "method": "S3",
>   "methodparams": {
> "folder": "aws://folderC2"
>   }}, ]}]
> },
> {
>   "operation": "insert",
>   "eventstobecreated": [
>   "eventname": "USERINSERT",
>   "operation": "insert",
>   "Subscribers": [
> {
>   "teamname": "General",
>   "method": "Kafka",
>   "methodparams": {
> "topicname": "new_users"
>   }
> },
> {
>   "teamname": "General",
>   "method": "kinesis",
>   "methodparams": {
> "URL": "new_users",
> "username": "uname",
> "password":  "pwd"
>   }}, ]}]
> },
> {
>   "operation": "delete",
>   "eventstobecreated": [
> {
>   "eventname": "USERDELETE",
>   "Subscribers": [
> {
>   "customername": "c1",
>   "method": "Kafka",
>   "methodparams": {
> "topicname": "USERTOPIC"
>   }
> },
> {
>   "customername": "c4",
>   "method": "Kafka",
>   "methodparams": {
> "topicname": "deleterecords"
>  }}, ]}]
>  },
> }
>
> Please let me know your thoughts on this.
>
> Thanks,
> Prasanna.
>
> On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski 
> wrote:
>
>> Hi,
>>
>> I’m not sure if I fully understand what do you mean by
>>
>> > The point is the sink are not predefined.
>>
>> You must know before submitting the job, what sinks are going to be used
>> in the job. You can have some custom logic, that would filter out records
>> before writing them to the sinks, as I proposed before, or you could use
>> side outputs [1] would be better suited to your use case?
>>
>> Piotrek
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>
>> On 26 May 2020, at 12:20, Prasanna kumar 
>> wrote:
>>
>> Thanks Piotr for the Reply.
>>
>> I will explain my requirement in detail.
>>
>> Table Updates -> Generate Business Events -> Subscribers
>>
>> *Source Side*
>> There are CDC of 100 tables which the framework needs to listen to.
>>
>> *Event Table Mapping*
>>
>> There would be Event associated with table in a *m:n* fashion.
>>
>> say there are tables TA, TB, TC.
>>
>> EA, EA2 and EA3 are generated from TA (based on conditions)
>> EB generated from TB (based on conditions)
>> EC generated from TC (no conditions.)
>>
>> Say there are events EA,EB,EC generated from the tables TA, TB, TC
>>
>> *Event Sink Mapping*
>>
>> EA has following sinks. kafka topic SA,SA2,SAC.
>> EB has following sinks. kafka topic SB , S3 

New dates for Flink Forward Global Virtual 2020

2020-05-28 Thread Ana Vasiliuk
Hi everyone,

Flink Forward Global Virtual 2020 is now a 4-day conference, featuring two
training days on October 19 & 20! The organizers have decided to extend the
training program for this event to ensure that you get the most out of your
time with our team of Flink experts.

*New dates:*
Apache Flink Training - October 19 - 20
Flink Forward keynotes and breakout sessions - October 21 - 22

The conference days will be free to attend and there will be a limited
number of paid training tickets available soon. Please reserve your spot at
http://flink-forward.org/global-2020.

More information to follow, including pricing and further details of the
training agenda. If you have any questions, please feel free to reach out
to the organizing team via *he...@flink-forward.org
*.

The *Call for Presentations* is also open, so if you want to share your
real-world world use cases and best practices with an international
audience of Flink enthusiasts, don’t forget to submit your talk by *June 19*,
for a chance to be included in the program!

Submit your talk at
https://www.flink-forward.org/global-2020/call-for-presentations.

Hope to see you virtually in October!
Ana

-- 

*Ana Vasiliuk *| Community Marketing Manager





Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Tony) Cheng


Re: Flink TTL for MapStates and Sideoutputs implementations

2020-05-28 Thread Alexander Fedulov
Hi Jaswin,

I would like to clarify something first - what do you key your streams by,
when joining them?
It seems that what you want to do is to match each CartMessage with a
corresponding Payment that has the same orderId+mid. If this is the case,
you probably do not need the MapState in the first place.

Best,

--

Alexander Fedulov | Solutions Architect



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time


On Fri, May 22, 2020 at 8:57 AM Jaswin Shah  wrote:

> public class CartPGCoprocessFunction extends 
> KeyedCoProcessFunction ResultMessage> {
>
> private static final Logger logger = 
> LoggerFactory.getLogger(CartPGCoprocessFunction.class);
>
> /**
>  * Map state for cart messages, orderId+mid is key and cartMessage is 
> value.
>  */
> private static MapState cartState = null;
>
> /**
>  * Map state for pg messages, orderId+mid is key and pgMessage is value.
>  */
> private static MapState pgState = 
> null;
>
> /**
>  * Intializations for cart and pg mapStates
>  *
>  * @param config
>  */
> @Override
> public void open(Configuration config) {
> MapStateDescriptor cartStateDescriptor = new 
> MapStateDescriptor<> (
> Constants.CART_DATA,
> TypeInformation.of(String.class),
> TypeInformation.of(CartMessage.class)
> );
> cartState = getRuntimeContext().getMapState(cartStateDescriptor);
>
> MapStateDescriptor 
> pgStateDescriptor = new MapStateDescriptor<>(
> Constants.PG_DATA,
> TypeInformation.of(String.class),
> TypeInformation.of(PaymentNotifyRequestWrapper.class)
> );
> pgState = getRuntimeContext().getMapState(pgStateDescriptor);
> }
>
> /**
>  * 1. Get orderId+mid from cartMessage and check in PGMapState if an 
> entry is present.
>  * 2. If present, match, checkDescripancy, process and delete entry from 
> pgMapState.
>  * 3. If not present, add orderId+mid as key and cart object as value in 
> cartMapState.
>  * @param cartMessage
>  * @param context
>  * @param collector
>  * @throws Exception
>  */
> @Override
> public void processElement1(CartMessage cartMessage, Context context, 
> Collector collector) throws Exception {
> String searchKey = cartMessage.createJoinStringCondition();
> PaymentNotifyRequestWrapper paymentNotifyObject = 
> pgState.get(searchKey);
> if(Objects.nonNull(paymentNotifyObject)) {
> generateResultMessage(cartMessage,paymentNotifyObject,collector);
> pgState.remove(searchKey);
> } else {
> cartState.put(searchKey,cartMessage);
> }
> }
>
> /**
>  * 1. Get orderId+mid from pgMessage and check in cartMapState if an 
> entry is present.
>  * 2. If present, match, checkDescripancy, process and delete entry from 
> cartMapState.
>  * 3. If not present, add orderId+mid as key and cart object as value in 
> pgMapState.
>  * @param pgMessage
>  * @param context
>  * @param collector
>  * @throws Exception
>  */
> @Override
> public void processElement2(PaymentNotifyRequestWrapper pgMessage, 
> Context context, Collector collector) throws Exception {
> String searchKey = pgMessage.createJoinStringCondition();
> CartMessage cartMessage = cartState.get(searchKey);
> if(Objects.nonNull(cartMessage)) {
> generateResultMessage(cartMessage,pgMessage,collector);
> cartState.remove(searchKey);
> } else {
> pgState.put(searchKey,pgMessage);
> }
> }
>
> /**
>  * Create ResultMessage from cart and pg messages.
>  *
>  * @param cartMessage
>  * @param pgMessage
>  * @return
>  */
> private void generateResultMessage(CartMessage cartMessage, 
> PaymentNotifyRequestWrapper pgMessage,Collector collector) {
> ResultMessage resultMessage = new ResultMessage();
> Payment payment = null;
>
> //Logic should be in cart: check
> for (Payment pay : cartMessage.getPayments()) {
> if (StringUtils.equals(Constants.FORWARD_PAYMENT, 
> pay.mapToPaymentTypeInPG()) && 
> StringUtils.equals(Constants.PAYTM_NEW_PROVIDER, pay.getProvider())) {
> payment = pay;
> break;
> }
> }
> if(Objects.isNull(payment)) {
> return;
> }
>
> resultMessage.setOrderId(cartMessage.getId());
> resultMessage.setMid(payment.getMid());
>
> 
> resultMessage.setCartOrderStatus(cartMessage.mapToOrderStatus().getCode());
> resultMessage.setPgOrderStatus(pgMessage.getOrderStatus());
>
> resultMessage.setCartOrderCompletionTime(payment.getUpdated_at());
> 

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Benchao Li
Hi Guodong,

I think you almost get the answer,
1. map type, it's not working for current implementation. For example, use
map, if the value if non-string json object, then
`JsonNode.asText()` may not work as you wish.
2. list all fields you cares. IMO, this can fit your scenario. And you can
set format.fail-on-missing-field = true, to allow setting non-existed
fields to be null.

For 1, I think maybe we can support it in the future, and I've created
jira[1] to track this.

[1] https://issues.apache.org/jira/browse/FLINK-18002

Guodong Wang  于2020年5月28日周四 下午6:32写道:

> Hi !
>
> I want to use Flink SQL to process some json events. It is quite
> challenging to define a schema for the Flink SQL table.
>
> My data source's format is some json like this
> {
> "top_level_key1": "some value",
> "nested_object": {
> "nested_key1": "abc",
> "nested_key2": 123,
> "nested_key3": ["element1", "element2", "element3"]
> }
> }
>
> The big challenges for me to define a schema for the data source are
> 1. the keys in nested_object are flexible, there might be 3 unique keys or
> more unique keys. If I enumerate all the keys in the schema, I think my
> code is fragile, how to handle event which contains more  nested_keys in
> nested_object ?
> 2. I know table api support Map type, but I am not sure if I can put
> generic object as the value of the map. Because the values in nested_object
> are of different types, some of them are int, some of them are string or
> array.
>
> So. how to expose this kind of json data as table in Flink SQL without
> enumerating all the nested_keys?
>
> Thanks.
>
> Guodong
>


-- 

Best,
Benchao Li


Re: Installing Ververica, unable to write to file system

2020-05-28 Thread Marta Paes Moreira
Hi, Charlie.

This is not the best place for questions about Ververica Platform CE.
Please use community-edit...@ververica.com instead — someone will be able
to support you there!

If you have any questions related to Flink itself, feel free to reach out
to this mailing list again in the future.

Thanks,

Marta

On Wed, May 27, 2020 at 11:37 PM Corrigan, Charlie <
charlie.corri...@nordstrom.com> wrote:

> Hello, I’m trying to install Ververica (community edition for a simple poc
> deploy) via helm using these directions
> , but the pod is
> failing with the following error:
>
>
>
> ```
>
> org.springframework.context.ApplicationContextException: Unable to start
> web server; nested exception is
> org.springframework.boot.web.server.WebServerException: Unable to create
> tempDir. java.io.tmpdir is set to /tmp
>
> ```
>
>
>
> By default, our file system is immutable in k8s. Usually for this error,
> we’d mount an emptyDir volume. I’ve tried to do that in ververica’s
> values.yaml file, but I might be configuring it incorrectly. Here is the
> relevant portion of the values.yaml. I can include the entire file if it’s
> helpful. Any advice on how to alter these values or proceed with the
> ververica installation with a read only file system?
>
>
>
> volumes:
>   - name: tmp
> emptyDir: {}
>
>
>
> *## ## Container configuration for the appmanager component ## *appmanager
> :
>   image:
> repository: registry.ververica.com/v2.1/vvp-appmanager
> tag: 2.1.0
> pullPolicy: Always
> volumeMounts:
>   - mountPath: /tmp
> name: tmp
>   resources:
> limits:
>   cpu: 1000m
>   memory: 1Gi
> requests:
>   cpu: 250m
>   memory: 1Gi
>
>   artifactFetcherTag: 2.1.0
>
>
>


Re: Multiple Sinks for a Single Soure

2020-05-28 Thread Prasanna kumar
Alexander,

Thanks for the reply. Will implement and come back in case of any
questions.

Prasanna.

On Thu, May 28, 2020 at 5:06 PM Alexander Fedulov 
wrote:

> Hi Prasanna,
>
> if the set of all possible sinks is known in advance, side outputs will be
> generic enough to express your requirements. Side output produces a stream.
> Create all of the side output tags, associate each of them with one sink,
> add conditional logic around `ctx.output(outputTag, ... *)*;`  to decide
> where to dispatch the messages  (see [1]), collect to none or many side
> outputs, depending on your logic.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>
> --
>
> Alexander Fedulov | Solutions Architect
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
>
> On Tue, May 26, 2020 at 2:57 PM Prasanna kumar <
> prasannakumarram...@gmail.com> wrote:
>
>> Piotr,
>>
>> There is an event and subscriber registry as JSON file which has the
>> table event mapping and event-subscriber mapping as mentioned below.
>>
>> Based on the set JSON , we need to job to go through the table updates
>> and create events and for each event there is a way set how to sink them.
>>
>> The sink streams have to be added based on this JSON. Thats what i
>> mentioned as no predefined sink in code earlier.
>>
>> You could see that each event has different set of sinks.
>>
>> Just checking how much generic could Side-output streams be ?.
>>
>> Source -> generate events -> (find out sinks dynamically in code ) ->
>> write to the respective sinks.
>>
>> {
>>   " tablename ": "source.table1",
>>   "events": [
>> {
>>   "operation": "update",
>>   "eventstobecreated": [
>> {
>>   "eventname": "USERUPDATE",
>>   "Columnoperation": "and",
>>   "ColumnChanges": [
>> {
>>   "columnname": "name"
>> },
>> {
>>   "columnname": "loginenabled",
>>   "value": "Y"
>> }
>>   ],
>>   "Subscribers": [
>> {
>>   "customername": "c1",
>>   "method": "Kafka",
>>   "methodparams": {
>> "topicname": "USERTOPIC"
>>   }
>> },
>> {
>>   "customername": "c2",
>>   "method": "S3",
>>   "methodparams": {
>> "folder": "aws://folderC2"
>>   }}, ]}]
>> },
>> {
>>   "operation": "insert",
>>   "eventstobecreated": [
>>   "eventname": "USERINSERT",
>>   "operation": "insert",
>>   "Subscribers": [
>> {
>>   "teamname": "General",
>>   "method": "Kafka",
>>   "methodparams": {
>> "topicname": "new_users"
>>   }
>> },
>> {
>>   "teamname": "General",
>>   "method": "kinesis",
>>   "methodparams": {
>> "URL": "new_users",
>> "username": "uname",
>> "password":  "pwd"
>>   }}, ]}]
>> },
>> {
>>   "operation": "delete",
>>   "eventstobecreated": [
>> {
>>   "eventname": "USERDELETE",
>>   "Subscribers": [
>> {
>>   "customername": "c1",
>>   "method": "Kafka",
>>   "methodparams": {
>> "topicname": "USERTOPIC"
>>   }
>> },
>> {
>>   "customername": "c4",
>>   "method": "Kafka",
>>   "methodparams": {
>> "topicname": "deleterecords"
>>  }}, ]}]
>>  },
>> }
>>
>> Please let me know your thoughts on this.
>>
>> Thanks,
>> Prasanna.
>>
>> On Tue, May 26, 2020 at 5:34 PM Piotr Nowojski 
>> wrote:
>>
>>> Hi,
>>>
>>> I’m not sure if I fully understand what do you mean by
>>>
>>> > The point is the sink are not predefined.
>>>
>>> You must know before submitting the job, what sinks are going to be used
>>> in the job. You can have some custom logic, that would filter out records
>>> before writing them to the sinks, as I proposed before, or you could use
>>> side outputs [1] would be better suited to your use case?
>>>
>>> Piotrek
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html
>>>
>>> On 26 May 2020, at 12:20, Prasanna kumar 
>>> wrote:
>>>
>>> Thanks Piotr for the Reply.
>>>
>>> I will explain my requirement in detail.
>>>
>>> Table Updates -> Generate Business Events -> Subscribers
>>>
>>> *Source Side*
>>> There are CDC of 100 tables which the framework needs to listen to.
>>>
>>> *Event Table Mapping*
>>>
>>> There would be Event associated with table in a *m:n* fashion.
>>>
>>> say there are tables TA, TB, TC.
>>>
>>> 

Custom trigger to trigger for late events

2020-05-28 Thread Poornapragna Ts
Hi,

I have a simple requirement where i want to have 10 second window with
allow late events upto 1 hour.

Existing TumblingEventTimeWindows with EventTimeTrigger will work for this.

But the EventTimeTrigger, triggers for every incoming event after watermark
has passed windows max time. I don't want this behaviour. Even for late
events, I want to fire for every 10 seconds.

For this, I thought of writing custom trigger, which will be similar to
EventTimeTrigger, but instead of firing on every late event, it will
register timer in onElement method for upcoming 10th second.

With this setup, I have some questions.

1) When we register timers to context, is it compulsory to delete them on
clear() call?

2) Will these triggers be stored in fault tolerance state? So that deleting
is must.

3) Will it be problematic, if I call delete trigger for unregistered time(
i.e., if I call delete for time T1 for which I had not registered before.)

4) Without implementing custom trigger, can it be achieved?

5) Lets say, late event came at 255 second so I will register a timer to
trigger at 260(next 10th second). If a failure happens before that time,
then restarting from the checkpoint, Will it trigger when watermark reaches
260? That means will the trigger be recovered when we restart from failure.

Thanks,
Poornapragna T S


Re: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Chesnay Schepler
If it were a class-loading issue I would think that we'd see an 
exception of some kind. Maybe double-check that flink-shaded-hadoop is 
not in the lib directory. (usually I would ask for the full classpath 
that the HS is started with, but as it turns out this isn't getting 
logged :( (FLINK-18008))


The fact that overview.json and jobs/overview.json are missing indicates 
that something goes wrong directly on startup. What is supposed to 
happens is that the HS starts, fetches all currently available archives 
and then creates these files.

So it seems like the download gets stuck for some reason.

Can you use jstack to create a thread dump, and see what the 
Flink-HistoryServer-ArchiveFetcher is doing?


I will also file a JIRA for adding more logging statements, like when 
fetching starts/stops.


On 27/05/2020 20:57, Hailu, Andreas wrote:


Hi Chesney, apologies for not getting back to you sooner here. So I 
did what you suggested - I downloaded a few files from my 
jobmanager.archive.fs.dir HDFS directory to a locally available 
directory named 
/local/scratch/hailua_p2epdlsuat/historyserver/archived/. I then 
changed my historyserver.archive.fs.dir to 
file:///local/scratch/hailua_p2epdlsuat/historyserver/archived/ and 
that seemed to work. I’m able to see the history of the applications I 
downloaded. So this points to a problem with sourcing the history from 
HDFS.


Do you think this could be classpath related? This is what we use for 
our HADOOP_CLASSPATH var:


//gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop/lib/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-hdfs/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-hdfs/lib/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-mapreduce/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-mapreduce/lib/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-yarn/*:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop-yarn/lib/*:/gns/software/ep/da/dataproc/dataproc-prod/lakeRmProxy.jar:/gns/software/infra/big-data/hadoop/hdp-2.6.5.0/hadoop/bin::/gns/mw/dbclient/postgres/jdbc/pg-jdbc-9.3.v01/postgresql-9.3-1100-jdbc4.jar/

//

You can see we have references to Hadoop mapred/yarn/hdfs libs in there.

*// *ah**

*From:*Chesnay Schepler 
*Sent:* Sunday, May 3, 2020 6:00 PM
*To:* Hailu, Andreas [Engineering] ; 
user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

yes, exactly; I want to rule out that (somehow) HDFS is the problem.

I couldn't reproduce the issue locally myself so far.

On 01/05/2020 22:31, Hailu, Andreas wrote:

Hi Chesnay, yes – they were created using Flink 1.9.1 as we’ve
only just started to archive them in the past couple weeks. Could
you clarify on how you want to try local filesystem archives? As
in changing jobmanager.archive.fs.dir and historyserver.web.tmpdir
to the same local directory?

*// *ah

*From:*Chesnay Schepler 

*Sent:* Wednesday, April 29, 2020 8:26 AM
*To:* Hailu, Andreas [Engineering] 
; user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

hmm...let's see if I can reproduce the issue locally.

Are the archives from the same version the history server runs on?
(Which I supposed would be 1.9.1?)

Just for the sake of narrowing things down, it would also be
interesting to check if it works with the archives residing in the
local filesystem.

On 27/04/2020 18:35, Hailu, Andreas wrote:

bash-4.1$ ls -l /local/scratch/flink_historyserver_tmpdir/

total 8

drwxrwxr-x 3 p2epdlsuat p2epdlsuat 4096 Apr 21 10:43
flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9

drwxrwxr-x 3 p2epdlsuat p2epdlsuat 4096 Apr 21 10:22
flink-web-history-95b3f928-c60f-4351-9926-766c6ad3ee76

There are just two directories in here. I don’t see cache
directories from my attempts today, which is interesting.
Looking a little deeper into them:

bash-4.1$ ls -lr

/local/scratch/flink_historyserver_tmpdir/flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9

total 1756

drwxrwxr-x 2 p2epdlsuat p2epdlsuat 1789952 Apr 21 10:44 jobs

bash-4.1$ ls -lr

/local/scratch/flink_historyserver_tmpdir/flink-web-history-7fbb97cc-9f38-4844-9bcf-6272fe6828e9/jobs

total 0

-rw-rw-r-- 1 p2epdlsuat p2epdlsuat 0 Apr 21 10:43 overview.json

There are indeed archives already in HDFS – I’ve included some
in my initial mail, but here they are again just for reference:

-bash-4.1$ hdfs dfs -ls /user/p2epda/lake/delp_qa/flink_hs

Found 44282 items

-rw-r- 3 delp datalake_admin_dev  50569 2020-03-21
23:17

Re: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Chesnay Schepler

Looks like it is indeed stuck on downloading the archive.

I searched a bit in the Hadoop JIRA and found several similar instances:
https://issues.apache.org/jira/browse/HDFS-6999
https://issues.apache.org/jira/browse/HDFS-7005
https://issues.apache.org/jira/browse/HDFS-7145

It is supposed to be fixed in 2.6.0 though :/

If hadoop is available from the HADOOP_CLASSPATH and flink-shaded-hadoop 
in /lib then you basically don't know what Hadoop version is actually 
being used,

which could lead to incompatibilities and dependency clashes.
If flink-shaded-hadoop 2.4/2.5 is on the classpath, maybe that is being 
used and runs into HDFS-7005.


On 28/05/2020 16:27, Hailu, Andreas wrote:


Just created a dump, here’s what I see:

"Flink-HistoryServer-ArchiveFetcher-thread-1" #19 daemon prio=5 
os_prio=0 tid=0x7f93a5a2c000 nid=0x5692 runnable [0x7f934a0d3000]


java.lang.Thread.State: RUNNABLE

    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)

    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)

    at 
sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)


    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)

    - locked <0x0005df986960> (a sun.nio.ch.Util$2)

    - locked <0x0005df986948> (a 
java.util.Collections$UnmodifiableSet)


    - locked <0x0005df928390> (a sun.nio.ch.EPollSelectorImpl)

    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)

    at 
org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)


    at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)


    at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)


    at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)


    at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)


    at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)


    at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)


    at 
org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:201)


    at 
org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:152)


    - locked <0x0005ceade5e0> (a 
org.apache.hadoop.hdfs.RemoteBlockReader2)


    at 
org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:781)


    at 
org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:837)


    - eliminated <0x0005cead3688> (a 
org.apache.hadoop.hdfs.DFSInputStream)


    at 
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:897)


    - locked <0x0005cead3688> (a 
org.apache.hadoop.hdfs.DFSInputStream)


   at 
org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:945)


    - locked <0x0005cead3688> (a 
org.apache.hadoop.hdfs.DFSInputStream)


    at java.io.DataInputStream.read(DataInputStream.java:149)

    at 
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)


    at java.io.InputStream.read(InputStream.java:101)

    at org.apache.flink.util.IOUtils.copyBytes(IOUtils.java:69)

    at org.apache.flink.util.IOUtils.copyBytes(IOUtils.java:91)

    at 
org.apache.flink.runtime.history.FsJobArchivist.getArchivedJsons(FsJobArchivist.java:110)


    at 
org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher$JobArchiveFetcherTask.run(HistoryServerArchiveFetcher.java:169)


    at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)


    at 
java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)


    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)


    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)


    at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)


    at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)


    at java.lang.Thread.run(Thread.java:745)

What problems could the flink-shaded-hadoop jar being included introduce?

*// *ah**

*From:*Chesnay Schepler 
*Sent:* Thursday, May 28, 2020 9:26 AM
*To:* Hailu, Andreas [Engineering] ; 
user@flink.apache.org

*Subject:* Re: History Server Not Showing Any Jobs - File Not Found?

If it were a class-loading issue I would think that we'd see an 
exception of some kind. Maybe double-check that flink-shaded-hadoop is 
not in the lib directory. (usually I would ask for the full classpath 
that the HS is started with, but as it turns out this isn't getting 
logged :( (FLINK-18008))


The fact that 

Dropping messages based on timestamp.

2020-05-28 Thread Joe Malt
Hi,

I'm working on a custom TimestampAssigner which will do different things
depending on the value of the extracted timestamp. One of the actions I
want to take is to drop messages entirely if their timestamp meets certain
criteria.

Of course there's no direct way to do this in the TimestampAssigner, but
I'd like to keep this logic as close to the TimestampAssigner as possible
since this is going to be a pluggable component used in a bunch of
different Flink apps.

What would be the best way to implement this?

Thanks,
Joe


Re: ClusterClientFactory selection

2020-05-28 Thread M Singh
 HI Kostas/Yang/Lake:
I am looking at aws emr and did not see the execution.target in the 
flink-conf.yaml file under flink/conf directory. Is it defined in another place 
?  
 I also did search in the current flink source code and did find mention of it 
in the md files but not in any property file or the flink-yarn sub module.  
Please let me know if I am missing anything.
Thanks
On Wednesday, May 27, 2020, 03:51:28 AM EDT, Kostas Kloudas 
 wrote:  
 
 Hi Singh,

The only thing to add to what Yang said is that the "execution.target"
configuration option (in the config file) is also used for the same
purpose from the execution environments.

Cheers,
Kostas

On Wed, May 27, 2020 at 4:49 AM Yang Wang  wrote:
>
> Hi M Singh,
>
> The Flink CLI picks up the correct ClusterClientFactory via java SPI. You
> could check YarnClusterClientFactory#isCompatibleWith for how it is activated.
> The cli option / configuration is "-e/--executor" or execution.target (e.g. 
> yarn-per-job).
>
>
> Best,
> Yang
>
> M Singh  于2020年5月26日周二 下午6:45写道:
>>
>> Hi:
>>
>> I wanted to find out which parameter/configuration allows flink cli pick up 
>> the appropriate cluster client factory (especially in the yarn mode).
>>
>> Thanks  

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Benchao Li
Hi Guodong,

Does the RAW type meet your requirements? For example, you can specify
map type, and the value for the map is the raw JsonNode
parsed from Jackson.
This is not supported yet, however IMO this could be supported.

Guodong Wang  于2020年5月28日周四 下午9:43写道:

> Benchao,
>
> Thank you for your quick reply.
>
> As you mentioned, for current scenario, approach 2 should work for me. But
> it is a little bit annoying that I have to modify schema to add new field
> types when upstream app changes the json format or adds new fields.
> Otherwise, my user can not refer the field in their SQL.
>
> Per description in the jira, I think after implementing this, all the json
> values will be converted as strings.
> I am wondering if Flink SQL can/will support the flexible schema in the
> future, for example, register the table without defining specific schema
> for each field, to let user define a generic map or array for one field.
> but the value of map/array can be any object. Then, the type conversion
> cost might be saved.
>
> Guodong
>
>
> On Thu, May 28, 2020 at 7:43 PM Benchao Li  wrote:
>
>> Hi Guodong,
>>
>> I think you almost get the answer,
>> 1. map type, it's not working for current implementation. For example,
>> use map, if the value if non-string json object, then
>> `JsonNode.asText()` may not work as you wish.
>> 2. list all fields you cares. IMO, this can fit your scenario. And you
>> can set format.fail-on-missing-field = true, to allow setting non-existed
>> fields to be null.
>>
>> For 1, I think maybe we can support it in the future, and I've created
>> jira[1] to track this.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-18002
>>
>> Guodong Wang  于2020年5月28日周四 下午6:32写道:
>>
>>> Hi !
>>>
>>> I want to use Flink SQL to process some json events. It is quite
>>> challenging to define a schema for the Flink SQL table.
>>>
>>> My data source's format is some json like this
>>> {
>>> "top_level_key1": "some value",
>>> "nested_object": {
>>> "nested_key1": "abc",
>>> "nested_key2": 123,
>>> "nested_key3": ["element1", "element2", "element3"]
>>> }
>>> }
>>>
>>> The big challenges for me to define a schema for the data source are
>>> 1. the keys in nested_object are flexible, there might be 3 unique keys
>>> or more unique keys. If I enumerate all the keys in the schema, I think my
>>> code is fragile, how to handle event which contains more  nested_keys in
>>> nested_object ?
>>> 2. I know table api support Map type, but I am not sure if I can put
>>> generic object as the value of the map. Because the values in nested_object
>>> are of different types, some of them are int, some of them are string or
>>> array.
>>>
>>> So. how to expose this kind of json data as table in Flink SQL without
>>> enumerating all the nested_keys?
>>>
>>> Thanks.
>>>
>>> Guodong
>>>
>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>

-- 

Best,
Benchao Li


RE: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Hailu, Andreas
Just created a dump, here's what I see:

"Flink-HistoryServer-ArchiveFetcher-thread-1" #19 daemon prio=5 os_prio=0 
tid=0x7f93a5a2c000 nid=0x5692 runnable [0x7f934a0d3000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0005df986960> (a sun.nio.ch.Util$2)
- locked <0x0005df986948> (a java.util.Collections$UnmodifiableSet)
- locked <0x0005df928390> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at 
org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
at 
org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:201)
at 
org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:152)
- locked <0x0005ceade5e0> (a 
org.apache.hadoop.hdfs.RemoteBlockReader2)
at 
org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:781)
at 
org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:837)
- eliminated <0x0005cead3688> (a 
org.apache.hadoop.hdfs.DFSInputStream)
at 
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:897)
- locked <0x0005cead3688> (a org.apache.hadoop.hdfs.DFSInputStream)
   at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:945)
- locked <0x0005cead3688> (a org.apache.hadoop.hdfs.DFSInputStream)
at java.io.DataInputStream.read(DataInputStream.java:149)
at 
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.flink.util.IOUtils.copyBytes(IOUtils.java:69)
at org.apache.flink.util.IOUtils.copyBytes(IOUtils.java:91)
at 
org.apache.flink.runtime.history.FsJobArchivist.getArchivedJsons(FsJobArchivist.java:110)
at 
org.apache.flink.runtime.webmonitor.history.HistoryServerArchiveFetcher$JobArchiveFetcherTask.run(HistoryServerArchiveFetcher.java:169)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

What problems could the flink-shaded-hadoop jar being included introduce?

// ah

From: Chesnay Schepler 
Sent: Thursday, May 28, 2020 9:26 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

If it were a class-loading issue I would think that we'd see an exception of 
some kind. Maybe double-check that flink-shaded-hadoop is not in the lib 
directory. (usually I would ask for the full classpath that the HS is started 
with, but as it turns out this isn't getting logged :( (FLINK-18008))

The fact that overview.json and jobs/overview.json are missing indicates that 
something goes wrong directly on startup. What is supposed to happens is that 
the HS starts, fetches all currently available archives and then creates these 
files.
So it seems like the download gets stuck for some reason.

Can you use jstack to create a thread dump, and see what the 
Flink-HistoryServer-ArchiveFetcher is doing?

I will also file a JIRA for adding more logging statements, like when fetching 
starts/stops.

On 27/05/2020 20:57, Hailu, Andreas wrote:
Hi Chesney, apologies for not getting back to you sooner here. So I did what 
you suggested - I downloaded a few files from my jobmanager.archive.fs.dir HDFS 
directory to a locally available directory named 

Re: Apache Flink - Question about application restart

2020-05-28 Thread Till Rohrmann
Hi,

Yarn won't resubmit the job. In case of a process failure where Yarn
restarts the Flink Master, the Master will recover the submitted jobs from
a persistent storage system.

Cheers,
Till

On Thu, May 28, 2020 at 4:05 PM M Singh  wrote:

> Hi Till/Zhu/Yang:  Thanks for your replies.
>
> So just to clarify - the job id remains same if the job restarts have not
> been exhausted.  Does Yarn also resubmit the job in case of failures and if
> so, then is the job id different.
>
> Thanks
> On Wednesday, May 27, 2020, 10:05:40 AM EDT, Till Rohrmann <
> trohrm...@apache.org> wrote:
>
>
> Hi,
>
> if you submit the same job multiple times, then it will get every time a
> different JobID assigned. For Flink, different job submissions are
> considered to be different jobs. Once a job has been submitted, it will
> keep the same JobID which is important in order to retrieve the checkpoints
> associated with this job.
>
> Cheers,
> Till
>
> On Tue, May 26, 2020 at 12:42 PM M Singh  wrote:
>
> Hi Zhu Zhu:
>
> I have another clafication - it looks like if I run the same app multiple
> times - it's job id changes.  So it looks like even though the graph is the
> same the job id is not dependent on the job graph only since with different
> runs of the same app it is not the same.
>
> Please let me know if I've missed anything.
>
> Thanks
>
> On Monday, May 25, 2020, 05:32:39 PM EDT, M Singh 
> wrote:
>
>
> Hi Zhu Zhu:
>
> Just to clarify - from what I understand, EMR also has by default restart
> times (I think it is 3). So if the EMR restarts the job - the job id is the
> same since the job graph is the same.
>
> Thanks for the clarification.
>
> On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang 
> wrote:
>
>
> Just share some additional information.
>
> When deploying Flink application on Yarn and it exhausted restart policy,
> then
> the whole application will failed. If you start another instance(Yarn
> application),
> even the high availability is configured, we could not recover from the
> latest
> checkpoint because the clusterId(i.e. applicationId) has changed.
>
>
> Best,
> Yang
>
> Zhu Zhu  于2020年5月25日周一 上午11:17写道:
>
> Hi M,
>
> Regarding your questions:
> 1. yes. The id is fixed once the job graph is generated.
> 2. yes
>
> Regarding yarn mode:
> 1. the job id keeps the same because the job graph will be generated once
> at client side and persist in DFS for reuse
> 2. yes if high availability is enabled
>
> Thanks,
> Zhu Zhu
>
> M Singh  于2020年5月23日周六 上午4:06写道:
>
> Hi Flink Folks:
>
> If I have a Flink Application with 10 restarts, if it fails and restarts,
> then:
>
> 1. Does the job have the same id ?
> 2. Does the automatically restarting application, pickup from the last
> checkpoint ? I am assuming it does but just want to confirm.
>
> Also, if it is running on AWS EMR I believe EMR/Yarn is configured to
> restart the job 3 times (after it has exhausted it's restart policy) .  If
> that is the case:
> 1. Does the job get a new id ? I believe it does, but just want to confirm.
> 2. Does the Yarn restart honor the last checkpoint ?  I believe, it does
> not, but is there a way to make it restart from the last checkpoint of the
> failed job (after it has exhausted its restart policy) ?
>
> Thanks
>
>
>


RE: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Hailu, Andreas
Okay, I will look further to see if we're mistakenly using a version that's 
pre-2.6.0. However, I don't see flink-shaded-hadoop in my /lib directory for 
flink-1.9.1.

flink-dist_2.11-1.9.1.jar
flink-table-blink_2.11-1.9.1.jar
flink-table_2.11-1.9.1.jar
log4j-1.2.17.jar
slf4j-log4j12-1.7.15.jar

Are the files within /lib.

// ah

From: Chesnay Schepler 
Sent: Thursday, May 28, 2020 11:00 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

Looks like it is indeed stuck on downloading the archive.

I searched a bit in the Hadoop JIRA and found several similar instances:
https://issues.apache.org/jira/browse/HDFS-6999
https://issues.apache.org/jira/browse/HDFS-7005
https://issues.apache.org/jira/browse/HDFS-7145

It is supposed to be fixed in 2.6.0 though :/

If hadoop is available from the HADOOP_CLASSPATH and flink-shaded-hadoop in 
/lib then you basically don't know what Hadoop version is actually being used,
which could lead to incompatibilities and dependency clashes.
If flink-shaded-hadoop 2.4/2.5 is on the classpath, maybe that is being used 
and runs into HDFS-7005.

On 28/05/2020 16:27, Hailu, Andreas wrote:
Just created a dump, here's what I see:

"Flink-HistoryServer-ArchiveFetcher-thread-1" #19 daemon prio=5 os_prio=0 
tid=0x7f93a5a2c000 nid=0x5692 runnable [0x7f934a0d3000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0005df986960> (a sun.nio.ch.Util$2)
- locked <0x0005df986948> (a java.util.Collections$UnmodifiableSet)
- locked <0x0005df928390> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at 
org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
at 
org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:201)
at 
org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:152)
- locked <0x0005ceade5e0> (a 
org.apache.hadoop.hdfs.RemoteBlockReader2)
at 
org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:781)
at 
org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:837)
- eliminated <0x0005cead3688> (a 
org.apache.hadoop.hdfs.DFSInputStream)
at 
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:897)
- locked <0x0005cead3688> (a org.apache.hadoop.hdfs.DFSInputStream)
   at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:945)
- locked <0x0005cead3688> (a org.apache.hadoop.hdfs.DFSInputStream)
at java.io.DataInputStream.read(DataInputStream.java:149)
at 
org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.flink.util.IOUtils.copyBytes(IOUtils.java:69)
at org.apache.flink.util.IOUtils.copyBytes(IOUtils.java:91)
at 
org.apache.flink.runtime.history.FsJobArchivist.getArchivedJsons(FsJobArchivist.java:110)
at 

Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Israel Ekpo
Guowei,

What do we need to do to add support for it?

How do I get started on that?



On Wed, May 27, 2020 at 8:53 PM Guowei Ma  wrote:

> Hi,
> I think the StreamingFileSink could not support Azure currently.
> You could find more detailed info from here[1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-17444
> Best,
> Guowei
>
>
> Israel Ekpo  于2020年5月28日周四 上午6:04写道:
>
>> You can assign the task to me and I will like to collaborate with someone
>> to fix it.
>>
>> On Wed, May 27, 2020 at 5:52 PM Israel Ekpo  wrote:
>>
>>> Some users are running into issues when using Azure Blob Storage for the
>>> StreamFileSink
>>>
>>> https://issues.apache.org/jira/browse/FLINK-17989
>>>
>>> The issue is because certain packages are relocated in the POM file and
>>> some classes are dropped in the final shaded jar
>>>
>>> I have attempted to comment out the relocated and recompile the source
>>> but I keep hitting roadblocks of other relocation and filtration each time
>>> I update a specific pom file
>>>
>>> How can this be addressed so that these users can be unblocked? Why are
>>> the classes filtered out? What is the workaround? I can work on the patch
>>> if I have some guidance.
>>>
>>> This is an issue in Flink 1.9 and 1.10 and I believe 1.11 has the same
>>> issue but I am yet to confirm
>>>
>>> Thanks.
>>>
>>>
>>>
>>


Streaming multiple csv files

2020-05-28 Thread Nikola Hrusov
Hello,

I have multiple files (file1, file2, file3) each being CSV and having
different columns and data. The column headers are finite and we know
their format. I would like to take them and parse them based on the column
structure. I already have the parsers

e.g.:

file1 has columns (id, firstname, lastname)
file2 has columns (id, name)
file3 has columns (id, name_1, name_2, name_3, name_4)

I would like to take all those files, read them, parse them and output
objects to a sink as Person { id, fullName }

Example files would be:

file1:
--
id, firstname, lastname
33, John, Smith
55, Labe, Soni

file2:
--
id, name
5, Mitr Kompi
99, Squi Masw

file3:
--
id, name_1, name_2, name_3, name_4
1, Peter, Hov, Risti, Pena
2, Rii, Koni, Ques,,

Expected output of my program would be:

Person { 33, John Smith }
Person { 55, Labe Soni }
Person { 5, Mitr Kompi }
Person { 99, Squi Masw }
Person { 1, Peter Hov Risti Pena }
Person { 2, Rii Koni Ques }



What I do now is:

My code (very simplified) is: env.readFile().flatMap(new
MyParser()).addSink(new MySink())
The MyParser receives the rows 1 by 1 in string format. Which means that
when I run with parallelism > 1 I receive data from any file and I cannot
say this line comes from where.



What I would like to do is:

Be able to figure out which is the file I am reading from.
Since I only know the file type based on the first row (columns) I need to
either send the 1st row to MyParser() or send a tuple <1st row of file
being read, current row of file being read>.
Another option that I can think about is to have some keyed function based
on the first row, but I am not sure how to achieve that by using readFile.


Is there a way I can achieve this?


Regards
,
Nikola


Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Till Rohrmann
Hi Israel,

thanks for reaching out to the Flink community. As Guowei said, the
StreamingFileSink can currently only recover from faults if it writes to
HDFS or S3. Other file systems are currently not supported if you need
fault tolerance.

Maybe Klou can tell you more about the background and what is needed to
make it work with other file systems. He is one of the original authors of
the StreamingFileSink.

Cheers,
Till

On Thu, May 28, 2020 at 4:39 PM Israel Ekpo  wrote:

> Guowei,
>
> What do we need to do to add support for it?
>
> How do I get started on that?
>
>
>
> On Wed, May 27, 2020 at 8:53 PM Guowei Ma  wrote:
>
>> Hi,
>> I think the StreamingFileSink could not support Azure currently.
>> You could find more detailed info from here[1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17444
>> Best,
>> Guowei
>>
>>
>> Israel Ekpo  于2020年5月28日周四 上午6:04写道:
>>
>>> You can assign the task to me and I will like to collaborate with
>>> someone to fix it.
>>>
>>> On Wed, May 27, 2020 at 5:52 PM Israel Ekpo 
>>> wrote:
>>>
 Some users are running into issues when using Azure Blob Storage for
 the StreamFileSink

 https://issues.apache.org/jira/browse/FLINK-17989

 The issue is because certain packages are relocated in the POM file and
 some classes are dropped in the final shaded jar

 I have attempted to comment out the relocated and recompile the source
 but I keep hitting roadblocks of other relocation and filtration each time
 I update a specific pom file

 How can this be addressed so that these users can be unblocked? Why are
 the classes filtered out? What is the workaround? I can work on the patch
 if I have some guidance.

 This is an issue in Flink 1.9 and 1.10 and I believe 1.11 has the same
 issue but I am yet to confirm

 Thanks.



>>>


Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Guodong Wang
Benchao,

Thank you for your quick reply.

As you mentioned, for current scenario, approach 2 should work for me. But
it is a little bit annoying that I have to modify schema to add new field
types when upstream app changes the json format or adds new fields.
Otherwise, my user can not refer the field in their SQL.

Per description in the jira, I think after implementing this, all the json
values will be converted as strings.
I am wondering if Flink SQL can/will support the flexible schema in the
future, for example, register the table without defining specific schema
for each field, to let user define a generic map or array for one field.
but the value of map/array can be any object. Then, the type conversion
cost might be saved.

Guodong


On Thu, May 28, 2020 at 7:43 PM Benchao Li  wrote:

> Hi Guodong,
>
> I think you almost get the answer,
> 1. map type, it's not working for current implementation. For example, use
> map, if the value if non-string json object, then
> `JsonNode.asText()` may not work as you wish.
> 2. list all fields you cares. IMO, this can fit your scenario. And you can
> set format.fail-on-missing-field = true, to allow setting non-existed
> fields to be null.
>
> For 1, I think maybe we can support it in the future, and I've created
> jira[1] to track this.
>
> [1] https://issues.apache.org/jira/browse/FLINK-18002
>
> Guodong Wang  于2020年5月28日周四 下午6:32写道:
>
>> Hi !
>>
>> I want to use Flink SQL to process some json events. It is quite
>> challenging to define a schema for the Flink SQL table.
>>
>> My data source's format is some json like this
>> {
>> "top_level_key1": "some value",
>> "nested_object": {
>> "nested_key1": "abc",
>> "nested_key2": 123,
>> "nested_key3": ["element1", "element2", "element3"]
>> }
>> }
>>
>> The big challenges for me to define a schema for the data source are
>> 1. the keys in nested_object are flexible, there might be 3 unique keys
>> or more unique keys. If I enumerate all the keys in the schema, I think my
>> code is fragile, how to handle event which contains more  nested_keys in
>> nested_object ?
>> 2. I know table api support Map type, but I am not sure if I can put
>> generic object as the value of the map. Because the values in nested_object
>> are of different types, some of them are int, some of them are string or
>> array.
>>
>> So. how to expose this kind of json data as table in Flink SQL without
>> enumerating all the nested_keys?
>>
>> Thanks.
>>
>> Guodong
>>
>
>
> --
>
> Best,
> Benchao Li
>


Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Leonard Xu
Hi, guodong 
 
> I am wondering if Flink SQL can/will support the flexible schema in the 
> future,

It’s an interesting topic, this feature is more close to the scope of schema 
inference.
The schema inference should come in next few releases. 

Best,
Leonard Xu




> for example, register the table without defining specific schema for each 
> field, to let user define a generic map or array for one field. but the value 
> of map/array can be any object. Then, the type conversion cost might be 
> saved. 
> 
> Guodong
> 
> 
> On Thu, May 28, 2020 at 7:43 PM Benchao Li  > wrote:
> Hi Guodong,
> 
> I think you almost get the answer,
> 1. map type, it's not working for current implementation. For example, use 
> map, if the value if non-string json object, then 
> `JsonNode.asText()` may not work as you wish.
> 2. list all fields you cares. IMO, this can fit your scenario. And you can 
> set format.fail-on-missing-field = true, to allow setting non-existed fields 
> to be null.
> 
> For 1, I think maybe we can support it in the future, and I've created 
> jira[1] to track this.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-18002 
> 
> Guodong Wang mailto:wangg...@gmail.com>> 于2020年5月28日周四 
> 下午6:32写道:
> Hi !
> 
> I want to use Flink SQL to process some json events. It is quite challenging 
> to define a schema for the Flink SQL table. 
> 
> My data source's format is some json like this
> {
> "top_level_key1": "some value",
> "nested_object": {
> "nested_key1": "abc",
> "nested_key2": 123,
> "nested_key3": ["element1", "element2", "element3"]
> }
> }
> 
> The big challenges for me to define a schema for the data source are
> 1. the keys in nested_object are flexible, there might be 3 unique keys or 
> more unique keys. If I enumerate all the keys in the schema, I think my code 
> is fragile, how to handle event which contains more  nested_keys in 
> nested_object ?
> 2. I know table api support Map type, but I am not sure if I can put generic 
> object as the value of the map. Because the values in nested_object are of 
> different types, some of them are int, some of them are string or array.
> 
> So. how to expose this kind of json data as table in Flink SQL without 
> enumerating all the nested_keys?
> 
> Thanks.
> 
> Guodong
> 
> 
> -- 
> 
> Best,
> Benchao Li



Flink Iterator Functions

2020-05-28 Thread Roderick Vincent
Hi,

I am brand new to Apache Flink so please excuse any silly questions.  I
have an Iterator function defined as below and adding it as a source to a
Flink stream.  But when I try to pass configuration information to it (via
a Spring env), what I notice is that one of the threads calls hasNext() and
it is not the same object and the passed information is null.  Something is
constructing it, but what is strange is that if I add a default constructor
I do not see this being called by this thread with the null data so I am
wondering what is going on.  Any ideas?  How do we pass configuration
information to these functions?  Any help would be appreciated.

Thanks,
Rick

@Public
public class NodeSource extends
FromIteratorFunction> {


private static final long serialVersionUID = 1L;

public NodeSource(ArangoDBSource iterator) {
super(iterator);
}

}


Re: Tumbling windows - increasing checkpoint size over time

2020-05-28 Thread Till Rohrmann
Hi Matt,

when using tumbling windows, then the checkpoint size is not only dependent
on the number of keys (which is equivalent to the number of open windows)
but also on how many events arrive for each open window because the windows
store every window event in its state. Hence, it can be the case that you
see different checkpoint sizes depending on the actual data distribution
which can change over time. Have you checked whether the data distribution
and rate is constant over time?

What is the expected number of keys, size of events and number of events
per key per second? Based on this information one could try to estimate an
upper state size bound.

Cheers,
Till

On Wed, May 27, 2020 at 8:19 PM Wissman, Matt  wrote:

> Hello Till & Guowei,
>
>
>
> Thanks for the replies! Here is a snippet of the window function:
>
>
>
>   SingleOutputStreamOperator aggregatedStream = dataStream
>
> .keyBy(idKeySelector())
>
> .window(TumblingProcessingTimeWindows.of(seconds(15)))
>
> .apply(new Aggregator())
>
> .name("Aggregator")
>
> .setParallelism(3);
>
>
>
> Checkpoint interval: 2 secs when the checkpoint size grew from 100KB to
> 100MB (we’ve since changed the 5 minutes, which has slowed the checkpoint
> size growth)
>
> Lateness allowed: 0
>
> Watermarks: nothing is set in terms of watermarks – do they apply for
> Process Time?
>
> The set of keys processed in the stream is stable over time
>
>
>
> The checkpoint size actually looks pretty stable now that the interval was
> increased. Is it possible that the short checkpoint interval prevented
> compaction?
>
>
>
> Thanks!
>
>
>
> -Matt
>
>
>
>
>
> *From: *Till Rohrmann 
> *Date: *Wednesday, May 27, 2020 at 9:00 AM
> *To: *Guowei Ma 
> *Cc: *"Wissman, Matt" , "user@flink.apache.org" <
> user@flink.apache.org>
> *Subject: *Re: Tumbling windows - increasing checkpoint size over time
>
>
>
> *LEARN FAST: This email originated outside of HERE.*
> Please do not click on links or open attachments unless you recognize the
> sender and know the content is safe. Thank you.
>
>
>
> Hi Matt,
>
>
>
> could you give us a bit more information about the windows you are using?
> They are tumbling windows. What's the size of the windows? Do you allow
> lateness of events? What's your checkpoint interval?
>
>
>
> Are you using event time? If yes, how is the watermark generated?
>
>
>
> You said that the number of events per window is more or less constant.
> Does this is also apply to the size of the individual events?
>
>
>
> Cheers,
>
> Till
>
>
>
> On Wed, May 27, 2020 at 1:21 AM Guowei Ma  wrote:
>
> Hi, Matt
> The total size of the state of the window operator is related to the
> number of windows. For example if you use keyby+tumblingwindow there
> would be keys number of windows.
> Hope this helps.
> Best,
> Guowei
>
> Wissman, Matt  于2020年5月27日周三 上午3:35写道:
> >
> > Hello Flink Community,
> >
> >
> >
> > I’m running a Flink pipeline that uses a tumbling window and incremental
> checkpoint with RocksDB backed by s3. The number of objects in the window
> is stable but overtime the checkpoint size grows seemingly unbounded.
> Within the first few hours after bringing the Flink pipeline up, the
> checkpoint size is around 100K but after a week of operation it grows to
> around 100MB. The pipeline isn’t using any other Flink state besides the
> state that the window uses. I think this has something to do with RocksDB’s
> compaction but shouldn’t the tumbling window state expire and be purged
> from the checkpoint?
> >
> >
> >
> > Flink Version 1.7.1
> >
> >
> >
> > Thanks!
> >
> >
> >
> > -Matt
>
>


How do I make sure to place operator instances in specific Task Managers?

2020-05-28 Thread Felipe Gutierrez
For instance, if I have the following DAG with the respect parallelism
in parenthesis (I hope the dag appears real afterall):

  source01 -> map01(4) -> flatmap01(4) \

  |-> keyBy -> reducer(8)
  source02 -> map02(4) -> flatmap02(4) /

And I have 4 TMs in 4 machines with 4 cores each. I would like to
place source01 and map01 and flatmap01 in TM-01. source02 and map02
and flatmap02 in TM-02. I am using "disableChaning()" in the faltMap
operator to measure it. And reducer1-to-4 in TM-03 and reducer5-to-8
in TM-04.

I am using the methods "setParallelism()" and "slotSharingGroup()" to
define it but both source01 and source02 are placed in TM-01 and map01
is split into 2 TMs. The same with map02.

Thanks,
Felipe
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Israel Ekpo
Hi Till,

Thanks for your feedback and guidance.

It seems similar work was done for S3 filesystem where relocations were
removed for those file system plugins.

https://issues.apache.org/jira/browse/FLINK-11956

It appears the same needs to be done for Azure File systems.

I will attempt to connect with Klou today to collaborate to see what the
level of effort is to add this support.

Thanks.



On Thu, May 28, 2020 at 11:54 AM Till Rohrmann  wrote:

> Hi Israel,
>
> thanks for reaching out to the Flink community. As Guowei said, the
> StreamingFileSink can currently only recover from faults if it writes to
> HDFS or S3. Other file systems are currently not supported if you need
> fault tolerance.
>
> Maybe Klou can tell you more about the background and what is needed to
> make it work with other file systems. He is one of the original authors of
> the StreamingFileSink.
>
> Cheers,
> Till
>
> On Thu, May 28, 2020 at 4:39 PM Israel Ekpo  wrote:
>
>> Guowei,
>>
>> What do we need to do to add support for it?
>>
>> How do I get started on that?
>>
>>
>>
>> On Wed, May 27, 2020 at 8:53 PM Guowei Ma  wrote:
>>
>>> Hi,
>>> I think the StreamingFileSink could not support Azure currently.
>>> You could find more detailed info from here[1].
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-17444
>>> Best,
>>> Guowei
>>>
>>>
>>> Israel Ekpo  于2020年5月28日周四 上午6:04写道:
>>>
 You can assign the task to me and I will like to collaborate with
 someone to fix it.

 On Wed, May 27, 2020 at 5:52 PM Israel Ekpo 
 wrote:

> Some users are running into issues when using Azure Blob Storage for
> the StreamFileSink
>
> https://issues.apache.org/jira/browse/FLINK-17989
>
> The issue is because certain packages are relocated in the POM file
> and some classes are dropped in the final shaded jar
>
> I have attempted to comment out the relocated and recompile the source
> but I keep hitting roadblocks of other relocation and filtration each time
> I update a specific pom file
>
> How can this be addressed so that these users can be unblocked? Why
> are the classes filtered out? What is the workaround? I can work on the
> patch if I have some guidance.
>
> This is an issue in Flink 1.9 and 1.10 and I believe 1.11 has the same
> issue but I am yet to confirm
>
> Thanks.
>
>
>



Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Till Rohrmann
I think what needs to be done is to implement
a org.apache.flink.core.fs.RecoverableWriter for the respective file
system. Similar to HadoopRecoverableWriter and S3RecoverableWriter.

Cheers,
Till

On Thu, May 28, 2020 at 6:00 PM Israel Ekpo  wrote:

> Hi Till,
>
> Thanks for your feedback and guidance.
>
> It seems similar work was done for S3 filesystem where relocations were
> removed for those file system plugins.
>
> https://issues.apache.org/jira/browse/FLINK-11956
>
> It appears the same needs to be done for Azure File systems.
>
> I will attempt to connect with Klou today to collaborate to see what the
> level of effort is to add this support.
>
> Thanks.
>
>
>
> On Thu, May 28, 2020 at 11:54 AM Till Rohrmann 
> wrote:
>
>> Hi Israel,
>>
>> thanks for reaching out to the Flink community. As Guowei said, the
>> StreamingFileSink can currently only recover from faults if it writes to
>> HDFS or S3. Other file systems are currently not supported if you need
>> fault tolerance.
>>
>> Maybe Klou can tell you more about the background and what is needed to
>> make it work with other file systems. He is one of the original authors of
>> the StreamingFileSink.
>>
>> Cheers,
>> Till
>>
>> On Thu, May 28, 2020 at 4:39 PM Israel Ekpo  wrote:
>>
>>> Guowei,
>>>
>>> What do we need to do to add support for it?
>>>
>>> How do I get started on that?
>>>
>>>
>>>
>>> On Wed, May 27, 2020 at 8:53 PM Guowei Ma  wrote:
>>>
 Hi,
 I think the StreamingFileSink could not support Azure currently.
 You could find more detailed info from here[1].

 [1] https://issues.apache.org/jira/browse/FLINK-17444
 Best,
 Guowei


 Israel Ekpo  于2020年5月28日周四 上午6:04写道:

> You can assign the task to me and I will like to collaborate with
> someone to fix it.
>
> On Wed, May 27, 2020 at 5:52 PM Israel Ekpo 
> wrote:
>
>> Some users are running into issues when using Azure Blob Storage for
>> the StreamFileSink
>>
>> https://issues.apache.org/jira/browse/FLINK-17989
>>
>> The issue is because certain packages are relocated in the POM file
>> and some classes are dropped in the final shaded jar
>>
>> I have attempted to comment out the relocated and recompile the
>> source but I keep hitting roadblocks of other relocation and filtration
>> each time I update a specific pom file
>>
>> How can this be addressed so that these users can be unblocked? Why
>> are the classes filtered out? What is the workaround? I can work on the
>> patch if I have some guidance.
>>
>> This is an issue in Flink 1.9 and 1.10 and I believe 1.11 has the
>> same issue but I am yet to confirm
>>
>> Thanks.
>>
>>
>>
>


Re: Apache Flink - Question about application restart

2020-05-28 Thread M Singh
 Hi Till/Zhu/Yang:  Thanks for your replies.
So just to clarify - the job id remains same if the job restarts have not been 
exhausted.  Does Yarn also resubmit the job in case of failures and if so, then 
is the job id different.
ThanksOn Wednesday, May 27, 2020, 10:05:40 AM EDT, Till Rohrmann 
 wrote:  
 
 Hi,
if you submit the same job multiple times, then it will get every time a 
different JobID assigned. For Flink, different job submissions are considered 
to be different jobs. Once a job has been submitted, it will keep the same 
JobID which is important in order to retrieve the checkpoints associated with 
this job.
Cheers,Till
On Tue, May 26, 2020 at 12:42 PM M Singh  wrote:

 Hi Zhu Zhu:
I have another clafication - it looks like if I run the same app multiple times 
- it's job id changes.  So it looks like even though the graph is the same the 
job id is not dependent on the job graph only since with different runs of the 
same app it is not the same.
Please let me know if I've missed anything.
Thanks
On Monday, May 25, 2020, 05:32:39 PM EDT, M Singh  
wrote:  
 
  Hi Zhu Zhu:
Just to clarify - from what I understand, EMR also has by default restart times 
(I think it is 3). So if the EMR restarts the job - the job id is the same 
since the job graph is the same. 
Thanks for the clarification.
On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang  
wrote:  
 
 Just share some additional information.
When deploying Flink application on Yarn and it exhausted restart policy, 
thenthe whole application will failed. If you start another instance(Yarn 
application),even the high availability is configured, we could not recover 
from the latestcheckpoint because the clusterId(i.e. applicationId) has changed.

Best,Yang
Zhu Zhu  于2020年5月25日周一 上午11:17写道:

Hi M,
Regarding your questions:1. yes. The id is fixed once the job graph is 
generated.2. yes
Regarding yarn mode:1. the job id keeps the same because the job graph will be 
generated once at client side and persist in DFS for reuse2. yes if high 
availability is enabled

Thanks,Zhu Zhu
M Singh  于2020年5月23日周六 上午4:06写道:

Hi Flink Folks:
If I have a Flink Application with 10 restarts, if it fails and restarts, then:
1. Does the job have the same id ?2. Does the automatically restarting 
application, pickup from the last checkpoint ? I am assuming it does but just 
want to confirm.
Also, if it is running on AWS EMR I believe EMR/Yarn is configured to restart 
the job 3 times (after it has exhausted it's restart policy) .  If that is the 
case:1. Does the job get a new id ? I believe it does, but just want to 
confirm.2. Does the Yarn restart honor the last checkpoint ?  I believe, it 
does not, but is there a way to make it restart from the last checkpoint of the 
failed job (after it has exhausted its restart policy) ?
Thanks




  

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Guodong Wang
Yes. Setting the value type as raw is one possible approach. And I would
like to vote for schema inference as well.

Correct me if I am wrong, IMO schema inference means I can provide a method
in the table source to infer the data schema base on the runtime
computation. Just like some calcite adaptor does. Right?
For SQL table registration, I think that requiring the table source to
provide a static schema might be too strict. Let planner to infer the table
schema will be more flexible.

Thank you for your suggestions.

Guodong


On Thu, May 28, 2020 at 11:11 PM Benchao Li  wrote:

> Hi Guodong,
>
> Does the RAW type meet your requirements? For example, you can specify
> map type, and the value for the map is the raw JsonNode
> parsed from Jackson.
> This is not supported yet, however IMO this could be supported.
>
> Guodong Wang  于2020年5月28日周四 下午9:43写道:
>
>> Benchao,
>>
>> Thank you for your quick reply.
>>
>> As you mentioned, for current scenario, approach 2 should work for me.
>> But it is a little bit annoying that I have to modify schema to add new
>> field types when upstream app changes the json format or adds new fields.
>> Otherwise, my user can not refer the field in their SQL.
>>
>> Per description in the jira, I think after implementing this, all the
>> json values will be converted as strings.
>> I am wondering if Flink SQL can/will support the flexible schema in the
>> future, for example, register the table without defining specific schema
>> for each field, to let user define a generic map or array for one field.
>> but the value of map/array can be any object. Then, the type conversion
>> cost might be saved.
>>
>> Guodong
>>
>>
>> On Thu, May 28, 2020 at 7:43 PM Benchao Li  wrote:
>>
>>> Hi Guodong,
>>>
>>> I think you almost get the answer,
>>> 1. map type, it's not working for current implementation. For example,
>>> use map, if the value if non-string json object, then
>>> `JsonNode.asText()` may not work as you wish.
>>> 2. list all fields you cares. IMO, this can fit your scenario. And you
>>> can set format.fail-on-missing-field = true, to allow setting non-existed
>>> fields to be null.
>>>
>>> For 1, I think maybe we can support it in the future, and I've created
>>> jira[1] to track this.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-18002
>>>
>>> Guodong Wang  于2020年5月28日周四 下午6:32写道:
>>>
 Hi !

 I want to use Flink SQL to process some json events. It is quite
 challenging to define a schema for the Flink SQL table.

 My data source's format is some json like this
 {
 "top_level_key1": "some value",
 "nested_object": {
 "nested_key1": "abc",
 "nested_key2": 123,
 "nested_key3": ["element1", "element2", "element3"]
 }
 }

 The big challenges for me to define a schema for the data source are
 1. the keys in nested_object are flexible, there might be 3 unique keys
 or more unique keys. If I enumerate all the keys in the schema, I think my
 code is fragile, how to handle event which contains more  nested_keys in
 nested_object ?
 2. I know table api support Map type, but I am not sure if I can put
 generic object as the value of the map. Because the values in nested_object
 are of different types, some of them are int, some of them are string or
 array.

 So. how to expose this kind of json data as table in Flink SQL without
 enumerating all the nested_keys?

 Thanks.

 Guodong

>>>
>>>
>>> --
>>>
>>> Best,
>>> Benchao Li
>>>
>>
>
> --
>
> Best,
> Benchao Li
>


Best way to "emulate" a rich Partitioner with open() and close() methods ?

2020-05-28 Thread LINZ, Arnaud
Hello,



I would like to upgrade the performance of my Apache Kudu Sink by using the new 
“KuduPartitioner” of Kudu API to match Flink stream partitions with Kudu 
partitions to lower the network shuffling.

For that, I would like to implement something like

stream.partitionCustom(new KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…)));

With KuduFLinkPartitioner a implementation of 
org.apache.flink.api.common.functions.Partitioner that internally make use of 
the KuduPartitioner client tool of Kudu’s API.



However for that KuduPartioner to work, it needs to open – and close at the end 
– a connection to the Kudu table – obviously something that can’t be done for 
each line. But there is no “AbstractRichPartitioner” with open() and close() 
method that I can use for that (the way I use it in the sink for instance).



What is the best way to implement this ?

I thought of ThreadLocals that would be initialized during the first call to 
int partition(K key, int numPartitions);  but I won’t be able to close() things 
nicely as I won’t be notified on job termination.



I thought of putting those static ThreadLocals inside a “Identity Mapper” that 
would be called just prior the partition with something like :

stream.map(richIdentiyConnectionManagerMapper).partitionCustom(new 
KuduFlinkPartitioner<>(…)).addSink(new KuduSink(…)));

with kudu connections initialized in the mapper open(), closed in the mapper 
close(), and used  in the partitioner partition().

However It looks like an ugly hack breaking every coding principle, but as long 
as the threads are reused between the mapper and the partitioner I think that 
it should work.



Is there a better way to do this ?



Best regards,

Arnaud







L'intégrité de ce message n'étant pas assurée sur internet, la société 
expéditrice ne peut être tenue responsable de son contenu ni de ses pièces 
jointes. Toute utilisation ou diffusion non autorisée est interdite. Si vous 
n'êtes pas destinataire de ce message, merci de le détruire et d'avertir 
l'expéditeur.

The integrity of this message cannot be guaranteed on the Internet. The company 
that sent this message cannot therefore be held liable for its content nor 
attachments. Any unauthorized use or dissemination is prohibited. If you are 
not the intended recipient of this message, then please delete it and notify 
the sender.


Re: [DISCUSS] FLINK-17989 - java.lang.NoClassDefFoundError org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter

2020-05-28 Thread Israel Ekpo
Thanks Till.

I will take a look at that tomorrow and let you know if I hit any
roadblocks.

On Thu, May 28, 2020 at 12:11 PM Till Rohrmann  wrote:

> I think what needs to be done is to implement
> a org.apache.flink.core.fs.RecoverableWriter for the respective file
> system. Similar to HadoopRecoverableWriter and S3RecoverableWriter.
>
> Cheers,
> Till
>
> On Thu, May 28, 2020 at 6:00 PM Israel Ekpo  wrote:
>
>> Hi Till,
>>
>> Thanks for your feedback and guidance.
>>
>> It seems similar work was done for S3 filesystem where relocations were
>> removed for those file system plugins.
>>
>> https://issues.apache.org/jira/browse/FLINK-11956
>>
>> It appears the same needs to be done for Azure File systems.
>>
>> I will attempt to connect with Klou today to collaborate to see what the
>> level of effort is to add this support.
>>
>> Thanks.
>>
>>
>>
>> On Thu, May 28, 2020 at 11:54 AM Till Rohrmann 
>> wrote:
>>
>>> Hi Israel,
>>>
>>> thanks for reaching out to the Flink community. As Guowei said, the
>>> StreamingFileSink can currently only recover from faults if it writes to
>>> HDFS or S3. Other file systems are currently not supported if you need
>>> fault tolerance.
>>>
>>> Maybe Klou can tell you more about the background and what is needed to
>>> make it work with other file systems. He is one of the original authors of
>>> the StreamingFileSink.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, May 28, 2020 at 4:39 PM Israel Ekpo 
>>> wrote:
>>>
 Guowei,

 What do we need to do to add support for it?

 How do I get started on that?



 On Wed, May 27, 2020 at 8:53 PM Guowei Ma  wrote:

> Hi,
> I think the StreamingFileSink could not support Azure currently.
> You could find more detailed info from here[1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-17444
> Best,
> Guowei
>
>
> Israel Ekpo  于2020年5月28日周四 上午6:04写道:
>
>> You can assign the task to me and I will like to collaborate with
>> someone to fix it.
>>
>> On Wed, May 27, 2020 at 5:52 PM Israel Ekpo 
>> wrote:
>>
>>> Some users are running into issues when using Azure Blob Storage for
>>> the StreamFileSink
>>>
>>> https://issues.apache.org/jira/browse/FLINK-17989
>>>
>>> The issue is because certain packages are relocated in the POM file
>>> and some classes are dropped in the final shaded jar
>>>
>>> I have attempted to comment out the relocated and recompile the
>>> source but I keep hitting roadblocks of other relocation and filtration
>>> each time I update a specific pom file
>>>
>>> How can this be addressed so that these users can be unblocked? Why
>>> are the classes filtered out? What is the workaround? I can work on the
>>> patch if I have some guidance.
>>>
>>> This is an issue in Flink 1.9 and 1.10 and I believe 1.11 has the
>>> same issue but I am yet to confirm
>>>
>>> Thanks.
>>>
>>>
>>>
>>


Flink Elastic Sink

2020-05-28 Thread aj
Hello All,

I am getting many events in Kafka and I have written a link job that sinks
that Avro records from Kafka to S3 in parquet format.

Now, I want to sink these records into elastic search. but the only
challenge is that I want to sink record on time indices. Basically, In
Elastic, I want to create a per day index with the date as the suffix.
So in Flink stream job if I create an es sink how will I change the sink to
start writing  in a new index when the first event of the day arrives

Thanks,
Anuj.








Question on stream joins

2020-05-28 Thread Sudan S
Hi ,

I have two usecases

1. I have two streams which `leftSource` and `rightSource` which i want to
join without partitioning over a window and find the difference of count of
elements of leftSource and rightSource and emit the result of difference.
Which is the appropriate join function ican use ?

join/cogroup/connect.

2. I want to replicate the same behaviour over a keyed source. Basically
leftSource and rightSource are joined by a partition key.

Plz let me know which is the appropriate join operator for the usecase

-- 
*"The information contained in this e-mail and any accompanying documents 
may contain information that is confidential or otherwise protected from 
disclosure. If you are not the intended recipient of this message, or if 
this message has been addressed to you in error, please immediately alert 
the sender by replying to this e-mail and then delete this message, 
including any attachments. Any dissemination, distribution or other use of 
the contents of this message by anyone other than the intended recipient is 
strictly prohibited. All messages sent to and from this e-mail address may 
be monitored as permitted by applicable law and regulations to ensure 
compliance with our internal policies and to protect our business."*


Re: Stateful functions Harness

2020-05-28 Thread Boris Lublinsky
Also I have noticed, that a few cludstate jars including statefun-flink-core, 
statefun-flink-io, statefun-flink-harness are build for Scala 11, is it 
possible to create versions of those for Scala 12?

> On May 27, 2020, at 3:15 PM, Seth Wiesman  wrote:
> 
> Hi Boris, 
> 
> Example usage of flink sources and sink is available in the documentation[1]. 
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.0/io-module/flink-connectors.html
>  
> 
> On Wed, May 27, 2020 at 1:08 PM Boris Lublinsky 
> mailto:boris.lublin...@lightbend.com>> wrote:
> Thats not exactly the usage question that I am asking
> When I am writing IO module I have to write Ingress and Egress spec.
> You have an example for Kafka, which looks like
> 
> def getIngressSpec: IngressSpec[GreetRequest] =
>   KafkaIngressBuilder.forIdentifier(GREETING_INGRESS_ID)
> .withKafkaAddress(kafkaAddress)
> .withTopic("names")
> .withDeserializer(classOf[GreetKafkaDeserializer])
> .withProperty(ConsumerConfig.GROUP_ID_CONFIG, "greetings")
> .build
> 
> def getEgressSpec: EgressSpec[GreetResponse] =
>   KafkaEgressBuilder.forIdentifier(GREETING_EGRESS_ID)
> .withKafkaAddress(kafkaAddress)
> .withSerializer(classOf[GreetKafkaSerializer])
> .build
> How is it going to look if I am using SourceSinkModule?
> Do I just specify stream names? Something else?
> 
> 
> 
> 
> 
>> On May 27, 2020, at 11:29 AM, Tzu-Li (Gordon) Tai > > wrote:
>> 
>> 
>> 
>> On Thu, May 28, 2020, 12:19 AM Boris Lublinsky 
>> mailto:boris.lublin...@lightbend.com>> wrote:
>> I think I figured this out.
>> The project seems to be missing
>> 
>> resources 
>> /META-INF
>>  
>> /services
>>  directory, which should contain services
>> 
>> Yes, the functions / ingresses / regresses etc. are not discoverable if the 
>> service file isnt present in the classpath.
>> 
>> For the examples, if you are running it straight from the repo, should all 
>> have that service file defined and therefore readily runnable.
>> 
>> If you are creating your own application project, you'll have to add that 
>> yourself.
>> 
>> 
>> Another question:
>> I see org.apache.flink.statefun.flink.io.datastream.SourceSinkModule
>> 
>> Class, which I think allows to use existing data streams as ingress/egress.
>> 
>> Are there any examples of its usage
>> 
>> On the Harness class, there is a withFlinkSourceFunction method in which you 
>> can directly add a Flink source function as the ingress.
>> 
>> If you want to use that directly in a normal application (not just execution 
>> in IDE with the Harness), you can define your ingesses/egresses by binding 
>> SourceFunctionSpec / SinkFunctionSpec.
>> Please see how they are being used in the Harness class for examples.
>> 
>> Gordon
>> 
>> 
>> 
>>> On May 27, 2020, at 11:10 AM, Tzu-Li (Gordon) Tai >> > wrote:
>>> 
>>> Hi,
>>> 
>>> The example is working fine on my side (also using IntelliJ).
>>> This could most likely be a problem with your project setup in the IDE, 
>>> where the classpath isn't setup correctly.
>>> 
>>> What do you see when you right click on the statefun-flink-harness-example 
>>> directory (in the IDE) --> Open Module Settings, and then under the 
>>> "Sources" / "Dependencies" tab?
>>> Usually this should all be automatically setup correctly when importing the 
>>> project.
>>> 
>>> Gordon
>>> 
>>> On Wed, May 27, 2020 at 11:46 PM Boris Lublinsky 
>>> mailto:boris.lublin...@lightbend.com>> 
>>> wrote:
>>> The project 
>>> https://github.com/apache/flink-statefun/tree/release-2.0/statefun-examples/statefun-flink-harness-example
>>>  
>>> 
>>> Does not work in Intellij.
>>> 
>>> The problem is that when running in Intellij, method public static Modules 
>>> loadFromClassPath() {
>>> Does not pick up classes, which are local in Intellij
>>> 
>>> Any work arounds?
>>> 
>>> 
>>> 
>>> 
 On May 22, 2020, at 12:03 AM, Tzu-Li (Gordon) Tai >>> > wrote:
 
 Hi,
 
 Sorry, I need to correct my comment on using the Kafka ingress / egress 
 with the Harness.
 
 That is actually doable, by adding an extra dependency to 
 `statefun-flink-distribution` in your Harness program.
 That pulls in all the other required dependencies required by the Kafka 
 ingress / egress, such as the source / sink providers and Flink Kafka 
 connectors.
 
 Cheers,
 Gordon
 
 On Fri, May 22, 2020 at 12:04 PM Tzu-Li (Gordon) Tai >>> 

Re: Apache Flink - Question about application restart

2020-05-28 Thread M Singh
 Thanks Till - in the case of restart of flink master - I believe the jobid 
will be different.  Thanks
On Thursday, May 28, 2020, 11:33:38 AM EDT, Till Rohrmann 
 wrote:  
 
 Hi,
Yarn won't resubmit the job. In case of a process failure where Yarn restarts 
the Flink Master, the Master will recover the submitted jobs from a persistent 
storage system.
Cheers,Till
On Thu, May 28, 2020 at 4:05 PM M Singh  wrote:

 Hi Till/Zhu/Yang:  Thanks for your replies.
So just to clarify - the job id remains same if the job restarts have not been 
exhausted.  Does Yarn also resubmit the job in case of failures and if so, then 
is the job id different.
ThanksOn Wednesday, May 27, 2020, 10:05:40 AM EDT, Till Rohrmann 
 wrote:  
 
 Hi,
if you submit the same job multiple times, then it will get every time a 
different JobID assigned. For Flink, different job submissions are considered 
to be different jobs. Once a job has been submitted, it will keep the same 
JobID which is important in order to retrieve the checkpoints associated with 
this job.
Cheers,Till
On Tue, May 26, 2020 at 12:42 PM M Singh  wrote:

 Hi Zhu Zhu:
I have another clafication - it looks like if I run the same app multiple times 
- it's job id changes.  So it looks like even though the graph is the same the 
job id is not dependent on the job graph only since with different runs of the 
same app it is not the same.
Please let me know if I've missed anything.
Thanks
On Monday, May 25, 2020, 05:32:39 PM EDT, M Singh  
wrote:  
 
  Hi Zhu Zhu:
Just to clarify - from what I understand, EMR also has by default restart times 
(I think it is 3). So if the EMR restarts the job - the job id is the same 
since the job graph is the same. 
Thanks for the clarification.
On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang  
wrote:  
 
 Just share some additional information.
When deploying Flink application on Yarn and it exhausted restart policy, 
thenthe whole application will failed. If you start another instance(Yarn 
application),even the high availability is configured, we could not recover 
from the latestcheckpoint because the clusterId(i.e. applicationId) has changed.

Best,Yang
Zhu Zhu  于2020年5月25日周一 上午11:17写道:

Hi M,
Regarding your questions:1. yes. The id is fixed once the job graph is 
generated.2. yes
Regarding yarn mode:1. the job id keeps the same because the job graph will be 
generated once at client side and persist in DFS for reuse2. yes if high 
availability is enabled

Thanks,Zhu Zhu
M Singh  于2020年5月23日周六 上午4:06写道:

Hi Flink Folks:
If I have a Flink Application with 10 restarts, if it fails and restarts, then:
1. Does the job have the same id ?2. Does the automatically restarting 
application, pickup from the last checkpoint ? I am assuming it does but just 
want to confirm.
Also, if it is running on AWS EMR I believe EMR/Yarn is configured to restart 
the job 3 times (after it has exhausted it's restart policy) .  If that is the 
case:1. Does the job get a new id ? I believe it does, but just want to 
confirm.2. Does the Yarn restart honor the last checkpoint ?  I believe, it 
does not, but is there a way to make it restart from the last checkpoint of the 
failed job (after it has exhausted its restart policy) ?
Thanks




  
  

Re: Re: Flink Window with multiple trigger condition

2020-05-28 Thread aj
Hi,

I have implemented the below solution and its working fine but the biggest
problem with this is if no event coming for the user after 30 min then I am
not able to trigger because I am checking
time diff from upcoming events. So when the next event comes than only it
triggers but I want it to trigger just after 30 mins.

So please help me to improve this and how to solve the above problem.



public class DemandSessionFlatMap extends
RichFlatMapFunction,
DemandSessionSummaryTuple> {

private static final Logger LOGGER =
LoggerFactory.getLogger(DemandSessionFlatMap.class);

private transient ValueState>
timeState; // maintain session_id starttime and endtime
private transient MapState
sessionSummary; // map for hex9 and summarytuple

@Override
public void open(Configuration config) {

ValueStateDescriptor> timeDescriptor =
new ValueStateDescriptor<>(
"time_state", // the state name
TypeInformation.of(new TypeHint>() {
}), // type information
Tuple3.of(null, 0L, 0L)); // default value of
the state, if nothing was set
timeState = getRuntimeContext().getState(timeDescriptor);

MapStateDescriptor descriptor =
new MapStateDescriptor("demand_session",
TypeInformation.of(new TypeHint() {
}), TypeInformation.of(new
TypeHint() {
}));
sessionSummary = getRuntimeContext().getMapState(descriptor);

}

@Override
public void flatMap(Tuple2 recordTuple2,
Collector collector) throws Exception {
GenericRecord record = recordTuple2.f1;
String event_name = record.get("event_name").toString();
long event_ts = (Long) record.get("event_ts");
Tuple3 currentTimeState = timeState.value();

if (event_name.equals("search_list_keyless") &&
currentTimeState.f1 == 0) {
currentTimeState.f1 = event_ts;
String demandSessionId = UUID.randomUUID().toString();
currentTimeState.f0 = demandSessionId;
}

long timeDiff = event_ts - currentTimeState.f1;

if (event_name.equals("keyless_start_trip") || timeDiff >= 180) {
Tuple3 finalCurrentTimeState = currentTimeState;
sessionSummary.entries().forEach( tuple ->{
String key = tuple.getKey();
DemandSessionSummaryTuple sessionSummaryTuple =
tuple.getValue();
try {
sessionSummaryTuple.setEndTime(finalCurrentTimeState.f2);
collector.collect(sessionSummaryTuple);
} catch (Exception e) {
e.printStackTrace();
}

});
timeState.clear();
sessionSummary.clear();
currentTimeState = timeState.value();
}

if (event_name.equals("search_list_keyless") &&
currentTimeState.f1 == 0) {
currentTimeState.f1 = event_ts;
String demandSessionId = UUID.randomUUID().toString();
currentTimeState.f0 = demandSessionId;
}
currentTimeState.f2 = event_ts;

if (currentTimeState.f1 > 0) {
String search_hex9 = record.get("search_hex9") != null ?
record.get("search_hex9").toString() : null;
DemandSessionSummaryTuple currentTuple =
sessionSummary.get(search_hex9) != null ?
sessionSummary.get(search_hex9) : new DemandSessionSummaryTuple();

if (sessionSummary.get(search_hex9) == null) {
currentTuple.setSearchHex9(search_hex9);
currentTuple.setUserId(recordTuple2.f0);
currentTuple.setStartTime(currentTimeState.f1);
currentTuple.setDemandSessionId(currentTimeState.f0);
}

if (event_name.equals("search_list_keyless")) {
currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
SearchSummaryCalculation(record, currentTuple);
}
sessionSummary.put(search_hex9, currentTuple);
}
timeState.update(currentTimeState);
}






On Sun, May 24, 2020 at 10:57 PM Yun Gao  wrote:

> Hi,
>
>First sorry that I'm not expert on Window and please correct me if
> I'm wrong, but from my side, it seems the assigner might also be a problem
> in addition to the trigger: currently Flink window assigner should be all
> based on time (processing time or event time), and it might be hard to
> implement an event-driven window assigner that start to assign elements to
> a window after received some elements.
>
>   What comes to me is that a possible alternative method is to use the
> low-level *KeyedProcessFunction* directly:  you may register a timer 30
> mins later when received the "*search*" event and write the time of
> search event into the state. Then for the following events, they will be
> saved to the state 

Re: Flink Elastic Sink

2020-05-28 Thread Yangze Guo
Hi, Anuj.

>From my understanding, you could send IndexRequest to the indexer in
`ElasticsearchSink`. It will create a document under the given index
and type. So, it seems you only need to get the timestamp and concat
the `date` to your index. Am I understanding that correctly? Or do you
want to emit only 1 record per day?

Best,
Yangze Guo

On Fri, May 29, 2020 at 2:43 AM aj  wrote:
>
> Hello All,
>
> I am getting many events in Kafka and I have written a link job that sinks 
> that Avro records from Kafka to S3 in parquet format.
>
> Now, I want to sink these records into elastic search. but the only challenge 
> is that I want to sink record on time indices. Basically, In Elastic, I want 
> to create a per day index with the date as the suffix.
> So in Flink stream job if I create an es sink how will I change the sink to 
> start writing  in a new index when the first event of the day arrives
>
> Thanks,
> Anuj.
>
>
>
>
>


Re: Flink Elastic Sink

2020-05-28 Thread Leonard Xu
Hi,aj

In the implementation of ElasticsearchSink, ElasticsearchSink  won't create 
index and only start a Elastic client for sending requests to
the Elastic cluster. You can simply extract the index(date value in your case) 
from your timestamp field and then put it to an IndexRequest[2],  
ElasticsearchSink will send the IndexRequests to the Elastic cluster, Elastic 
cluster will create corresponding index and flush the records.

BTW, If you’re using Flink SQL you can use dynamic index in Elasticsearch sql 
connector [2], you can simply config 'connector.index' = 
‘myindex_{ts_field|-MM-dd}’ to achieve your goals.

Best,
Leoanrd Xu
[1] 
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java#L119
 

 
[2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector
 





> 在 2020年5月29日,02:43,aj  写道:
> 
> Hello All,
> 
> I am getting many events in Kafka and I have written a link job that sinks 
> that Avro records from Kafka to S3 in parquet format. 
> 
> Now, I want to sink these records into elastic search. but the only challenge 
> is that I want to sink record on time indices. Basically, In Elastic, I want 
> to create a per day index with the date as the suffix. 
> So in Flink stream job if I create an es sink how will I change the sink to 
> start writing  in a new index when the first event of the day arrives
> 
> Thanks,
> Anuj. 
> 
> 
>  
> 
> 
>  


Re: Re: Re: Flink Window with multiple trigger condition

2020-05-28 Thread Yun Gao
Hi,
 I think you could use timer to achieve that. In processFunction you could 
register a timer at specific time (event time or processing time) and get 
callbacked at that point. It could be registered like 
ctx.timerService().registerEventTimeTimer(current.lastModified + 6);
More details on timer could be found in [1] and an example is in [2]. In 
this example, a timer is registered in the last line of the processElement 
method, and the callback is implemented by override the onTimer method.

   [1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#timers
   [2] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html#example



 --Original Mail --
Sender:aj 
Send Date:Fri May 29 02:07:33 2020
Recipients:Yun Gao 
CC:user 
Subject:Re: Re: Flink Window with multiple trigger condition

Hi,

I have implemented the below solution and its working fine but the biggest 
problem with this is if no event coming for the user after 30 min then I am not 
able to trigger because I am checking
time diff from upcoming events. So when the next event comes than only it 
triggers but I want it to trigger just after 30 mins. 

So please help me to improve this and how to solve the above problem.



public class DemandSessionFlatMap extends RichFlatMapFunction, DemandSessionSummaryTuple> {

private static final Logger LOGGER = 
LoggerFactory.getLogger(DemandSessionFlatMap.class);

private transient ValueState> timeState; // 
maintain session_id starttime and endtime 
private transient MapState 
sessionSummary; // map for hex9 and summarytuple

@Override
public void open(Configuration config) {

ValueStateDescriptor> timeDescriptor =
new ValueStateDescriptor<>(
"time_state", // the state name
TypeInformation.of(new TypeHint>() {
}), // type information
Tuple3.of(null, 0L, 0L)); // default value of the 
state, if nothing was set
timeState = getRuntimeContext().getState(timeDescriptor);

MapStateDescriptor descriptor =
new MapStateDescriptor("demand_session",
TypeInformation.of(new TypeHint() {
}), TypeInformation.of(new 
TypeHint() {
}));
sessionSummary = getRuntimeContext().getMapState(descriptor);

}

@Override
public void flatMap(Tuple2 recordTuple2, 
Collector collector) throws Exception {
GenericRecord record = recordTuple2.f1;
String event_name = record.get("event_name").toString();
long event_ts = (Long) record.get("event_ts");
Tuple3 currentTimeState = timeState.value();

if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 
0) {
currentTimeState.f1 = event_ts;
String demandSessionId = UUID.randomUUID().toString();
currentTimeState.f0 = demandSessionId;
}

long timeDiff = event_ts - currentTimeState.f1;

if (event_name.equals("keyless_start_trip") || timeDiff >= 180) {
Tuple3 finalCurrentTimeState = currentTimeState;
sessionSummary.entries().forEach( tuple ->{
String key = tuple.getKey();
DemandSessionSummaryTuple sessionSummaryTuple = 
tuple.getValue();
try {
sessionSummaryTuple.setEndTime(finalCurrentTimeState.f2);
collector.collect(sessionSummaryTuple);
} catch (Exception e) {
e.printStackTrace();
}

});
timeState.clear();
sessionSummary.clear();
currentTimeState = timeState.value();
}

if (event_name.equals("search_list_keyless") && currentTimeState.f1 == 
0) {
currentTimeState.f1 = event_ts;
String demandSessionId = UUID.randomUUID().toString();
currentTimeState.f0 = demandSessionId;
}
currentTimeState.f2 = event_ts;

if (currentTimeState.f1 > 0) {
String search_hex9 = record.get("search_hex9") != null ? 
record.get("search_hex9").toString() : null;
DemandSessionSummaryTuple currentTuple = 
sessionSummary.get(search_hex9) != null ? sessionSummary.get(search_hex9) : new 
DemandSessionSummaryTuple();

if (sessionSummary.get(search_hex9) == null) {
currentTuple.setSearchHex9(search_hex9);
currentTuple.setUserId(recordTuple2.f0);
currentTuple.setStartTime(currentTimeState.f1);
currentTuple.setDemandSessionId(currentTimeState.f0);
}

if (event_name.equals("search_list_keyless")) {
currentTuple.setTotalSearch(currentTuple.getTotalSearch() + 1);
SearchSummaryCalculation(record, 

Re: Apache Flink - Question about application restart

2020-05-28 Thread Zhu Zhu
Restarting of flink master does not change the jobId if one yarn
application.
To be simple, in a yarn application that runs a flink cluster, the job id
of a job does not change once the job is submitted.
You can even submit a flink application multiples times to that cluster (if
it is session mode) but each submission will be treated as a different job
and will have a different job id.

Thanks,
Zhu Zhu

M Singh  于2020年5月29日周五 上午4:59写道:

> Thanks Till - in the case of restart of flink master - I believe the jobid
> will be different.  Thanks
>
> On Thursday, May 28, 2020, 11:33:38 AM EDT, Till Rohrmann <
> trohrm...@apache.org> wrote:
>
>
> Hi,
>
> Yarn won't resubmit the job. In case of a process failure where Yarn
> restarts the Flink Master, the Master will recover the submitted jobs from
> a persistent storage system.
>
> Cheers,
> Till
>
> On Thu, May 28, 2020 at 4:05 PM M Singh  wrote:
>
> Hi Till/Zhu/Yang:  Thanks for your replies.
>
> So just to clarify - the job id remains same if the job restarts have not
> been exhausted.  Does Yarn also resubmit the job in case of failures and if
> so, then is the job id different.
>
> Thanks
> On Wednesday, May 27, 2020, 10:05:40 AM EDT, Till Rohrmann <
> trohrm...@apache.org> wrote:
>
>
> Hi,
>
> if you submit the same job multiple times, then it will get every time a
> different JobID assigned. For Flink, different job submissions are
> considered to be different jobs. Once a job has been submitted, it will
> keep the same JobID which is important in order to retrieve the checkpoints
> associated with this job.
>
> Cheers,
> Till
>
> On Tue, May 26, 2020 at 12:42 PM M Singh  wrote:
>
> Hi Zhu Zhu:
>
> I have another clafication - it looks like if I run the same app multiple
> times - it's job id changes.  So it looks like even though the graph is the
> same the job id is not dependent on the job graph only since with different
> runs of the same app it is not the same.
>
> Please let me know if I've missed anything.
>
> Thanks
>
> On Monday, May 25, 2020, 05:32:39 PM EDT, M Singh 
> wrote:
>
>
> Hi Zhu Zhu:
>
> Just to clarify - from what I understand, EMR also has by default restart
> times (I think it is 3). So if the EMR restarts the job - the job id is the
> same since the job graph is the same.
>
> Thanks for the clarification.
>
> On Monday, May 25, 2020, 04:01:17 AM EDT, Yang Wang 
> wrote:
>
>
> Just share some additional information.
>
> When deploying Flink application on Yarn and it exhausted restart policy,
> then
> the whole application will failed. If you start another instance(Yarn
> application),
> even the high availability is configured, we could not recover from the
> latest
> checkpoint because the clusterId(i.e. applicationId) has changed.
>
>
> Best,
> Yang
>
> Zhu Zhu  于2020年5月25日周一 上午11:17写道:
>
> Hi M,
>
> Regarding your questions:
> 1. yes. The id is fixed once the job graph is generated.
> 2. yes
>
> Regarding yarn mode:
> 1. the job id keeps the same because the job graph will be generated once
> at client side and persist in DFS for reuse
> 2. yes if high availability is enabled
>
> Thanks,
> Zhu Zhu
>
> M Singh  于2020年5月23日周六 上午4:06写道:
>
> Hi Flink Folks:
>
> If I have a Flink Application with 10 restarts, if it fails and restarts,
> then:
>
> 1. Does the job have the same id ?
> 2. Does the automatically restarting application, pickup from the last
> checkpoint ? I am assuming it does but just want to confirm.
>
> Also, if it is running on AWS EMR I believe EMR/Yarn is configured to
> restart the job 3 times (after it has exhausted it's restart policy) .  If
> that is the case:
> 1. Does the job get a new id ? I believe it does, but just want to confirm.
> 2. Does the Yarn restart honor the last checkpoint ?  I believe, it does
> not, but is there a way to make it restart from the last checkpoint of the
> failed job (after it has exhausted its restart policy) ?
>
> Thanks
>
>
>


RE: History Server Not Showing Any Jobs - File Not Found?

2020-05-28 Thread Hailu, Andreas
May I also ask what version of flink-hadoop you're using and the number of jobs 
you're storing the history for? As of writing we have roughly 101,000 
application history files. I'm curious to know if we're encountering some kind 
of resource problem.

// ah

From: Hailu, Andreas [Engineering]
Sent: Thursday, May 28, 2020 12:18 PM
To: 'Chesnay Schepler' ; user@flink.apache.org
Subject: RE: History Server Not Showing Any Jobs - File Not Found?

Okay, I will look further to see if we're mistakenly using a version that's 
pre-2.6.0. However, I don't see flink-shaded-hadoop in my /lib directory for 
flink-1.9.1.

flink-dist_2.11-1.9.1.jar
flink-table-blink_2.11-1.9.1.jar
flink-table_2.11-1.9.1.jar
log4j-1.2.17.jar
slf4j-log4j12-1.7.15.jar

Are the files within /lib.

// ah

From: Chesnay Schepler mailto:ches...@apache.org>>
Sent: Thursday, May 28, 2020 11:00 AM
To: Hailu, Andreas [Engineering] 
mailto:andreas.ha...@ny.email.gs.com>>; 
user@flink.apache.org
Subject: Re: History Server Not Showing Any Jobs - File Not Found?

Looks like it is indeed stuck on downloading the archive.

I searched a bit in the Hadoop JIRA and found several similar instances:
https://issues.apache.org/jira/browse/HDFS-6999
https://issues.apache.org/jira/browse/HDFS-7005
https://issues.apache.org/jira/browse/HDFS-7145

It is supposed to be fixed in 2.6.0 though :/

If hadoop is available from the HADOOP_CLASSPATH and flink-shaded-hadoop in 
/lib then you basically don't know what Hadoop version is actually being used,
which could lead to incompatibilities and dependency clashes.
If flink-shaded-hadoop 2.4/2.5 is on the classpath, maybe that is being used 
and runs into HDFS-7005.

On 28/05/2020 16:27, Hailu, Andreas wrote:
Just created a dump, here's what I see:

"Flink-HistoryServer-ArchiveFetcher-thread-1" #19 daemon prio=5 os_prio=0 
tid=0x7f93a5a2c000 nid=0x5692 runnable [0x7f934a0d3000]
   java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x0005df986960> (a sun.nio.ch.Util$2)
- locked <0x0005df986948> (a java.util.Collections$UnmodifiableSet)
- locked <0x0005df928390> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at 
org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
at 
org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:201)
at 
org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:152)
- locked <0x0005ceade5e0> (a 
org.apache.hadoop.hdfs.RemoteBlockReader2)
at 
org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:781)
at 
org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:837)
- eliminated <0x0005cead3688> (a 
org.apache.hadoop.hdfs.DFSInputStream)
at 
org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:897)
- locked <0x0005cead3688> (a org.apache.hadoop.hdfs.DFSInputStream)
   at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:945)
- locked <0x0005cead3688> (a org.apache.hadoop.hdfs.DFSInputStream)
at 

Re: How to create schema for flexible json data in Flink SQL

2020-05-28 Thread Benchao Li
Hi Guodong,

After an offline discussion with Leonard. I think you get the right meaning
of schema inference.
But there are two problems here:
1. schema of the data is fixed, schema inference can save your effort to
write the schema explicitly.
2. schema of the data is dynamic, in this case the schema inference cannot
help. Because SQL is somewhat static language, which should know all the
data types at compile stage.

Maybe I've misunderstood your question at the very beginning. I thought
your case is #2. If your case is #1, then schema inference is a good
choice.

Guodong Wang  于2020年5月28日周四 下午11:39写道:

> Yes. Setting the value type as raw is one possible approach. And I would
> like to vote for schema inference as well.
>
> Correct me if I am wrong, IMO schema inference means I can provide a
> method in the table source to infer the data schema base on the runtime
> computation. Just like some calcite adaptor does. Right?
> For SQL table registration, I think that requiring the table source to
> provide a static schema might be too strict. Let planner to infer the table
> schema will be more flexible.
>
> Thank you for your suggestions.
>
> Guodong
>
>
> On Thu, May 28, 2020 at 11:11 PM Benchao Li  wrote:
>
>> Hi Guodong,
>>
>> Does the RAW type meet your requirements? For example, you can specify
>> map type, and the value for the map is the raw JsonNode
>> parsed from Jackson.
>> This is not supported yet, however IMO this could be supported.
>>
>> Guodong Wang  于2020年5月28日周四 下午9:43写道:
>>
>>> Benchao,
>>>
>>> Thank you for your quick reply.
>>>
>>> As you mentioned, for current scenario, approach 2 should work for me.
>>> But it is a little bit annoying that I have to modify schema to add new
>>> field types when upstream app changes the json format or adds new fields.
>>> Otherwise, my user can not refer the field in their SQL.
>>>
>>> Per description in the jira, I think after implementing this, all the
>>> json values will be converted as strings.
>>> I am wondering if Flink SQL can/will support the flexible schema in the
>>> future, for example, register the table without defining specific schema
>>> for each field, to let user define a generic map or array for one field.
>>> but the value of map/array can be any object. Then, the type conversion
>>> cost might be saved.
>>>
>>> Guodong
>>>
>>>
>>> On Thu, May 28, 2020 at 7:43 PM Benchao Li  wrote:
>>>
 Hi Guodong,

 I think you almost get the answer,
 1. map type, it's not working for current implementation. For example,
 use map, if the value if non-string json object, then
 `JsonNode.asText()` may not work as you wish.
 2. list all fields you cares. IMO, this can fit your scenario. And you
 can set format.fail-on-missing-field = true, to allow setting non-existed
 fields to be null.

 For 1, I think maybe we can support it in the future, and I've created
 jira[1] to track this.

 [1] https://issues.apache.org/jira/browse/FLINK-18002

 Guodong Wang  于2020年5月28日周四 下午6:32写道:

> Hi !
>
> I want to use Flink SQL to process some json events. It is quite
> challenging to define a schema for the Flink SQL table.
>
> My data source's format is some json like this
> {
> "top_level_key1": "some value",
> "nested_object": {
> "nested_key1": "abc",
> "nested_key2": 123,
> "nested_key3": ["element1", "element2", "element3"]
> }
> }
>
> The big challenges for me to define a schema for the data source are
> 1. the keys in nested_object are flexible, there might be 3 unique
> keys or more unique keys. If I enumerate all the keys in the schema, I
> think my code is fragile, how to handle event which contains more
> nested_keys in nested_object ?
> 2. I know table api support Map type, but I am not sure if I can put
> generic object as the value of the map. Because the values in 
> nested_object
> are of different types, some of them are int, some of them are string or
> array.
>
> So. how to expose this kind of json data as table in Flink SQL without
> enumerating all the nested_keys?
>
> Thanks.
>
> Guodong
>


 --

 Best,
 Benchao Li

>>>
>>
>> --
>>
>> Best,
>> Benchao Li
>>
>

-- 

Best,
Benchao Li


Re: ClusterClientFactory selection

2020-05-28 Thread Yang Wang
You could find more information about deployment target here[1]. As you
mentioned,
it is not defined in the flink-conf.yaml by default.

For the code, it is defined in flink-core/DeploymentOptions.

[1].
https://ci.apache.org/projects/flink/flink-docs-master/ops/cli.html#deployment-targets

Best,
Yang

M Singh  于2020年5月28日周四 下午10:34写道:

> HI Kostas/Yang/Lake:
>
> I am looking at aws emr and did not see the execution.target in the
> flink-conf.yaml file under flink/conf directory.
> Is it defined in another place ?
>
> I also did search in the current flink source code and did find mention of
> it in the md files but not in any property file or the flink-yarn sub
> module.
>
> Please let me know if I am missing anything.
>
> Thanks
>
> On Wednesday, May 27, 2020, 03:51:28 AM EDT, Kostas Kloudas <
> kklou...@gmail.com> wrote:
>
>
> Hi Singh,
>
> The only thing to add to what Yang said is that the "execution.target"
> configuration option (in the config file) is also used for the same
> purpose from the execution environments.
>
> Cheers,
> Kostas
>
> On Wed, May 27, 2020 at 4:49 AM Yang Wang  wrote:
> >
> > Hi M Singh,
> >
> > The Flink CLI picks up the correct ClusterClientFactory via java SPI. You
> > could check YarnClusterClientFactory#isCompatibleWith for how it is
> activated.
> > The cli option / configuration is "-e/--executor" or execution.target
> (e.g. yarn-per-job).
> >
> >
> > Best,
> > Yang
> >
> > M Singh  于2020年5月26日周二 下午6:45写道:
> >>
> >> Hi:
> >>
> >> I wanted to find out which parameter/configuration allows flink cli
> pick up the appropriate cluster client factory (especially in the yarn
> mode).
> >>
> >> Thanks
>


Re: How do I make sure to place operator instances in specific Task Managers?

2020-05-28 Thread Weihua Hu
Hi, Felipe

Flink does not support run tasks on specified TM. 
You can use slotSharingGroup to control Tasks not in same Slot, but cannot 
specified which TM.

Can you please give the reason for specifying TM?


Best
Weihua Hu

> 2020年5月28日 21:37,Felipe Gutierrez  写道:
> 
> For instance, if I have the following DAG with the respect parallelism
> in parenthesis (I hope the dag appears real afterall):
> 
>  source01 -> map01(4) -> flatmap01(4) \
> 
>  |-> keyBy -> reducer(8)
>  source02 -> map02(4) -> flatmap02(4) /
> 
> And I have 4 TMs in 4 machines with 4 cores each. I would like to
> place source01 and map01 and flatmap01 in TM-01. source02 and map02
> and flatmap02 in TM-02. I am using "disableChaning()" in the faltMap
> operator to measure it. And reducer1-to-4 in TM-03 and reducer5-to-8
> in TM-04.
> 
> I am using the methods "setParallelism()" and "slotSharingGroup()" to
> define it but both source01 and source02 are placed in TM-01 and map01
> is split into 2 TMs. The same with map02.
> 
> Thanks,
> Felipe
> --
> -- Felipe Gutierrez
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com



Re: Cannot start native K8s

2020-05-28 Thread Yang Wang
A quick update on this issue.

The root cause of this issue is compatibility of kubernetes-client and java
8u252[1]. And we have
bumped he fabric8 kubernetes-client version from 4.5.2 to 4.9.2 in master
and release-1.11 branch.
Now users could deploy Flink on K8s natively with java 8u252.

If you really could not use the latest Flink version, you could set the
environment "HTTP2_DISABLE=true"
in Flink client, jobmanager, taskmanager side.

[1]. https://github.com/fabric8io/kubernetes-client/issues/2212

Best,
Yang

Yang Wang  于2020年5月11日周一 上午11:51写道:

> Glad to hear that you could deploy the Flink cluster on K8s natively.
> Thanks for
> trying the in-preview feature and give your feedback.
>
>
> Moreover, i want to give a very simple conclusion here. Currently, because
> of the
> compatibility issue of fabric8 kubernetes-client, the native K8s
> integration have the
> following known limitation.
> * For jdk 8u252, the native k8s integration could only work on kubernetes
> v1.16 and
> lower versions.
> * For other jdk versions(e.g. 8u242, jdk11), i am not aware of the same
> issues. The native
> K8s integration works well.
>
>
> Best,
> Yang
>
> Dongwon Kim  于2020年5月9日周六 上午11:46写道:
>
>> Hi Yang,
>>
>> Oops, I forget to copy /etc/kube/admin.conf to $HOME/.kube/config so that
>> the current user account can access to K8s.
>> Now that I copied it, I found that kubernetes-session.sh is working fine.
>> Thanks very much!
>>
>> Best,
>> Dongwon
>>
>> [flink@DAC-E04-W06 ~]$ kubernetes-session.sh
>> 2020-05-09 12:43:49,961 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.rpc.address, DAC-E04-W06
>> 2020-05-09 12:43:49,962 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.rpc.port, 6123
>> 2020-05-09 12:43:49,962 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.heap.size, 1024m
>> 2020-05-09 12:43:49,962 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.memory.process.size, 24g
>> 2020-05-09 12:43:49,963 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: taskmanager.numberOfTaskSlots, 24
>> 2020-05-09 12:43:49,963 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: parallelism.default, 1
>> 2020-05-09 12:43:49,963 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: high-availability, zookeeper
>> 2020-05-09 12:43:49,963 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: high-availability.zookeeper.path.root, /flink
>> 2020-05-09 12:43:49,964 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: high-availability.storageDir, hdfs:///user/flink/ha/
>> 2020-05-09 12:43:49,964 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: high-availability.zookeeper.quorum, DAC-E04-W06:2181
>> 2020-05-09 12:43:49,965 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: jobmanager.execution.failover-strategy, region
>> 2020-05-09 12:43:49,965 INFO
>>  org.apache.flink.configuration.GlobalConfiguration- Loading
>> configuration property: rest.port, 8082
>> 2020-05-09 12:43:51,122 INFO
>>  org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils  - The
>> derived from fraction jvm overhead memory (2.400gb (2576980416 bytes)) is
>> greater than its max value 1024.000mb (1073741824 bytes), max value will be
>> used instead
>> 2020-05-09 12:43:51,123 INFO
>>  org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils  - The
>> derived from fraction network memory (2.291gb (2459539902 bytes)) is
>> greater than its max value 1024.000mb (1073741824 bytes), max value will be
>> used instead
>> 2020-05-09 12:43:51,131 INFO
>>  org.apache.flink.kubernetes.utils.KubernetesUtils - Kubernetes
>> deployment requires a fixed port. Configuration blob.server.port will be
>> set to 6124
>> 2020-05-09 12:43:51,131 INFO
>>  org.apache.flink.kubernetes.utils.KubernetesUtils - Kubernetes
>> deployment requires a fixed port. Configuration taskmanager.rpc.port will
>> be set to 6122
>> 2020-05-09 12:43:51,134 INFO
>>  org.apache.flink.kubernetes.utils.KubernetesUtils - Kubernetes
>> deployment requires a fixed port. Configuration
>> high-availability.jobmanager.port will be set to 6123
>> 2020-05-09 12:43:52,167 INFO
>>  org.apache.flink.kubernetes.KubernetesClusterDescriptor   - Create
>> flink session cluster flink-cluster-4a82d41b-af15-4205-8a44-62351e270242
>> successfully, JobManager Web Interface: 

Re:Re: flink sql 写 hive分区表失败

2020-05-28 Thread Zhou Zach
多谢指点,可以了。
但是换成动态插入,有问题:
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: SQL parse failed. Encountered "p_year" at line 3, column 58.
Was expecting one of:
"DATE" ...
"FALSE" ...
"INTERVAL" ...
"NULL" ...
"TIME" ...
"TIMESTAMP" ...
"TRUE" ...
"UNKNOWN" ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
 ...
"+" ...
"-" ...


at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)




Query:
tableEnv.sqlUpdate(
  """
|
|INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, 
`p_month` = p_month)
|select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and 
`p_month` = 4
|
|""".stripMargin)

















在 2020-05-28 13:39:49,"Leonard Xu"  写道:
>Hi,
>>|select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` 
>> = 5
>
>应该是 select * 会把分区字段一起带出来吧,你字段就不匹配了,select里加上你需要的字段吧
> 
>祝好,
>Leonard Xu
>
>> 在 2020年5月28日,12:57,Zhou Zach  写道:
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main method 
>> caused an error: Field types of query result and registered TableSink 
>> dwdCatalog.dwd.t1_copy do not match.
>> 
>> Query schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT, EXPR$4: 
>> INT NOT NULL, EXPR$5: INT NOT NULL]
>> 
>> Sink schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT]
>> 
>> at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> 
>> at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> 
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> 
>> at 
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> 
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> 
>> at 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> 
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> 
>> at java.security.AccessController.doPrivileged(Native Method)
>> 
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> 
>> at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>> 
>> at 
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> 
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> 
>> 
>> 
>> 
>> hive分区表:
>> CREATE TABLE `dwd.t1`(
>>  `id` bigint, 
>>  `name` string)
>> PARTITIONED BY ( 
>>  `p_year` int, 
>>  `p_month` int)
>> 
>> 
>> CREATE TABLE `dwd.t1_copy`(
>>  `id` bigint, 
>>  `name` string)
>> PARTITIONED BY ( 
>>  `p_year` int, 
>>  `p_month` int)
>> 
>> 
>> Flink sql:
>> tableEnv.sqlUpdate(
>>  """
>>|
>>|INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = 2020, 
>> `p_month` = 5)
>>|select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` 
>> = 5
>>|
>>|""".stripMargin)
>> 
>> 
>> thanks for your help


疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-05-28 Thread wind.fly....@outlook.com
Hi,all:
当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:

tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");

其中a是kafka表,connector属性为:
'connector.properties.group.id' = 'testGroup',
'connector.startup-mode' = 'group-offsets'

   疑问是该应用运行时c、d消费a表,a表group 'testGroup' 
offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?


Re: flink sql 写 hive分区表失败

2020-05-28 Thread Leonard Xu
 
>|INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, 
> `p_month` = p_month)
>|select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and 
> `p_month` = 4 

动态分区不是这样指定的,和hive的语法是一样的,下面两种应该都可以,flink这边文档少了点,可以参考[1][2]

INSERT INTO dwdCatalog.dwd.t1_copy 
 select id,name,`p_year`,`p_month` from dwdCatalog.dwd.t1 where `p_year` = 
2020 and `p_month` = 4 

INSERT INTO dwdCatalog.dwd.t1_copy 
select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 4 

Best,
Leonard Xu
[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/insert.html#examples
 

[2]  
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java#L294
 




> 在 2020年5月28日,13:59,Zhou Zach  写道:
> 
> 多谢指点,可以了。
> 但是换成动态插入,有问题:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: SQL parse failed. Encountered "p_year" at line 3, column 58.
> Was expecting one of:
>"DATE" ...
>"FALSE" ...
>"INTERVAL" ...
>"NULL" ...
>"TIME" ...
>"TIMESTAMP" ...
>"TRUE" ...
>"UNKNOWN" ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
> ...
>"+" ...
>"-" ...
> 
> 
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> 
> 
> 
> 
> Query:
> tableEnv.sqlUpdate(
>  """
>|
>|INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, 
> `p_month` = p_month)
>|select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and 
> `p_month` = 4
>|
>|""".stripMargin)
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-05-28 13:39:49,"Leonard Xu"  写道:
>> Hi,
>>>   |select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` 
>>> = 5
>> 
>> 应该是 select * 会把分区字段一起带出来吧,你字段就不匹配了,select里加上你需要的字段吧
>> 
>> 祝好,
>> Leonard Xu
>> 
>>> 在 2020年5月28日,12:57,Zhou Zach  写道:
>>> 
>>> org.apache.flink.client.program.ProgramInvocationException: The main method 
>>> caused an error: Field types of query result and registered TableSink 
>>> dwdCatalog.dwd.t1_copy do not match.
>>> 
>>> Query schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT, EXPR$4: 
>>> INT NOT NULL, EXPR$5: INT NOT NULL]
>>> 
>>> Sink schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT]
>>> 
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>>> 
>>> at 
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>>> 
>>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>>> 
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>>> 
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>>> 
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>>> 
>>> at 
>>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>>> 
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> 
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>> 
>>> at 
>>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>>> 
>>> at 
>>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>> 
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>>> 
>>> 
>>> 
>>> 
>>> hive分区表:
>>> CREATE TABLE `dwd.t1`(
>>> `id` bigint, 
>>> `name` string)
>>> PARTITIONED BY ( 
>>> `p_year` int, 
>>> `p_month` int)
>>> 
>>> 
>>> CREATE TABLE `dwd.t1_copy`(
>>> `id` bigint, 
>>> `name` string)
>>> PARTITIONED BY ( 
>>> `p_year` int, 
>>> `p_month` 

flink-sql watermark问题

2020-05-28 Thread guaishushu1...@163.com
flink-1.10 sql只支持 timestamp(3) 类型字段生成watermark
但是long这样转换后也可以生成watermark很奇怪?
CREATE TABLE user_log (
response_size int,
rowtime BIGINT,
w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'-MM-dd HH:mm:ss'),
WATERMARK FOR w_ts AS w_ts - INTERVAL '5' SECOND --5秒的延迟
)



guaishushu1...@163.com


recursive.file.enumeration使用问题

2020-05-28 Thread 阿华田
使用recursive.file.enumeration开启递归读取hdfs的目录文件,但是每次数据没有读完就会报如下错误:


java.io.IOException: Error opening the InputSplit 
hdfs://xxx/ds=2020-05-28/hour=15/2020-05-28_15.log.flume2_idcfeature_kafkamq.tmp
 [0,134217728]: File does not exist: /xxx/ds=2020-05-28/hour=15/
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-05-28 Thread Benchao Li
嗯,是的。这个是blink planner特有的优化,可以支持多个sink的DAG图的优化。你也可以看下Flink
UI上的任务DAG图,比较明显的可以看到只有一个Source,后面接了两个Sink。

wind.fly@outlook.com  于2020年5月28日周四 下午5:02写道:

> Hi, Benchao:
> 谢谢回复,经过测试确实是这样,也就是同一个TableEnvironment下,如果有多条insert语句访问同一个表,该表的数据只消费一次?
>
>
>
>
> Best,
> Junbao Zhang
> 
> 发件人: Benchao Li 
> 发送时间: 2020年5月28日 15:59
> 收件人: user-zh 
> 主题: Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
>
>
> 如果你的tEnv用的是TableEnvironment,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。
>
> wind.fly@outlook.com  于2020年5月28日周四
> 下午3:14写道:
>
> > Hi,all:
> > 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
> >
> > tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> > tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
> > tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
> >
> > 其中a是kafka表,connector属性为:
> > 'connector.properties.group.id' = 'testGroup',
> > 'connector.startup-mode' = 'group-offsets'
> >
> >疑问是该应用运行时c、d消费a表,a表group 'testGroup'
> > offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 

Best,
Benchao Li


回复: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-05-28 Thread wind.fly....@outlook.com
Hi, Benchao:
DAG图是指向了同一个问题,但是用了TableEnvironment后,发现所有的时间都差了13个小时,比如调用LOCALTIMESTAMP函数。另外问一个问题,使用TableEnvironment怎么设置checkpoint?




Best,
Junbao Zhang

发件人: Benchao Li 
发送时间: 2020年5月28日 17:05
收件人: user-zh 
主题: Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

嗯,是的。这个是blink planner特有的优化,可以支持多个sink的DAG图的优化。你也可以看下Flink
UI上的任务DAG图,比较明显的可以看到只有一个Source,后面接了两个Sink。

wind.fly@outlook.com  于2020年5月28日周四 下午5:02写道:

> Hi, Benchao:
> 谢谢回复,经过测试确实是这样,也就是同一个TableEnvironment下,如果有多条insert语句访问同一个表,该表的数据只消费一次?
>
>
>
>
> Best,
> Junbao Zhang
> 
> 发件人: Benchao Li 
> 发送时间: 2020年5月28日 15:59
> 收件人: user-zh 
> 主题: Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
>
>
> 如果你的tEnv用的是TableEnvironment,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。
>
> wind.fly@outlook.com  于2020年5月28日周四
> 下午3:14写道:
>
> > Hi,all:
> > 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
> >
> > tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> > tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
> > tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
> >
> > 其中a是kafka表,connector属性为:
> > 'connector.properties.group.id' = 'testGroup',
> > 'connector.startup-mode' = 'group-offsets'
> >
> >疑问是该应用运行时c、d消费a表,a表group 'testGroup'
> > offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?
> >
>
>
> --
>
> Best,
> Benchao Li
>


--

Best,
Benchao Li


Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-05-28 Thread Benchao Li
Hi,

时间差了13个小时这个比较奇怪,我理解不应该出现这个问题的。你是说不用TableEnvironment就不会出现这个问题么?
第二个问题,TableEnvironment目前应该是没法设置checkpoint的,这个应该是只能在StreamExecutionEnvironment来配置吧。

wind.fly@outlook.com  于2020年5月28日周四 下午5:27写道:

> Hi, Benchao:
>
> DAG图是指向了同一个问题,但是用了TableEnvironment后,发现所有的时间都差了13个小时,比如调用LOCALTIMESTAMP函数。另外问一个问题,使用TableEnvironment怎么设置checkpoint?
>
>
>
>
> Best,
> Junbao Zhang
> 
> 发件人: Benchao Li 
> 发送时间: 2020年5月28日 17:05
> 收件人: user-zh 
> 主题: Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
>
> 嗯,是的。这个是blink planner特有的优化,可以支持多个sink的DAG图的优化。你也可以看下Flink
> UI上的任务DAG图,比较明显的可以看到只有一个Source,后面接了两个Sink。
>
> wind.fly@outlook.com  于2020年5月28日周四
> 下午5:02写道:
>
> > Hi, Benchao:
> > 谢谢回复,经过测试确实是这样,也就是同一个TableEnvironment下,如果有多条insert语句访问同一个表,该表的数据只消费一次?
> >
> >
> >
> >
> > Best,
> > Junbao Zhang
> > 
> > 发件人: Benchao Li 
> > 发送时间: 2020年5月28日 15:59
> > 收件人: user-zh 
> > 主题: Re: 疑问:flink sql
> 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> >
> >
> >
> 如果你的tEnv用的是TableEnvironment,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。
> >
> > wind.fly@outlook.com  于2020年5月28日周四
> > 下午3:14写道:
> >
> > > Hi,all:
> > > 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
> > >
> > > tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> > > tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
> > > tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
> > >
> > > 其中a是kafka表,connector属性为:
> > > 'connector.properties.group.id' = 'testGroup',
> > > 'connector.startup-mode' = 'group-offsets'
> > >
> > >疑问是该应用运行时c、d消费a表,a表group 'testGroup'
> > > offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 

Best,
Benchao Li


Re:Re: flink sql 写 hive分区表失败

2020-05-28 Thread Zhou Zach
回复的好详细!而且引出了相关的测试用例
Thanks very much!

















在 2020-05-28 14:23:33,"Leonard Xu"  写道:
> 
>>|INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, 
>> `p_month` = p_month)
>>|select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and 
>> `p_month` = 4 
>
>动态分区不是这样指定的,和hive的语法是一样的,下面两种应该都可以,flink这边文档少了点,可以参考[1][2]
>
>INSERT INTO dwdCatalog.dwd.t1_copy 
> select id,name,`p_year`,`p_month` from dwdCatalog.dwd.t1 where `p_year` = 
> 2020 and `p_month` = 4 
>
>INSERT INTO dwdCatalog.dwd.t1_copy 
>select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` = 4 
>
>Best,
>Leonard Xu
>[1] 
>https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/insert.html#examples
> 
>
>[2]  
>https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorTest.java#L294
> 
>
>
>
>
>> 在 2020年5月28日,13:59,Zhou Zach  写道:
>> 
>> 多谢指点,可以了。
>> 但是换成动态插入,有问题:
>> org.apache.flink.client.program.ProgramInvocationException: The main method 
>> caused an error: SQL parse failed. Encountered "p_year" at line 3, column 58.
>> Was expecting one of:
>>"DATE" ...
>>"FALSE" ...
>>"INTERVAL" ...
>>"NULL" ...
>>"TIME" ...
>>"TIMESTAMP" ...
>>"TRUE" ...
>>"UNKNOWN" ...
>> ...
>> ...
>> ...
>> ...
>> ...
>> ...
>> ...
>> ...
>> ...
>> ...
>>"+" ...
>>"-" ...
>> 
>> 
>> at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>> at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>> at 
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>> at 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>> at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>> at java.security.AccessController.doPrivileged(Native Method)
>> at javax.security.auth.Subject.doAs(Subject.java:422)
>> at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>> at 
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>> 
>> 
>> 
>> 
>> Query:
>> tableEnv.sqlUpdate(
>>  """
>>|
>>|INSERT INTO dwdCatalog.dwd.t1_copy partition (`p_year` = p_year, 
>> `p_month` = p_month)
>>|select id,name from dwdCatalog.dwd.t1 where `p_year` = 2020 and 
>> `p_month` = 4
>>|
>>|""".stripMargin)
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-05-28 13:39:49,"Leonard Xu"  写道:
>>> Hi,
   |select * from dwdCatalog.dwd.t1 where `p_year` = 2020 and `p_month` 
 = 5
>>> 
>>> 应该是 select * 会把分区字段一起带出来吧,你字段就不匹配了,select里加上你需要的字段吧
>>> 
>>> 祝好,
>>> Leonard Xu
>>> 
 在 2020年5月28日,12:57,Zhou Zach  写道:
 
 org.apache.flink.client.program.ProgramInvocationException: The main 
 method caused an error: Field types of query result and registered 
 TableSink dwdCatalog.dwd.t1_copy do not match.
 
 Query schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT, 
 EXPR$4: INT NOT NULL, EXPR$5: INT NOT NULL]
 
 Sink schema: [id: BIGINT, name: STRING, p_year: INT, p_month: INT]
 
 at 
 org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
 
 at 
 org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
 
 at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
 
 at 
 org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
 
 at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
 
 at 
 org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
 
 at 
 org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
 
 at java.security.AccessController.doPrivileged(Native Method)
 
 at javax.security.auth.Subject.doAs(Subject.java:422)
 
 at 
 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
 
 at 
 org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
 
 at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
 
 
 
 
 

Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-05-28 Thread Benchao Li
如果你的tEnv用的是TableEnvironment,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。

wind.fly@outlook.com  于2020年5月28日周四 下午3:14写道:

> Hi,all:
> 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
>
> tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
> tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
>
> 其中a是kafka表,connector属性为:
> 'connector.properties.group.id' = 'testGroup',
> 'connector.startup-mode' = 'group-offsets'
>
>疑问是该应用运行时c、d消费a表,a表group 'testGroup'
> offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?
>


-- 

Best,
Benchao Li


Re: 关于kubernetes native配置的疑问

2020-05-28 Thread Yangze Guo
您好,我来回来一下第一个大问题

1. resources.requests.cpu和resources.limits.cpu都会被设置为kubernetes.jobmanager.cpu
2. external-resource..kubernetes.config-key
是为1.11的新特性扩展资源框架[1]而加入的。请不要使用它来配置cpu和memory。


[1] https://issues.apache.org/jira/browse/FLINK-17044

Best,
Yangze Guo

On Thu, May 28, 2020 at 3:48 PM  wrote:
>
>
>
> hi all
>
> 我在使用native kubernetes的时候,对几个配置项有疑问,想得到解答。
>
> 1. kubernetes.jobmanager.cpu配置项针对一个TM配置多少个cpu资源,是否在resources.requests.cpu 或者 
> resources.limits.cpu也做了配置?
> 在https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#kubernetes上看到对应的一个新的配置external-resource..kubernetes.config-key。
> 对external-resource..kubernetes.config-key和kubernetes.jobmanager.cpu这两个参数的作用有点疑问,如何配置才是对TM的cpu使用加上了限制。
>
>
> 2. 
> 大部分作业使用rocksdb状态后台,会把状态的文件写到固盘,在kubernetes中挂载到hostPath。如果是native,应该如何实现磁盘的挂载呢。
>
> Looking forward to your reply and help.
>
> Best
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制


回复:recursive.file.enumeration使用问题

2020-05-28 Thread 阿华田
说明一下 读取的数据还没有到今天的数据 也就是提示文件不存在的目录xxx/ds=2020-05-28


| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制


在2020年05月28日 16:36,阿华田 写道:
使用recursive.file.enumeration开启递归读取hdfs的目录文件,但是每次数据没有读完就会报如下错误:


java.io.IOException: Error opening the InputSplit 
hdfs://xxx/ds=2020-05-28/hour=15/2020-05-28_15.log.flume2_idcfeature_kafkamq.tmp
 [0,134217728]: File does not exist: /xxx/ds=2020-05-28/hour=15/
| |
王志华
|
|
a15733178...@163.com
|
签名由网易邮箱大师定制



回复: flink-sql watermark问题

2020-05-28 Thread 112039...@qq.com
w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'-MM-dd 
HH:mm:ss'),这个语句产生的就是一个timestamp的数据Flink内置函数:https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/table/functions/systemFunctions.htmlFROM_UNIXTIME(numeric[,
 string]):
Returns a representation of the numeric argument as a value in string format 
(default is '-MM-DD hh:mm:ss'). numeric is an internal timestamp value 
representing seconds since '1970-01-01 00:00:00' UTC, such as produced by the 
UNIX_TIMESTAMP() function. The return value is expressed in the session time 
zone (specified in TableConfig).
E.g., FROM_UNIXTIME(44) returns '1970-01-01 09:00:44' if in UTC time zone, but 
returns '1970-01-01 09:00:44' if in 'Asia/Tokyo' time zone.
Only supported in blink planner.
TO_TIMESTAMP(string1[, string2]):
Converts date time string string1 with format string2 (by default: '-MM-dd 
HH:mm:ss') under the session time zone (specified by TableConfig) to a 
timestamp.
Only supported in blink planner.



112039...@qq.com
 
发件人: guaishushu1...@163.com
发送时间: 2020-05-28 16:22
收件人: user-zh
主题: flink-sql watermark问题
flink-1.10 sql只支持 timestamp(3) 类型字段生成watermark
但是long这样转换后也可以生成watermark很奇怪?
CREATE TABLE user_log (
response_size int,
rowtime BIGINT,
w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'-MM-dd HH:mm:ss'),
WATERMARK FOR w_ts AS w_ts - INTERVAL '5' SECOND --5秒的延迟
)
 
 
 
guaishushu1...@163.com


flink写入hbase 报错如下 还会导致反压 任务变慢

2020-05-28 Thread air23



2020-05-28 16:54:23,867 INFO  
org.apache.hadoop.hbase.client.AsyncRequestFutureImpl - id=2, 
table=GC_SCHEM:mon1, attempt=7/16, failureCount=427ops, last 
exception=org.apache.hadoop.hbase.RegionTooBusyException: 
org.apache.hadoop.hbase.RegionTooBusyException: Above memstore limit, 
regionName=GC_SCHEM:mon1,,1590655288228.89343281db3d32d630482b536933121c., 
server=zongteng75,60020,1590565532547, memstoreSize=575455635, 
blockingMemStoreSize=536870912

  at 
org.apache.hadoop.hbase.regionserver.HRegion.checkResources(HRegion.java:3777)

  at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2935)

  at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2886)

  at 
org.apache.hadoop.hbase.regionserver.RSRpcServices.doBatchOp(RSRpcServices.java:765)

  at 
org.apache.hadoop.hbase.regionserver.RSRpcServices.doNonAtomicRegionMutation(RSRpcServices.java:716)

  at 
org.apache.hadoop.hbase.regionserver.RSRpcServices.multi(RSRpcServices.java:2146)

  at 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:33656)

  at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2191)

  at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)

  at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:183)

  at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:163)

 on zongteng75,60020,1590565532547, tracking started null, retrying 
after=4037ms, operationsToReplay=427

回复: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-05-28 Thread wind.fly....@outlook.com
Hi, Benchao:
谢谢回复,经过测试确实是这样,也就是同一个TableEnvironment下,如果有多条insert语句访问同一个表,该表的数据只消费一次?




Best,
Junbao Zhang

发件人: Benchao Li 
发送时间: 2020年5月28日 15:59
收件人: user-zh 
主题: Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

如果你的tEnv用的是TableEnvironment,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。

wind.fly@outlook.com  于2020年5月28日周四 下午3:14写道:

> Hi,all:
> 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
>
> tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
> tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
>
> 其中a是kafka表,connector属性为:
> 'connector.properties.group.id' = 'testGroup',
> 'connector.startup-mode' = 'group-offsets'
>
>疑问是该应用运行时c、d消费a表,a表group 'testGroup'
> offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?
>


--

Best,
Benchao Li


Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-05-28 Thread zhisheng
Hi,Benchao

http://zhisheng-blog.oss-cn-hangzhou.aliyuncs.com/2020-05-28-093940.jpg

这张图里面说的 TableEnvironment 不支持 UDAF/UDTF,那么如果想要用的话暂时有什么解决方法吗?社区大概什么时候会支持?

Thanks!

Benchao Li  于2020年5月28日周四 下午5:35写道:

> Hi,
>
> 时间差了13个小时这个比较奇怪,我理解不应该出现这个问题的。你是说不用TableEnvironment就不会出现这个问题么?
>
> 第二个问题,TableEnvironment目前应该是没法设置checkpoint的,这个应该是只能在StreamExecutionEnvironment来配置吧。
>
> wind.fly@outlook.com  于2020年5月28日周四
> 下午5:27写道:
>
> > Hi, Benchao:
> >
> >
> DAG图是指向了同一个问题,但是用了TableEnvironment后,发现所有的时间都差了13个小时,比如调用LOCALTIMESTAMP函数。另外问一个问题,使用TableEnvironment怎么设置checkpoint?
> >
> >
> >
> >
> > Best,
> > Junbao Zhang
> > 
> > 发件人: Benchao Li 
> > 发送时间: 2020年5月28日 17:05
> > 收件人: user-zh 
> > 主题: Re: 疑问:flink sql
> 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> >
> > 嗯,是的。这个是blink planner特有的优化,可以支持多个sink的DAG图的优化。你也可以看下Flink
> > UI上的任务DAG图,比较明显的可以看到只有一个Source,后面接了两个Sink。
> >
> > wind.fly@outlook.com  于2020年5月28日周四
> > 下午5:02写道:
> >
> > > Hi, Benchao:
> > > 谢谢回复,经过测试确实是这样,也就是同一个TableEnvironment下,如果有多条insert语句访问同一个表,该表的数据只消费一次?
> > >
> > >
> > >
> > >
> > > Best,
> > > Junbao Zhang
> > > 
> > > 发件人: Benchao Li 
> > > 发送时间: 2020年5月28日 15:59
> > > 收件人: user-zh 
> > > 主题: Re: 疑问:flink sql
> > 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> > >
> > >
> > >
> >
> 如果你的tEnv用的是TableEnvironment,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。
> > >
> > > wind.fly@outlook.com  于2020年5月28日周四
> > > 下午3:14写道:
> > >
> > > > Hi,all:
> > > > 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
> > > >
> > > > tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> > > > tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
> > > > tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
> > > >
> > > > 其中a是kafka表,connector属性为:
> > > > 'connector.properties.group.id' = 'testGroup',
> > > > 'connector.startup-mode' = 'group-offsets'
> > > >
> > > >疑问是该应用运行时c、d消费a表,a表group 'testGroup'
> > > > offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
>
> Best,
> Benchao Li
>


回复:关于kubernetes native配置的疑问

2020-05-28 Thread a511955993
感谢两位大佬的回复,期待native kubernetes更多的特性出现




| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年05月28日 17:39,Yang Wang 写道:
您好,

目前native方式还没有办法来挂载volume,包括hostpath、persistent volume等
这个目前已经有了JIRA ticket[1],但是还没有开始做

如果你感兴趣,可以参与进来一起

[1]. https://issues.apache.org/jira/browse/FLINK-15649

Best,
Yang

Yangze Guo  于2020年5月28日周四 下午4:11写道:

> 您好,我来回来一下第一个大问题
>
> 1.
> resources.requests.cpu和resources.limits.cpu都会被设置为kubernetes.jobmanager.cpu
> 2. external-resource..kubernetes.config-key
> 是为1.11的新特性扩展资源框架[1]而加入的。请不要使用它来配置cpu和memory。
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-17044
>
> Best,
> Yangze Guo
>
> On Thu, May 28, 2020 at 3:48 PM  wrote:
> >
> >
> >
> > hi all
> >
> > 我在使用native kubernetes的时候,对几个配置项有疑问,想得到解答。
> >
> > 1.
> kubernetes.jobmanager.cpu配置项针对一个TM配置多少个cpu资源,是否在resources.requests.cpu 或者
> resources.limits.cpu也做了配置?
> > 在
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#kubernetes
> 上看到对应的一个新的配置external-resource..kubernetes.config-key。
> >
> 对external-resource..kubernetes.config-key和kubernetes.jobmanager.cpu这两个参数的作用有点疑问,如何配置才是对TM的cpu使用加上了限制。
> >
> >
> > 2.
> 大部分作业使用rocksdb状态后台,会把状态的文件写到固盘,在kubernetes中挂载到hostPath。如果是native,应该如何实现磁盘的挂载呢。
> >
> > Looking forward to your reply and help.
> >
> > Best
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
>


Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-05-28 Thread Benchao Li
Hi zhisheng,

这块内容在FLIP-84[1]里面做了大量的重构和优化,而且大部分工作在1.11已经完成了,你们可以看下。

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878

wind.fly@outlook.com  于2020年5月28日周四 下午5:45写道:

> Hi,
>
> StreamTableEnvironment类似于TableEnvironment的Dag优化有版本规划吗?未来TableEnvironment的dag优化和StreamTableEnvironment较丰富api会不会统一?
>
> Best,
> Junbao Zhang
> 
> 发件人: Benchao Li 
> 发送时间: 2020年5月28日 17:35
> 收件人: user-zh 
> 主题: Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
>
> Hi,
>
> 时间差了13个小时这个比较奇怪,我理解不应该出现这个问题的。你是说不用TableEnvironment就不会出现这个问题么?
>
> 第二个问题,TableEnvironment目前应该是没法设置checkpoint的,这个应该是只能在StreamExecutionEnvironment来配置吧。
>
> wind.fly@outlook.com  于2020年5月28日周四
> 下午5:27写道:
>
> > Hi, Benchao:
> >
> >
> DAG图是指向了同一个问题,但是用了TableEnvironment后,发现所有的时间都差了13个小时,比如调用LOCALTIMESTAMP函数。另外问一个问题,使用TableEnvironment怎么设置checkpoint?
> >
> >
> >
> >
> > Best,
> > Junbao Zhang
> > 
> > 发件人: Benchao Li 
> > 发送时间: 2020年5月28日 17:05
> > 收件人: user-zh 
> > 主题: Re: 疑问:flink sql
> 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> >
> > 嗯,是的。这个是blink planner特有的优化,可以支持多个sink的DAG图的优化。你也可以看下Flink
> > UI上的任务DAG图,比较明显的可以看到只有一个Source,后面接了两个Sink。
> >
> > wind.fly@outlook.com  于2020年5月28日周四
> > 下午5:02写道:
> >
> > > Hi, Benchao:
> > > 谢谢回复,经过测试确实是这样,也就是同一个TableEnvironment下,如果有多条insert语句访问同一个表,该表的数据只消费一次?
> > >
> > >
> > >
> > >
> > > Best,
> > > Junbao Zhang
> > > 
> > > 发件人: Benchao Li 
> > > 发送时间: 2020年5月28日 15:59
> > > 收件人: user-zh 
> > > 主题: Re: 疑问:flink sql
> > 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> > >
> > >
> > >
> >
> 如果你的tEnv用的是TableEnvironment,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。
> > >
> > > wind.fly@outlook.com  于2020年5月28日周四
> > > 下午3:14写道:
> > >
> > > > Hi,all:
> > > > 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
> > > >
> > > > tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> > > > tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
> > > > tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
> > > >
> > > > 其中a是kafka表,connector属性为:
> > > > 'connector.properties.group.id' = 'testGroup',
> > > > 'connector.startup-mode' = 'group-offsets'
> > > >
> > > >疑问是该应用运行时c、d消费a表,a表group 'testGroup'
> > > > offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 

Best,
Benchao Li


Re: flink-sql watermark问题

2020-05-28 Thread Benchao Li
Hi,

没太看明白你的问题是什么。目前的确是只支持Timestmap(3)作为事件时间列。
之所以还不支持long作为事件时间列,主要考虑的是时区问题。但是这个社区也在考虑,可以参考[1]

[1] https://issues.apache.org/jira/browse/FLINK-16938

guaishushu1...@163.com  于2020年5月28日周四 下午4:22写道:

> flink-1.10 sql只支持 timestamp(3) 类型字段生成watermark
> 但是long这样转换后也可以生成watermark很奇怪?
> CREATE TABLE user_log (
> response_size int,
> rowtime BIGINT,
> w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'-MM-dd HH:mm:ss'),
> WATERMARK FOR w_ts AS w_ts - INTERVAL '5' SECOND --5秒的延迟
> )
>
>
>
> guaishushu1...@163.com
>


-- 

Best,
Benchao Li


Re: 关于kubernetes native配置的疑问

2020-05-28 Thread Yang Wang
您好,

目前native方式还没有办法来挂载volume,包括hostpath、persistent volume等
这个目前已经有了JIRA ticket[1],但是还没有开始做

如果你感兴趣,可以参与进来一起

[1]. https://issues.apache.org/jira/browse/FLINK-15649

Best,
Yang

Yangze Guo  于2020年5月28日周四 下午4:11写道:

> 您好,我来回来一下第一个大问题
>
> 1.
> resources.requests.cpu和resources.limits.cpu都会被设置为kubernetes.jobmanager.cpu
> 2. external-resource..kubernetes.config-key
> 是为1.11的新特性扩展资源框架[1]而加入的。请不要使用它来配置cpu和memory。
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-17044
>
> Best,
> Yangze Guo
>
> On Thu, May 28, 2020 at 3:48 PM  wrote:
> >
> >
> >
> > hi all
> >
> > 我在使用native kubernetes的时候,对几个配置项有疑问,想得到解答。
> >
> > 1.
> kubernetes.jobmanager.cpu配置项针对一个TM配置多少个cpu资源,是否在resources.requests.cpu 或者
> resources.limits.cpu也做了配置?
> > 在
> https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#kubernetes
> 上看到对应的一个新的配置external-resource..kubernetes.config-key。
> >
> 对external-resource..kubernetes.config-key和kubernetes.jobmanager.cpu这两个参数的作用有点疑问,如何配置才是对TM的cpu使用加上了限制。
> >
> >
> > 2.
> 大部分作业使用rocksdb状态后台,会把状态的文件写到固盘,在kubernetes中挂载到hostPath。如果是native,应该如何实现磁盘的挂载呢。
> >
> > Looking forward to your reply and help.
> >
> > Best
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
>


回复: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-05-28 Thread wind.fly....@outlook.com
Hi,
StreamTableEnvironment类似于TableEnvironment的Dag优化有版本规划吗?未来TableEnvironment的dag优化和StreamTableEnvironment较丰富api会不会统一?

Best,
Junbao Zhang

发件人: Benchao Li 
发送时间: 2020年5月28日 17:35
收件人: user-zh 
主题: Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

Hi,

时间差了13个小时这个比较奇怪,我理解不应该出现这个问题的。你是说不用TableEnvironment就不会出现这个问题么?
第二个问题,TableEnvironment目前应该是没法设置checkpoint的,这个应该是只能在StreamExecutionEnvironment来配置吧。

wind.fly@outlook.com  于2020年5月28日周四 下午5:27写道:

> Hi, Benchao:
>
> DAG图是指向了同一个问题,但是用了TableEnvironment后,发现所有的时间都差了13个小时,比如调用LOCALTIMESTAMP函数。另外问一个问题,使用TableEnvironment怎么设置checkpoint?
>
>
>
>
> Best,
> Junbao Zhang
> 
> 发件人: Benchao Li 
> 发送时间: 2020年5月28日 17:05
> 收件人: user-zh 
> 主题: Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
>
> 嗯,是的。这个是blink planner特有的优化,可以支持多个sink的DAG图的优化。你也可以看下Flink
> UI上的任务DAG图,比较明显的可以看到只有一个Source,后面接了两个Sink。
>
> wind.fly@outlook.com  于2020年5月28日周四
> 下午5:02写道:
>
> > Hi, Benchao:
> > 谢谢回复,经过测试确实是这样,也就是同一个TableEnvironment下,如果有多条insert语句访问同一个表,该表的数据只消费一次?
> >
> >
> >
> >
> > Best,
> > Junbao Zhang
> > 
> > 发件人: Benchao Li 
> > 发送时间: 2020年5月28日 15:59
> > 收件人: user-zh 
> > 主题: Re: 疑问:flink sql
> 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> >
> >
> >
> 如果你的tEnv用的是TableEnvironment,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。
> >
> > wind.fly@outlook.com  于2020年5月28日周四
> > 下午3:14写道:
> >
> > > Hi,all:
> > > 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
> > >
> > > tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> > > tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
> > > tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
> > >
> > > 其中a是kafka表,connector属性为:
> > > 'connector.properties.group.id' = 'testGroup',
> > > 'connector.startup-mode' = 'group-offsets'
> > >
> > >疑问是该应用运行时c、d消费a表,a表group 'testGroup'
> > > offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
>
> Best,
> Benchao Li
>


--

Best,
Benchao Li


关于kubernetes native配置的疑问

2020-05-28 Thread a511955993


hi all

我在使用native kubernetes的时候,对几个配置项有疑问,想得到解答。

1. kubernetes.jobmanager.cpu配置项针对一个TM配置多少个cpu资源,是否在resources.requests.cpu 或者 
resources.limits.cpu也做了配置?
在https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#kubernetes上看到对应的一个新的配置external-resource..kubernetes.config-key。
对external-resource..kubernetes.config-key和kubernetes.jobmanager.cpu这两个参数的作用有点疑问,如何配置才是对TM的cpu使用加上了限制。


2. 
大部分作业使用rocksdb状态后台,会把状态的文件写到固盘,在kubernetes中挂载到hostPath。如果是native,应该如何实现磁盘的挂载呢。

Looking forward to your reply and help.

Best

| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

Re: flink写入hbase 报错如下 还会导致反压 任务变慢

2020-05-28 Thread Leonard Xu
Hi

> org.apache.hadoop.hbase.RegionTooBusyException

这异常信息看起来像hbase集群在大量写入时碰到了问题,不像是flink这边的问题,建议排查下hbase侧,应该有一些参数可以优化。

Best,
Leonard Xu

Re:Re: flink-sql watermark问题

2020-05-28 Thread 程龙



可以先在之前对于long类型的字段转换成Timestmap 类型 再生成watermark














在 2020-05-28 17:00:53,"Benchao Li"  写道:
>Hi,
>
>没太看明白你的问题是什么。目前的确是只支持Timestmap(3)作为事件时间列。
>之所以还不支持long作为事件时间列,主要考虑的是时区问题。但是这个社区也在考虑,可以参考[1]
>
>[1] https://issues.apache.org/jira/browse/FLINK-16938
>
>guaishushu1...@163.com  于2020年5月28日周四 下午4:22写道:
>
>> flink-1.10 sql只支持 timestamp(3) 类型字段生成watermark
>> 但是long这样转换后也可以生成watermark很奇怪?
>> CREATE TABLE user_log (
>> response_size int,
>> rowtime BIGINT,
>> w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'-MM-dd HH:mm:ss'),
>> WATERMARK FOR w_ts AS w_ts - INTERVAL '5' SECOND --5秒的延迟
>> )
>>
>>
>>
>> guaishushu1...@163.com
>>
>
>
>-- 
>
>Best,
>Benchao Li


Re:flink写入hbase 报错如下 还会导致反压 任务变慢

2020-05-28 Thread 程龙






这不是flink的问题,之前遇到过相关问题 hbase region分裂的时候 会出现此类问题  你可以看看日志是否hbase region当时正在分裂,  










在 2020-05-28 16:57:35,"air23"  写道:




2020-05-28 16:54:23,867 INFO  
org.apache.hadoop.hbase.client.AsyncRequestFutureImpl - id=2, 
table=GC_SCHEM:mon1, attempt=7/16, failureCount=427ops, last 
exception=org.apache.hadoop.hbase.RegionTooBusyException: 
org.apache.hadoop.hbase.RegionTooBusyException: Above memstore limit, 
regionName=GC_SCHEM:mon1,,1590655288228.89343281db3d32d630482b536933121c., 
server=zongteng75,60020,1590565532547, memstoreSize=575455635, 
blockingMemStoreSize=536870912

  at 
org.apache.hadoop.hbase.regionserver.HRegion.checkResources(HRegion.java:3777)

  at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2935)

  at org.apache.hadoop.hbase.regionserver.HRegion.batchMutate(HRegion.java:2886)

  at 
org.apache.hadoop.hbase.regionserver.RSRpcServices.doBatchOp(RSRpcServices.java:765)

  at 
org.apache.hadoop.hbase.regionserver.RSRpcServices.doNonAtomicRegionMutation(RSRpcServices.java:716)

  at 
org.apache.hadoop.hbase.regionserver.RSRpcServices.multi(RSRpcServices.java:2146)

  at 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:33656)

  at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2191)

  at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)

  at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:183)

  at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:163)

 on zongteng75,60020,1590565532547, tracking started null, retrying 
after=4037ms, operationsToReplay=427





 

FlinkKafkaConsumer消费kafka中avro格式的消息,直接用AvroDeserializationSchema总是运行失败,自己实现了一个AbstracteserializationSchema运行成功了。

2020-05-28 Thread hyangvv
我使用avro代码生成工具基于一个schema生成了UserView类,又实现了kafka的Serializer接口,代码如下:
import org.apache.kafka.common.serialization.Serializer;

import java.io.IOException;

public class UserViewSerializer implements Serializer {
@Override
public byte[] serialize(String topic, UserView data) {
byte[] array = null;
try {
array = data.toByteBuffer().array();
} catch (IOException e) {
e.printStackTrace();
}
return array;
}
}
构造kafka的生产者,将UserView实例写入kafka队列,代码如下:
KafkaProducer producer = new KafkaProducer<>(props, new 
StringSerializer(), new UserViewSerializer());
在Flink程序中用FlinkKafkaConsumer消费kafka队列中的avro格式的消息,就出现了主题中描述的问题,出现异常的代码如下:
FlinkKafkaConsumer myConsumer = new 
FlinkKafkaConsumer<>("UserView", AvroDeserializationSchema.forGeneric(SCHEMA), 
properties);
导致运行失败的异常信息如下:

Caused by: java.io.EOFException
at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw 
(BinaryDecoder.java:827)
at org.apache.avro.io.BinaryDecoder.doReadBytes (BinaryDecoder.java:349)
at org.apache.avro.io.BinaryDecoder.readString (BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString 
(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString 
(GenericDatumReader.java:422)
at org.apache.avro.generic.GenericDatumReader.readString 
(GenericDatumReader.java:414)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion 
(GenericDatumReader.java:181)
at org.apache.avro.generic.GenericDatumReader.read 
(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.readField 
(GenericDatumReader.java:232)
at org.apache.avro.generic.GenericDatumReader.readRecord 
(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion 
(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read 
(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read 
(GenericDatumReader.java:145)
at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize 
(AvroDeserializationSchema.java:135)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize
 (KafkaDeserializationSchemaWrapper.java:45)
at 
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop 
(KafkaFetcher.java:140)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run 
(FlinkKafkaConsumerBase.java:718)
at org.apache.flink.streaming.api.operators.StreamSource.run 
(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run 
(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run
 (SourceStreamTask.java:200)

希望大神不吝赐教。



Re: flink 如何自定义connector

2020-05-28 Thread Peihui He
hello

   正在尝试中,感谢解答珞

best wishes

111  于2020年5月28日周四 上午10:16写道:

> Hi,
> 想要在sqlgateway里面使用,那么可以看看下面几个条件:
> 1 满足SPI的要求,能让flink自动发现实现类
> 2 配置FLINK_HOME环境变量,自定义的connector放在FLINK_HOME/lib下
> 3 如果与Hive集成,使用hivecatalog,那么先要注册表
> 这样就可以使用了。
> Best,
> Xinghalo


Re:Re: 回复:flink 1.9 1.10 on yarn在cdh上怎么搭建一个客户端

2020-05-28 Thread air23












可以看下你的HADOOP_CONF吗。我的配置的=/etc/hadoop/conf。
开源的Hadoop版本 这个也放了





在 2020-05-28 09:36:10,"wangweigu...@stevegame.cn"  
写道:
>
>确实,你只要配置好CDH的HADOOP_CONF环境变量,同时下载开源的Hadoop版本(和CDH版本相同)放到flink 
> lib下,就可以访问CDH yarn,提交作业!
>
>目前我这边是CDH 5.16.1,Flink 1.10,提交Flink on yarn是没问题,任务运行也没问题,还可以使用Flink on 
> hive!
>
>flink-shaded-hadoop-2-uber-2.6.5-10.0.jar
>
>
>
> 
>发件人: 111
>发送时间: 2020-05-28 09:13
>收件人: user-zh@flink.apache.org
>主题: 回复:flink 1.9 1.10 on yarn在cdh上怎么搭建一个客户端
>Hi,
>一般只要你有yarn环境,在任意一台机器上下载flink安装包,配一下HADOOP_CONF环境变量就可以使用。
>如果是session模式:可以使用Yarn-session.sh启动yarn session集群,然后通过flink run xxx 提交程序。
>如果是per job模式:直接使用flink run即可。
>best,
>Xinghalo


flink1.10 on yarn 问题

2020-05-28 Thread air23
cdh运行flink1.10 on cdh yarn 报错如下。 用1.7.2版本就没有问题
flink-shaded-hadoop-2-uber-2.6.5-10.0.jar 也加了 
hadoop环境变量 export HADOOP_CONF_DIR=/etc/hadoop/conf 
求解答







org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: org.apache.flink.client.program.ProgramInvocationException: 
Job failed (JobID: e358699c1be6be1472078771e1fd027f)

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)

at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)

at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)

at 
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)

at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)

at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)

at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)

Caused by: java.util.concurrent.ExecutionException: 
org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 
e358699c1be6be1472078771e1fd027f)

at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)

at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:83)

at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)

at tt.WordCountStreamingByJava.main(WordCountStreamingByJava.java:36)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)

... 11 more

Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: e358699c1be6be1472078771e1fd027f)

at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)

at 
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)

at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)

at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync$21(RestClusterClient.java:565)

at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)

at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)

at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at 
java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)

at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:291)

at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)

at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)

at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at 
java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)

at 
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)

at 
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)

at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.

at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)

at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)

... 19 more

Caused by: 

Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-05-28 Thread Benchao Li
我的理解是这样的。
TableEnvironment的api主要是跟table相关的概念,这里面并没有流(DataStream)和批(DataSet)的概念。
StreamTableEnvironment是继承了TableEnvironment,但是多了跟DataStream交互的接口,比如把DataStream转成Table,以及反过来转换等。
BatchTableEnvironment也是同理,添加了跟DataSet打交道的一些接口。

LakeShen  于2020年5月29日周五 上午10:16写道:

> Hi Benchao,
>
> TableEnvironment 和 StreamTableEnvironment 具体有什么差异吗,我看StreamTableEnvironment
> 继承了 TableEnvironment。
>
> 这块我不是很了解,有什么文档介绍吗,感谢。
>
> Best,
> LakeShen
>
> Benchao Li  于2020年5月28日周四 下午5:52写道:
>
> > Hi zhisheng,
> >
> > 这块内容在FLIP-84[1]里面做了大量的重构和优化,而且大部分工作在1.11已经完成了,你们可以看下。
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> >
> > wind.fly@outlook.com  于2020年5月28日周四
> > 下午5:45写道:
> >
> > > Hi,
> > >
> > >
> >
> StreamTableEnvironment类似于TableEnvironment的Dag优化有版本规划吗?未来TableEnvironment的dag优化和StreamTableEnvironment较丰富api会不会统一?
> > >
> > > Best,
> > > Junbao Zhang
> > > 
> > > 发件人: Benchao Li 
> > > 发送时间: 2020年5月28日 17:35
> > > 收件人: user-zh 
> > > 主题: Re: 疑问:flink sql
> > 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> > >
> > > Hi,
> > >
> > > 时间差了13个小时这个比较奇怪,我理解不应该出现这个问题的。你是说不用TableEnvironment就不会出现这个问题么?
> > >
> > >
> >
> 第二个问题,TableEnvironment目前应该是没法设置checkpoint的,这个应该是只能在StreamExecutionEnvironment来配置吧。
> > >
> > > wind.fly@outlook.com  于2020年5月28日周四
> > > 下午5:27写道:
> > >
> > > > Hi, Benchao:
> > > >
> > > >
> > >
> >
> DAG图是指向了同一个问题,但是用了TableEnvironment后,发现所有的时间都差了13个小时,比如调用LOCALTIMESTAMP函数。另外问一个问题,使用TableEnvironment怎么设置checkpoint?
> > > >
> > > >
> > > >
> > > >
> > > > Best,
> > > > Junbao Zhang
> > > > 
> > > > 发件人: Benchao Li 
> > > > 发送时间: 2020年5月28日 17:05
> > > > 收件人: user-zh 
> > > > 主题: Re: 疑问:flink sql
> > > 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> > > >
> > > > 嗯,是的。这个是blink planner特有的优化,可以支持多个sink的DAG图的优化。你也可以看下Flink
> > > > UI上的任务DAG图,比较明显的可以看到只有一个Source,后面接了两个Sink。
> > > >
> > > > wind.fly@outlook.com  于2020年5月28日周四
> > > > 下午5:02写道:
> > > >
> > > > > Hi, Benchao:
> > > > >
> > 谢谢回复,经过测试确实是这样,也就是同一个TableEnvironment下,如果有多条insert语句访问同一个表,该表的数据只消费一次?
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > Best,
> > > > > Junbao Zhang
> > > > > 
> > > > > 发件人: Benchao Li 
> > > > > 发送时间: 2020年5月28日 15:59
> > > > > 收件人: user-zh 
> > > > > 主题: Re: 疑问:flink sql
> > > > 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
> 如果你的tEnv用的是TableEnvironment,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。
> > > > >
> > > > > wind.fly@outlook.com  于2020年5月28日周四
> > > > > 下午3:14写道:
> > > > >
> > > > > > Hi,all:
> > > > > > 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
> > > > > >
> > > > > > tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> > > > > > tEnv.sqlUpdate("insert into c select * from b where b.con1 =
> '1'");
> > > > > > tEnv.sqlUpdate("insert into d select * from b where b.con1 =
> '2'");
> > > > > >
> > > > > > 其中a是kafka表,connector属性为:
> > > > > > 'connector.properties.group.id' = 'testGroup',
> > > > > > 'connector.startup-mode' = 'group-offsets'
> > > > > >
> > > > > >疑问是该应用运行时c、d消费a表,a表group 'testGroup'
> > > > > > offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > >
> > > > > Best,
> > > > > Benchao Li
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>


-- 

Best,
Benchao Li


Re: native kubernetes在不同kubernetes版本构建失败问题

2020-05-28 Thread Yang Wang
更新一下这个问题的进展:


目前java 8u252的修复已经merge到了master和release-1.11分支,你可以用这两个
分支自己编译flink binary进行验证

另外,如果确实想在在1.10使用,可以设置环境变量HTTP2_DISABLE=true
Flink client端可以export HTTP2_DISABLE=true
JM/TM可以通过如下Flink参数设置,当然也可以直接在build镜像的时候设置

containerized.master.env.HTTP2_DISABLE=true
containerized.taskmanager.env.HTTP2_DISABLE=true


Best,
Yang

 于2020年5月27日周三 下午2:25写道:

>
> 感谢你的耐心解答~
>
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年05月27日 14:17,Yang Wang 写道:
> 是的
>
> 不过后续我觉得也可以考虑把这个fix backport到1.10分支,在下一次发布的时候修复
>
> 目前master版本已经包含了这个fix,但是需要你自己来build镜像
>
>  于2020年5月27日周三 下午1:38写道:
>
> > hi,yang
> >
> > 使用的镜像是docker hub提供的1.10.1版本镜像。因此解法是
> > 1. 等待1.11版本
> > 2. 自行构建flink 1.10.1版本镜像,降低jdk版本?
> >
> > Looking forward to your reply and help.
> >
> > Best
> >
> >
> >
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > 在2020年05月27日 13:25,Yang Wang 写道:
> > "Broken pipe" 这个是fabric8的kubernetes-client的一个bug
> > 你镜像的jdk版本是java 8u252吧,目前Flink on K8s不能和java 8u252一起工作,
> > 解法是使用8u252以下的jdk版本或者升级到jdk11
> >
> > 在Flink 1.11里面会升级fabric8的kubernetes client依赖到最新版本来解决
> >
> >
> > Best,
> > Yang
> >
> >  于2020年5月27日周三 下午12:52写道:
> >
> > >
> > > 根据文档[1]进行配置,可以看到具体日志信息,启动指令如下:
> > >
> > > /usr/local/flink/flink-1.10.1/bin/kubernetes-session.sh \
> > >  -Dkubernetes.cluster-id=ipcode \
> > >  -Dkubernetes.jobmanager.service-account=flink \
> > >  -Dtaskmanager.memory.process.size=4096m \
> > >  -Dkubernetes.taskmanager.cpu=2 \
> > >  -Dtaskmanager.numberOfTaskSlots=4 \
> > >  -Dkubernetes.namespace=flink-ipcode \
> > >  -Dkubernetes.rest-service.exposed.type=NodePort \
> > >  -Dkubernetes.container-start-command-template="%java%
> > %classpath%
> > > %jvmmem% %jvmopts% %logging% %class% %args%" \
> > >  -Dakka.framesize=104857600b \
> > >  -Dkubernetes.container.image=flink:1.10.1
> > >
> > >
> > > 对应的service、deployment、ConfigMap都已经创建
> > >
> > >
> > > kubectl get svc -n flink-ipcode
> > > NAME  TYPECLUSTER-IP  EXTERNAL-IP   PORT(S)
> > >   AGE
> > > ipcodeClusterIP   x   
> > > 8081/TCP,6123/TCP,6124/TCP   21s
> > > ipcode-rest   NodePort8081:30803/TCP
> > >21s
> > >
> > >
> > > kubernetes版本 v1.17.4失败,V1.15.1成功。
> > >
> > > 1.17.4的jobmanager报错日志如下:
> > >
> > >
> > > 2020-05-27 04:37:44,225 ERROR
> > > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal
> > error
> > > occurred in the cluster entrypoint.
> > >
> >
> org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException:
> > > Could not start the ResourceManager
> akka.tcp://flink@ipcode.flink-ipcode
> > > :6123/user/resourcemanager
> > >  at
> > >
> >
> org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:193)
> > >  at
> > >
> >
> org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStart(RpcEndpoint.java:185)
> > >  at
> > >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:549)
> > >  at
> > >
> >
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)
> > >  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> > >  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> > >  at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> > >  at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> > >  at akka.japi.pf
> .UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> > >  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> > >  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> > >  at akka.actor.Actor.aroundReceive(Actor.scala:517)
> > >  at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> > >  at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> > >  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> > >  at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> > >  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> > >  at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> > >  at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> > >  at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >  at
> > >
> >
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > >  at
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >  at
> > >
> >
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
> > > Operation: [list]  for kind: [Pod]  with name: [null]  in namespace:
> > > [flink-ipcode]  failed.
> > >  at
> > >
> >
> io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
> > >  at
> > >
> >
> 

Re: FlinkKafkaConsumer消费kafka中avro格式的消息,直接用AvroDeserializationSchema总是运行失败,自己实现了一个AbstracteserializationSchema运行成功了。

2020-05-28 Thread Leonard Xu
Hi, 
> 我使用avro代码生成工具基于一个schema生成了UserView类,又实现了kafka的Serializer接口,
你往kafka中写入avro格式的数据时用kafak的Serializer,写入的格式是当成confluent schemat registry的avro格式吧
confluent schemat registry 
在处理avro数据时会多写一个MAGIC_BYTE,一般avro是没有的,消费时用ConfluentRegistryAvroDeserializationSchema
 试试。


Best,
Leonard Xu
[1] 
https://issues.apache.org/jira/browse/FLINK-16048?focusedCommentId=17036670=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17036670
 



> 在 2020年5月29日,01:14,hyangvv  写道:
> 
> 我使用avro代码生成工具基于一个schema生成了UserView类,又实现了kafka的Serializer接口,代码如下:
> import org.apache.kafka.common.serialization.Serializer;
> 
> import java.io.IOException;
> 
> public class UserViewSerializer implements Serializer {
>@Override
>public byte[] serialize(String topic, UserView data) {
>byte[] array = null;
>try {
>array = data.toByteBuffer().array();
>} catch (IOException e) {
>e.printStackTrace();
>}
>return array;
>}
> }
> 构造kafka的生产者,将UserView实例写入kafka队列,代码如下:
> KafkaProducer producer = new KafkaProducer<>(props, new 
> StringSerializer(), new UserViewSerializer());
> 在Flink程序中用FlinkKafkaConsumer消费kafka队列中的avro格式的消息,就出现了主题中描述的问题,出现异常的代码如下:
> FlinkKafkaConsumer myConsumer = new 
> FlinkKafkaConsumer<>("UserView", 
> AvroDeserializationSchema.forGeneric(SCHEMA), properties);
> 导致运行失败的异常信息如下:
> 
> Caused by: java.io.EOFException
>at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw 
> (BinaryDecoder.java:827)
>at org.apache.avro.io.BinaryDecoder.doReadBytes (BinaryDecoder.java:349)
>at org.apache.avro.io.BinaryDecoder.readString (BinaryDecoder.java:263)
>at org.apache.avro.io.ResolvingDecoder.readString 
> (ResolvingDecoder.java:201)
>at org.apache.avro.generic.GenericDatumReader.readString 
> (GenericDatumReader.java:422)
>at org.apache.avro.generic.GenericDatumReader.readString 
> (GenericDatumReader.java:414)
>at org.apache.avro.generic.GenericDatumReader.readWithoutConversion 
> (GenericDatumReader.java:181)
>at org.apache.avro.generic.GenericDatumReader.read 
> (GenericDatumReader.java:153)
>at org.apache.avro.generic.GenericDatumReader.readField 
> (GenericDatumReader.java:232)
>at org.apache.avro.generic.GenericDatumReader.readRecord 
> (GenericDatumReader.java:222)
>at org.apache.avro.generic.GenericDatumReader.readWithoutConversion 
> (GenericDatumReader.java:175)
>at org.apache.avro.generic.GenericDatumReader.read 
> (GenericDatumReader.java:153)
>at org.apache.avro.generic.GenericDatumReader.read 
> (GenericDatumReader.java:145)
>at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize 
> (AvroDeserializationSchema.java:135)
>at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize
>  (KafkaDeserializationSchemaWrapper.java:45)
>at 
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop
>  (KafkaFetcher.java:140)
>at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run 
> (FlinkKafkaConsumerBase.java:718)
>at org.apache.flink.streaming.api.operators.StreamSource.run 
> (StreamSource.java:100)
>at org.apache.flink.streaming.api.operators.StreamSource.run 
> (StreamSource.java:63)
>at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run
>  (SourceStreamTask.java:200)
> 
> 希望大神不吝赐教。
> 



Re: Re: flink-sql watermark问题

2020-05-28 Thread guaishushu1...@163.com
而且 flink不是只支持这种"-MM-dd'T'HH:mm:ss.SSS'Z'" 类型解析为watermark吗,就对这样有点疑惑



guaishushu1...@163.com
 
发件人: guaishushu1...@163.com
发送时间: 2020-05-29 10:20
收件人: Benchao Li
抄送: user-zh
主题: Re: Re: flink-sql watermark问题

就是我是long类型的时间戳,但是用TO_TIMESTAMP转换成'-MM-dd HH:mm:ss' 之后依然可以生成watermark。


guaishushu1...@163.com
 
发件人: Benchao Li
发送时间: 2020-05-28 17:00
收件人: user-zh
主题: Re: flink-sql watermark问题
Hi,
 
没太看明白你的问题是什么。目前的确是只支持Timestmap(3)作为事件时间列。
之所以还不支持long作为事件时间列,主要考虑的是时区问题。但是这个社区也在考虑,可以参考[1]
 
[1] https://issues.apache.org/jira/browse/FLINK-16938
 
guaishushu1...@163.com  于2020年5月28日周四 下午4:22写道:
 
> flink-1.10 sql只支持 timestamp(3) 类型字段生成watermark
> 但是long这样转换后也可以生成watermark很奇怪?
> CREATE TABLE user_log (
> response_size int,
> rowtime BIGINT,
> w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'-MM-dd HH:mm:ss'),
> WATERMARK FOR w_ts AS w_ts - INTERVAL '5' SECOND --5秒的延迟
> )
>
>
>
> guaishushu1...@163.com
>
 
 
-- 
 
Best,
Benchao Li


Re: Re: flink-sql watermark问题

2020-05-28 Thread Benchao Li
Flink支持把Timestamp(3)这种类型声明声明为事件时间列,并且为它生成watermark。
你上面提到的"-MM-dd'T'HH:mm:ss.SSS'Z'",并不是一种数据类型,它只是Timestamp的一种string表达形式,这个主要是在json
format里面把一个字符串解析为timestamp类型的时候需要的一种格式。

所以如果你有其他类型的字段,比如varchar、long、int等等,都可以通过内置函数或者udf将其转成timestamp(3)的类型,再在此基础上做watermark生成。

guaishushu1...@163.com  于2020年5月29日周五 上午10:25写道:

> 而且 flink不是只支持这种"-MM-dd'T'HH:mm:ss.SSS'Z'" 类型解析为watermark吗,就对这样有点疑惑
>
>
>
> guaishushu1...@163.com
>
> 发件人: guaishushu1...@163.com
> 发送时间: 2020-05-29 10:20
> 收件人: Benchao Li
> 抄送: user-zh
> 主题: Re: Re: flink-sql watermark问题
>
> 就是我是long类型的时间戳,但是用TO_TIMESTAMP转换成'-MM-dd HH:mm:ss' 之后依然可以生成watermark。
>
>
> guaishushu1...@163.com
>
> 发件人: Benchao Li
> 发送时间: 2020-05-28 17:00
> 收件人: user-zh
> 主题: Re: flink-sql watermark问题
> Hi,
>
> 没太看明白你的问题是什么。目前的确是只支持Timestmap(3)作为事件时间列。
> 之所以还不支持long作为事件时间列,主要考虑的是时区问题。但是这个社区也在考虑,可以参考[1]
>
> [1] https://issues.apache.org/jira/browse/FLINK-16938
>
> guaishushu1...@163.com  于2020年5月28日周四 下午4:22写道:
>
> > flink-1.10 sql只支持 timestamp(3) 类型字段生成watermark
> > 但是long这样转换后也可以生成watermark很奇怪?
> > CREATE TABLE user_log (
> > response_size int,
> > rowtime BIGINT,
> > w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'-MM-dd HH:mm:ss'),
> > WATERMARK FOR w_ts AS w_ts - INTERVAL '5' SECOND --5秒的延迟
> > )
> >
> >
> >
> > guaishushu1...@163.com
> >
>
>
> --
>
> Best,
> Benchao Li
>


-- 

Best,
Benchao Li


Re: 疑问:flink sql 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交

2020-05-28 Thread LakeShen
Hi Benchao,

TableEnvironment 和 StreamTableEnvironment 具体有什么差异吗,我看StreamTableEnvironment
继承了 TableEnvironment。

这块我不是很了解,有什么文档介绍吗,感谢。

Best,
LakeShen

Benchao Li  于2020年5月28日周四 下午5:52写道:

> Hi zhisheng,
>
> 这块内容在FLIP-84[1]里面做了大量的重构和优化,而且大部分工作在1.11已经完成了,你们可以看下。
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
>
> wind.fly@outlook.com  于2020年5月28日周四
> 下午5:45写道:
>
> > Hi,
> >
> >
> StreamTableEnvironment类似于TableEnvironment的Dag优化有版本规划吗?未来TableEnvironment的dag优化和StreamTableEnvironment较丰富api会不会统一?
> >
> > Best,
> > Junbao Zhang
> > 
> > 发件人: Benchao Li 
> > 发送时间: 2020年5月28日 17:35
> > 收件人: user-zh 
> > 主题: Re: 疑问:flink sql
> 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> >
> > Hi,
> >
> > 时间差了13个小时这个比较奇怪,我理解不应该出现这个问题的。你是说不用TableEnvironment就不会出现这个问题么?
> >
> >
> 第二个问题,TableEnvironment目前应该是没法设置checkpoint的,这个应该是只能在StreamExecutionEnvironment来配置吧。
> >
> > wind.fly@outlook.com  于2020年5月28日周四
> > 下午5:27写道:
> >
> > > Hi, Benchao:
> > >
> > >
> >
> DAG图是指向了同一个问题,但是用了TableEnvironment后,发现所有的时间都差了13个小时,比如调用LOCALTIMESTAMP函数。另外问一个问题,使用TableEnvironment怎么设置checkpoint?
> > >
> > >
> > >
> > >
> > > Best,
> > > Junbao Zhang
> > > 
> > > 发件人: Benchao Li 
> > > 发送时间: 2020年5月28日 17:05
> > > 收件人: user-zh 
> > > 主题: Re: 疑问:flink sql
> > 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> > >
> > > 嗯,是的。这个是blink planner特有的优化,可以支持多个sink的DAG图的优化。你也可以看下Flink
> > > UI上的任务DAG图,比较明显的可以看到只有一个Source,后面接了两个Sink。
> > >
> > > wind.fly@outlook.com  于2020年5月28日周四
> > > 下午5:02写道:
> > >
> > > > Hi, Benchao:
> > > >
> 谢谢回复,经过测试确实是这样,也就是同一个TableEnvironment下,如果有多条insert语句访问同一个表,该表的数据只消费一次?
> > > >
> > > >
> > > >
> > > >
> > > > Best,
> > > > Junbao Zhang
> > > > 
> > > > 发件人: Benchao Li 
> > > > 发送时间: 2020年5月28日 15:59
> > > > 收件人: user-zh 
> > > > 主题: Re: 疑问:flink sql
> > > 当一个env下多个insert语句共享一个view时,是否会影响view中kafka源表的offset提交
> > > >
> > > >
> > > >
> > >
> >
> 如果你的tEnv用的是TableEnvironment,而不是StreamTableEnvironment的话,两个insert会公用前面的source,也就是会只读取a表一次,然后分别给下游c和d用。
> > > >
> > > > wind.fly@outlook.com  于2020年5月28日周四
> > > > 下午3:14写道:
> > > >
> > > > > Hi,all:
> > > > > 当前使用版本flink 1.10.0,使用blink planner,假如有如下代码:
> > > > >
> > > > > tEnv.createTemporaryView("b", tEnv.sqlQuery("select * from a"));
> > > > > tEnv.sqlUpdate("insert into c select * from b where b.con1 = '1'");
> > > > > tEnv.sqlUpdate("insert into d select * from b where b.con1 = '2'");
> > > > >
> > > > > 其中a是kafka表,connector属性为:
> > > > > 'connector.properties.group.id' = 'testGroup',
> > > > > 'connector.startup-mode' = 'group-offsets'
> > > > >
> > > > >疑问是该应用运行时c、d消费a表,a表group 'testGroup'
> > > > > offset的提交会不会受到影响,还是a表中同一条记录会流向2个不同的分支?
> > > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best,
> > > > Benchao Li
> > > >
> > >
> > >
> > > --
> > >
> > > Best,
> > > Benchao Li
> > >
> >
> >
> > --
> >
> > Best,
> > Benchao Li
> >
>
>
> --
>
> Best,
> Benchao Li
>


Re: Re: flink-sql watermark问题

2020-05-28 Thread guaishushu1...@163.com

就是我是long类型的时间戳,但是用TO_TIMESTAMP转换成'-MM-dd HH:mm:ss' 之后依然可以生成watermark。


guaishushu1...@163.com
 
发件人: Benchao Li
发送时间: 2020-05-28 17:00
收件人: user-zh
主题: Re: flink-sql watermark问题
Hi,
 
没太看明白你的问题是什么。目前的确是只支持Timestmap(3)作为事件时间列。
之所以还不支持long作为事件时间列,主要考虑的是时区问题。但是这个社区也在考虑,可以参考[1]
 
[1] https://issues.apache.org/jira/browse/FLINK-16938
 
guaishushu1...@163.com  于2020年5月28日周四 下午4:22写道:
 
> flink-1.10 sql只支持 timestamp(3) 类型字段生成watermark
> 但是long这样转换后也可以生成watermark很奇怪?
> CREATE TABLE user_log (
> response_size int,
> rowtime BIGINT,
> w_ts as TO_TIMESTAMP(FROM_UNIXTIME(rowtime/1000),'-MM-dd HH:mm:ss'),
> WATERMARK FOR w_ts AS w_ts - INTERVAL '5' SECOND --5秒的延迟
> )
>
>
>
> guaishushu1...@163.com
>
 
 
-- 
 
Best,
Benchao Li