Re: Working with bounded Datastreams - Flink 1.11.1

2020-10-27 Thread s_penakalap...@yahoo.com
 Hi All,
Request your inputs please.
Regards,
Sunitha
On Tuesday, October 27, 2020, 01:01:41 PM GMT+5:30, 
s_penakalap...@yahoo.com  wrote:  
 
Hi Team,

I want to use the Flink DataStream API for batch operations on a large amount
of data. I tried to calculate the count and average over the whole DataStream
without using a window function.

The approach I tried:
1> Read data from a table (say, the past 2 days of data) as a DataStream.
2> Apply a key operation on the DataStream.
3> Use a reduce function to find count, sum and average.

I have written the output to a file and also inserted it into a table. Sample
data from the file:

vehicleId=aa, count=1, fuel=10, avgFuel=0.0
vehicleId=dd, count=1, fuel=7, avgFuel=0.0
vehicleId=dd, count=2, fuel=22, avgFuel=11.0
vehicleId=dd, count=3, fuel=42, avgFuel=14.0
vehicleId=ee, count=1, fuel=0, avgFuel=0.0

When there are multiple records with the same vehicleId, only the final record
has the correct values (see vehicleId=dd). Is there any way to get only one
final record for each vehicle, as shown below?

vehicleId=aa, count=1, fuel=10, avgFuel=0.0
vehicleId=dd, count=3, fuel=42, avgFuel=14.0
vehicleId=ee, count=1, fuel=0, avgFuel=0.0

I would also like some help on how to sort the whole DataStream on one
attribute. Say we have X records in one batch job: I would like to sort them
and fetch the record at position X-2 per vehicle.

Regards,
Sunitha.
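
For reference, the difference between the rolling output in the sample and the
requested one-record-per-vehicle output can be modelled in plain Java. This is
only a sketch under assumptions: it does not use Flink at all, the names
(FinalPerKeySketch, Stats, rolling, finalPerKey) are made up for illustration,
and the input fuel values 10, 7, 15, 20, 0 are inferred from the running sums
in the sample. It computes avgFuel from the totals, so a vehicle with a single
record shows its actual average rather than the 0.0 seen in the sample file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model, no Flink dependency. All names here are hypothetical.
class FinalPerKeySketch {

    // Running aggregate for one vehicle, mirroring the fields in the sample output.
    static final class Stats {
        final String vehicleId;
        final long count;
        final long fuel;

        Stats(String vehicleId, long count, long fuel) {
            this.vehicleId = vehicleId;
            this.count = count;
            this.fuel = fuel;
        }

        // Combine two partial aggregates that belong to the same vehicle.
        Stats merge(Stats other) {
            return new Stats(vehicleId, count + other.count, fuel + other.fuel);
        }

        double avgFuel() {
            return count == 0 ? 0.0 : (double) fuel / count;
        }

        @Override
        public String toString() {
            return "vehicleId=" + vehicleId + ", count=" + count
                    + ", fuel=" + fuel + ", avgFuel=" + avgFuel();
        }
    }

    // Rolling behaviour: one updated aggregate is emitted per input record.
    static List<Stats> rolling(List<Stats> input) {
        Map<String, Stats> state = new LinkedHashMap<>();
        List<Stats> out = new ArrayList<>();
        for (Stats s : input) {
            out.add(state.merge(s.vehicleId, s, Stats::merge));
        }
        return out;
    }

    // Requested behaviour: read the whole bounded input, then emit once per key.
    static List<Stats> finalPerKey(List<Stats> input) {
        Map<String, Stats> state = new LinkedHashMap<>();
        for (Stats s : input) {
            state.merge(s.vehicleId, s, Stats::merge);
        }
        return new ArrayList<>(state.values());
    }

    // Input records inferred from the sample (assumption): one record per reading.
    static List<Stats> sampleInput() {
        return Arrays.asList(
                new Stats("aa", 1, 10),
                new Stats("dd", 1, 7),
                new Stats("dd", 1, 15),
                new Stats("dd", 1, 20),
                new Stats("ee", 1, 0));
    }

    public static void main(String[] args) {
        finalPerKey(sampleInput()).forEach(System.out::println);
    }
}
```

The only point of the sketch is that a final per-key result needs an
end-of-input signal. Within Flink 1.11 that would presumably mean a bounded API
such as the DataSet API's grouped reduce, or a window that fires when the input
ends, rather than the rolling keyBy/reduce.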
  

Re: How to understand NOW() in SQL when using Table & SQL API to develop a streaming app?

2020-10-27 Thread Danny Chan
Our behavior also conflicts with the SQL standard; we should mention this in
the documentation as well.
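
To make the distinction discussed in this thread concrete, here is a small
plain-Java sketch, not Flink code. It contrasts evaluating a timestamp once per
query (the traditional batch SQL behaviour Jark describes further down) with
evaluating it for every record (what NOW() does in Flink's streaming SQL
according to this thread). The class name, the method names and the ticking
stand-in clock are all hypothetical.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Illustration only; none of these names come from Flink.
class NowSemanticsSketch {

    // Batch-style: the clock is read once, before any record is processed.
    static List<Instant> evaluateOnce(int records, Supplier<Instant> clock) {
        Instant fixed = clock.get();
        List<Instant> out = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            out.add(fixed);
        }
        return out;
    }

    // Streaming-style: the clock is read again for every record.
    static List<Instant> evaluatePerRecord(int records, Supplier<Instant> clock) {
        List<Instant> out = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            out.add(clock.get());
        }
        return out;
    }

    // Deterministic stand-in clock that advances by one second on every read.
    static Supplier<Instant> tickingClock() {
        long[] reads = {0};
        return () -> Instant.ofEpochSecond(reads[0]++);
    }

    public static void main(String[] args) {
        System.out.println(evaluateOnce(3, tickingClock()));
        System.out.println(evaluatePerRecord(3, tickingClock()));
    }
}
```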

On Tue, Oct 27, 2020 at 10:37 PM, Till Rohrmann wrote:

> Thanks for the clarification. This improvement would be helpful, I believe.
>
> Cheers,
> Till
>
> On Tue, Oct 27, 2020 at 1:19 PM Jark Wu  wrote:
>
>> Hi Till,
>>
>> The documentation mentions that "this function is not deterministic"
>> where the "not deterministic" means the value of this function is not
>> deterministic for every record.
>> However, this is not very clear for users. I think we can improve the
>> documentation.
>>
>> Best,
>> Jark
>>
>> On Tue, 27 Oct 2020 at 15:59, Till Rohrmann  wrote:
>>
>>> Quick question Jark: Is this difference in behaviour documented? I
>>> couldn't find it in the docs.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Tue, Oct 27, 2020 at 7:30 AM Jark Wu  wrote:
>>>
>>>> Hi Longdexin,
>>>>
>>>> In traditional batch SQL, NOW() is executed and determined before the
>>>> job is submitted and will not change for every processed record.
>>>> However, this doesn't make much sense in streaming SQL; therefore, the
>>>> NOW() function in Flink is executed for every record.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Fri, 23 Oct 2020 at 16:30, Till Rohrmann wrote:
>>>>
>>>>> Hi Longdexin,
>>>>>
>>>>> thanks for reaching out to the Flink community. I am pulling in Jark
>>>>> who might be able to help you with this question.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Oct 22, 2020 at 2:56 PM Longdexin <274522...@qq.com> wrote:
>>>>>
>>>>>> From my point of view, the value of the NOW() function in SQL is fixed
>>>>>> at the time the streaming app is launched and will not change with
>>>>>> processing time. However, as a new Flink user, I'm not so sure of that.
>>>>>> By the way, if my aim is to keep the time logic updating all the time,
>>>>>> what should I do?
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Sent from:
>>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>>
>>>>>


Re: ValidationException using DataTypeHint in Scalar Function

2020-10-27 Thread Steve Whelan
Hi Dawid,

I added `*bridgedTo = Map.class*` as you suggested and got a slightly
different exception. I also tried passing a rawSerializer (an
implementation similar to MapSerializer[1] with String type key and value)
but got the same exception as without it. I am using Flink v1.11 for
reference.


@FunctionHint(
  input = {
    @DataTypeHint(value = "RAW", bridgedTo = Map.class, rawSerializer = MyMapSerializer.class),
    @DataTypeHint("STRING")
  },
  output = @DataTypeHint("STRING")
)
public static String eval(final Object map, final String key)
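
The body of eval is not shown in the thread. Purely as an illustration of what
a MAP_VALUE-style lookup might do once the argument arrives as Object, here is
a standalone sketch. The Flink annotations are left out so that it compiles
without Flink on the classpath, and the class name, the null handling and the
toString() conversion are assumptions rather than the actual business logic.
It does not address the ValidationException itself.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the UDF body; not taken from the thread.
class MapValueSketch {

    // Returns the value stored under key, or null if the argument is not a
    // Map, the key is null, or the key is absent.
    static String eval(final Object map, final String key) {
        if (!(map instanceof Map) || key == null) {
            return null;
        }
        Object value = ((Map<?, ?>) map).get(key);
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        Map<String, String> row = new HashMap<>();
        row.put("color", "red");
        System.out.println(eval(row, "color"));
    }
}
```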


*Exception:*

Caused by: org.apache.flink.table.api.ValidationException: Invalid input arguments. Expected signatures are:
MAP_VALUE(RAW('java.util.Map', '...'), STRING)
    at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
    at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
    at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
    ... 49 more
Caused by: org.apache.flink.table.api.ValidationException: Invalid argument type at position 0. Data type RAW('java.util.Map', '...') expected but RAW('java.util.Map', ?) passed.
    at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
    at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
    at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
    ... 50 more


[1]
https://github.com/apache/flink/blob/release-1.11.0/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java

On Tue, Oct 27, 2020 at 6:13 AM Dawid Wysakowicz 
wrote:

> Hey Steve,
>
> You should be able to do that via the bridgedTo parameter. You can
> additionally specify the serializer you want to use via the rawSerializer
> parameter:
>
> @FunctionHint(
>   input = {
>     @DataTypeHint(value = "RAW", bridgedTo = Map.class[, rawSerializer = ... ]),
>     @DataTypeHint("STRING")
>   },
>   output = @DataTypeHint("STRING")
> )
> public static String eval(final Object map, final String key)
>
> Best,
>
> Dawid
> On 26/10/2020 16:10, Steve Whelan wrote:
>
> Hi,
>
> I have a column of type *RAW('java.util.Map', ?)* that I want to pass to
> a scalar function UDF. I'm using DataTypeHints but hitting an exception.
> What would be the proper DataTypeHint and data type param to achieve this?
>
>   @FunctionHint(
>   input = {@DataTypeHint("RAW"), @DataTypeHint("STRING")},
>   output = @DataTypeHint("STRING")
>   )
>   public static String eval(final Object map, final String key) {
> // business logic
>   }
>
>
> *Exception:*
>
> Caused by: org.apache.flink.table.api.ValidationException: Invalid input arguments. Expected signatures are:
> MAP_VALUE(RAW('java.lang.Object', '...'), STRING)
>     at org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
>     at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
>     at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
>     ... 50 more
> Caused by: org.apache.flink.table.api.ValidationException: Invalid argument type at position 0. Data type RAW('java.lang.Object', '...') expected but RAW('java.util.Map', ?) passed.
>     at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
>     at org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
>     at org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
>     ... 51 more
>
>
> Thank you,
>
> Steve
>
>


Re: FLINK 1.11 Graphite Metrics

2020-10-27 Thread Vijayendra Yadav
Thank you for the help. Let's raise a case with AWS.

On Tue, Oct 27, 2020 at 1:58 PM Chesnay Schepler  wrote:

> In the normal Flink distribution these jars were moved from opt/ to
> plugins/ so that they are available by default without having to mess
> around with any jars.
> I don't think anyone was aware that the plugin directory is not populated
> on EMR.
>
> On 10/27/2020 9:53 PM, Vijayendra Yadav wrote:
>
> Perfect after downloading it into the plugin, it is working well. I am
> wondering why these jars have been removed from opt/ folder, earlier I was
> able to copy from opt/ to lib/ folder for 1.10.
> For now I just downloaded from Maven for 1.11 and copied in plugin/.
>
> Regards,
> Vijay
>
> On Tue, Oct 27, 2020 at 11:18 AM Chesnay Schepler 
> wrote:
>
>> So the plugins directory is completely empty?
>>
>> In that case, please download the flink-metrics-graphite jar
>> 
>> and also copy it into the plugins directory.
>>
>> On 10/27/2020 7:04 PM, Vijayendra Yadav wrote:
>>
>> Also, you are right that the plugin did not have anything by default when
>> we created EMR 5.31 with Flink 1.11.
>>
>> In opt/ I see:
>>
>> [hadoop@ip-10-223-71-70 flink]$ pwd
>> /usr/lib/flink
>> [hadoop@ip-10-223-71-70 flink]$ ll opt/
>> total 172860
>> -rw-r--r-- 1 root root 24029243 Sep 19 03:08 flink-azure-fs-hadoop-1.11.0.jar
>> -rw-r--r-- 1 root root   185395 Sep 19 03:11 flink-cep_2.11-1.11.0.jar
>> -rw-r--r-- 1 root root    53473 Sep 19 03:17 flink-cep-scala_2.11-1.11.0.jar
>> -rw-r--r-- 1 root root   640604 Sep 19 03:16 flink-gelly_2.11-1.11.0.jar
>> -rw-r--r-- 1 root root   764049 Sep 19 03:16 flink-gelly-scala_2.11-1.11.0.jar
>> -rw-r--r-- 1 root root   268951 Sep 19 03:17 flink-ml_2.11-1.11.0.jar
>> -rw-r--r-- 1 root root 22316430 Sep 19 03:08 flink-oss-fs-hadoop-1.11.0.jar
>> -rw-r--r-- 1 root root 37228704 Sep 19 03:17 flink-python_2.11-1.11.0.jar
>> -rw-r--r-- 1 root root    22155 Sep 19 03:16 flink-queryable-state-runtime_2.11-1.11.0.jar
>> -rw-r--r-- 1 root root 19985454 Sep 19 03:08 flink-s3-fs-hadoop-1.11.0.jar
>> -rw-r--r-- 1 root root 36173428 Sep 19 03:08 flink-s3-fs-presto-1.11.0.jar
>> -rw-r--r-- 1 root root   194834 Aug 28 16:51 flink-shaded-netty-tcnative-dynamic-2.0.25.Final-11.0.jar
>> -rw-r--r-- 1 root root  8028165 Aug 28 17:04 flink-shaded-zookeeper-3.5.6.jar
>> -rw-r--r-- 1 root root   544183 Sep 19 03:17 flink-sql-client_2.11-1.11.0.jar
>> -rw-r--r-- 1 root root   103766 Sep 19 03:17 flink-state-processor-api_2.11-1.11.0.jar
>> -rw-r--r-- 1 root root 26428976 Sep 19 03:08 flink-swift-fs-hadoop-1.11.0.jar
>> drwxr-xr-x 2 root root      134 Oct 13 18:01 python
>>
>> in lib/ I see:
>>
>> [hadoop@ip-10-223-71-70 flink]$ ll lib/
>> total 190304
>> -rw-r--r-- 1 root root     90784 Sep 19 03:14 flink-csv-1.11.0.jar
>> -rw-r--r-- 1 root root 114256876 Sep 19 03:17 flink-dist_2.11-1.11.0.jar
>> -rw-r--r-- 1 root root     94866 Sep 19 03:14 flink-json-1.11.0.jar
>> -rw-r--r-- 1 root root   7712156 Aug 28 16:51 flink-shaded-zookeeper-3.4.14.jar
>> -rw-r--r-- 1 root root  33325748 Sep 19 03:17 flink-table_2.11-1.11.0.jar
>> -rw-r--r-- 1 root root  37330514 Sep 19 03:17 flink-table-blink_2.11-1.11.0.jar
>> -rw-r--r-- 1 root root     67114 Aug 28 16:50 log4j-1.2-api-2.12.1.jar
>> -rw-r--r-- 1 root root    276771 Aug 28 16:50 log4j-api-2.12.1.jar
>> -rw-r--r-- 1 root root   1674433 Aug 28 16:50 log4j-core-2.12.1.jar
>> -rw-r--r-- 1 root root     23518 Aug 28 16:50 log4j-slf4j-impl-2.12.1.jar
>> Regards,
>> Vijay
>>
>> On Tue, Oct 27, 2020 at 10:57 AM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Chesnay,
>>>
>>> Steps to upgrade are as follows:
>>>
>>> 1) Created EMR 5.31 Cluster which comes with Flink 1.11
>>> 2) Copied flink-s3-fs-hadoop-1.11.0.jar to plugin folder for
>>> application.
>>>
>>>cd  /usr/lib/flink/
>>>
>>> mkdir -p  ./plugins/s3-fs-hadoop
>>>
>>> cp ./opt/flink-s3-fs-hadoop-1.11.0.jar ./plugins/s3-fs-hadoop/
>>>
>>> 3) Recompiled Application with Flink 1.11 dependency.
>>> 4) Updated Graphite plugin class in config
>>>
>>> That is all I did.
>>>
>>> Regards,
>>> Vijay
>>>
>>>
>>> On Tue, Oct 27, 2020 at 10:00 AM Chesnay Schepler 
>>> wrote:
>>>
>>>> How exactly did you do the upgrade? Did you copy some files from 1.11
>>>> into an existing 1.10 distribution?
>>>>
>>>> The configuration is correct, but it appears as if the entire plugins
>>>> directory is either a) empty or b) not shipped.
>>>>
>>>> On 10/27/2020 5:22 PM, Vijayendra Yadav wrote:
>>>>
>>>> Hi Robert and Chesnay,
>>>>
>>>> Only  thing changed is I upgraded from Flink 1.10 to 1.11 and to
>>>> support that updated conf yaml with factory class.
>>>>
>>>> Here is attached Full Log with classpath etc.   (log.txt)
>>>>
>>>> Regards,
>>>> Vijay
>>>>
>>>>
>>>>
>>>> On Tue, Oct 27, 2020 at 9:31 AM Chesnay Schepler
>>>> wrote:
>>>>
>>>>> Are you writing a test? (otherwise the ReporterSetupTest reporters
>>>>>

Re: FLINK 1.11 Graphite Metrics

2020-10-27 Thread Chesnay Schepler
In the normal Flink distribution these jars were moved from opt/ to 
plugins/ so that they are available by default without having to mess 
around with any jars.
I don't think anyone was aware that the plugin directory is not 
populated on EMR.


On 10/27/2020 9:53 PM, Vijayendra Yadav wrote:
Perfect after downloading it into the plugin, it is working well. I am 
wondering why these jars have been removed from opt/ folder, earlier I 
was able to copy from opt/ to lib/ folder for 1.10.

For now I just downloaded from Maven for 1.11 and copied in plugin/.

Regards,
Vijay

On Tue, Oct 27, 2020 at 11:18 AM Chesnay Schepler wrote:


So the plugins directory is completely empty?

In that case, please download the flink-metrics-graphite jar


and also copy it into the plugins directory.

On 10/27/2020 7:04 PM, Vijayendra Yadav wrote:

Also, you are right that the plugin did not have anything by
default when we created EMR 5.31 with Flink 1.11.

In opt/ I see:

[hadoop@ip-10-223-71-70 flink]$ pwd
/usr/lib/flink
[hadoop@ip-10-223-71-70 flink]$ ll opt/
total 172860
-rw-r--r-- 1 root root 24029243 Sep 19 03:08
flink-azure-fs-hadoop-1.11.0.jar
-rw-r--r-- 1 root root   185395 Sep 19 03:11
flink-cep_2.11-1.11.0.jar
-rw-r--r-- 1 root root    53473 Sep 19 03:17
flink-cep-scala_2.11-1.11.0.jar
-rw-r--r-- 1 root root   640604 Sep 19 03:16
flink-gelly_2.11-1.11.0.jar
-rw-r--r-- 1 root root   764049 Sep 19 03:16
flink-gelly-scala_2.11-1.11.0.jar
-rw-r--r-- 1 root root   268951 Sep 19 03:17 flink-ml_2.11-1.11.0.jar
-rw-r--r-- 1 root root 22316430 Sep 19 03:08
flink-oss-fs-hadoop-1.11.0.jar
-rw-r--r-- 1 root root 37228704 Sep 19 03:17
flink-python_2.11-1.11.0.jar
-rw-r--r-- 1 root root    22155 Sep 19 03:16
flink-queryable-state-runtime_2.11-1.11.0.jar
-rw-r--r-- 1 root root 19985454 Sep 19 03:08
flink-s3-fs-hadoop-1.11.0.jar
-rw-r--r-- 1 root root 36173428 Sep 19 03:08
flink-s3-fs-presto-1.11.0.jar
-rw-r--r-- 1 root root   194834 Aug 28 16:51
flink-shaded-netty-tcnative-dynamic-2.0.25.Final-11.0.jar
-rw-r--r-- 1 root root  8028165 Aug 28 17:04
flink-shaded-zookeeper-3.5.6.jar
-rw-r--r-- 1 root root   544183 Sep 19 03:17
flink-sql-client_2.11-1.11.0.jar
-rw-r--r-- 1 root root   103766 Sep 19 03:17
flink-state-processor-api_2.11-1.11.0.jar
-rw-r--r-- 1 root root 26428976 Sep 19 03:08
flink-swift-fs-hadoop-1.11.0.jar
drwxr-xr-x 2 root root      134 Oct 13 18:01 python

in lib/ I see:

[hadoop@ip-10-223-71-70 flink]$ ll lib/
total 190304
-rw-r--r-- 1 root root     90784 Sep 19 03:14 flink-csv-1.11.0.jar
-rw-r--r-- 1 root root 114256876 Sep 19 03:17
flink-dist_2.11-1.11.0.jar
-rw-r--r-- 1 root root     94866 Sep 19 03:14 flink-json-1.11.0.jar
-rw-r--r-- 1 root root   7712156 Aug 28 16:51
flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root  33325748 Sep 19 03:17
flink-table_2.11-1.11.0.jar
-rw-r--r-- 1 root root  37330514 Sep 19 03:17
flink-table-blink_2.11-1.11.0.jar
-rw-r--r-- 1 root root     67114 Aug 28 16:50
log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 root root    276771 Aug 28 16:50 log4j-api-2.12.1.jar
-rw-r--r-- 1 root root   1674433 Aug 28 16:50 log4j-core-2.12.1.jar
-rw-r--r-- 1 root root     23518 Aug 28 16:50
log4j-slf4j-impl-2.12.1.jar

Regards,
Vijay

On Tue, Oct 27, 2020 at 10:57 AM Vijayendra Yadav
<contact@gmail.com> wrote:

Hi Chesnay,

Steps to upgrade are as follows:

1) Created EMR 5.31 Cluster which comes with Flink 1.11
2) Copied flink-s3-fs-hadoop-1.11.0.jar to plugin folder for
application.

cd /usr/lib/flink/

mkdir -p ./plugins/s3-fs-hadoop

cp ./opt/flink-s3-fs-hadoop-1.11.0.jar ./plugins/s3-fs-hadoop/

3) Recompiled Application with Flink 1.11 dependency.
4) Updated Graphite plugin class in config

That is all I did.

Regards,
Vijay


On Tue, Oct 27, 2020 at 10:00 AM Chesnay Schepler
<ches...@apache.org> wrote:

How exactly did you do the upgrade? Did you copy some
files from 1.11 into an existing 1.10 distribution?

The configuration is correct, but it appears as if the
entire plugins directory is either a) empty or b) not
shipped.

On 10/27/2020 5:22 PM, Vijayendra Yadav wrote:

Hi Robert and Chesnay,

Only  thing changed is I upgraded from Flink 1.10 to
1.11 and to support that updated conf yaml with factory
class.

Here is attached Full Log with classpath etc.   (log.txt)

Regards,
Vijay


On Tue, Oct 27, 2020 at 9:31 AM 

Re: FLINK 1.11 Graphite Metrics

2020-10-27 Thread Vijayendra Yadav
Perfect, after downloading it into the plugins directory it is working well.
I am wondering why these jars have been removed from the opt/ folder; earlier,
for 1.10, I was able to copy them from opt/ to lib/.
For now I just downloaded the jar for 1.11 from Maven and copied it into
plugins/.

Regards,
Vijay

On Tue, Oct 27, 2020 at 11:18 AM Chesnay Schepler 
wrote:

> So the plugins directory is completely empty?
>
> In that case, please download the flink-metrics-graphite jar
> 
> and also copy it into the plugins directory.
>
> On 10/27/2020 7:04 PM, Vijayendra Yadav wrote:
>
> Also, you are right that the plugin did not have anything by default when
> we created EMR 5.31 with Flink 1.11.
>
> In opt/ I see:
>
> [hadoop@ip-10-223-71-70 flink]$ pwd
> /usr/lib/flink
> [hadoop@ip-10-223-71-70 flink]$ ll opt/
> total 172860
> -rw-r--r-- 1 root root 24029243 Sep 19 03:08 flink-azure-fs-hadoop-1.11.0.jar
> -rw-r--r-- 1 root root   185395 Sep 19 03:11 flink-cep_2.11-1.11.0.jar
> -rw-r--r-- 1 root root    53473 Sep 19 03:17 flink-cep-scala_2.11-1.11.0.jar
> -rw-r--r-- 1 root root   640604 Sep 19 03:16 flink-gelly_2.11-1.11.0.jar
> -rw-r--r-- 1 root root   764049 Sep 19 03:16 flink-gelly-scala_2.11-1.11.0.jar
> -rw-r--r-- 1 root root   268951 Sep 19 03:17 flink-ml_2.11-1.11.0.jar
> -rw-r--r-- 1 root root 22316430 Sep 19 03:08 flink-oss-fs-hadoop-1.11.0.jar
> -rw-r--r-- 1 root root 37228704 Sep 19 03:17 flink-python_2.11-1.11.0.jar
> -rw-r--r-- 1 root root    22155 Sep 19 03:16 flink-queryable-state-runtime_2.11-1.11.0.jar
> -rw-r--r-- 1 root root 19985454 Sep 19 03:08 flink-s3-fs-hadoop-1.11.0.jar
> -rw-r--r-- 1 root root 36173428 Sep 19 03:08 flink-s3-fs-presto-1.11.0.jar
> -rw-r--r-- 1 root root   194834 Aug 28 16:51 flink-shaded-netty-tcnative-dynamic-2.0.25.Final-11.0.jar
> -rw-r--r-- 1 root root  8028165 Aug 28 17:04 flink-shaded-zookeeper-3.5.6.jar
> -rw-r--r-- 1 root root   544183 Sep 19 03:17 flink-sql-client_2.11-1.11.0.jar
> -rw-r--r-- 1 root root   103766 Sep 19 03:17 flink-state-processor-api_2.11-1.11.0.jar
> -rw-r--r-- 1 root root 26428976 Sep 19 03:08 flink-swift-fs-hadoop-1.11.0.jar
> drwxr-xr-x 2 root root      134 Oct 13 18:01 python
>
> in lib/ I see:
>
> [hadoop@ip-10-223-71-70 flink]$ ll lib/
> total 190304
> -rw-r--r-- 1 root root     90784 Sep 19 03:14 flink-csv-1.11.0.jar
> -rw-r--r-- 1 root root 114256876 Sep 19 03:17 flink-dist_2.11-1.11.0.jar
> -rw-r--r-- 1 root root     94866 Sep 19 03:14 flink-json-1.11.0.jar
> -rw-r--r-- 1 root root   7712156 Aug 28 16:51 flink-shaded-zookeeper-3.4.14.jar
> -rw-r--r-- 1 root root  33325748 Sep 19 03:17 flink-table_2.11-1.11.0.jar
> -rw-r--r-- 1 root root  37330514 Sep 19 03:17 flink-table-blink_2.11-1.11.0.jar
> -rw-r--r-- 1 root root     67114 Aug 28 16:50 log4j-1.2-api-2.12.1.jar
> -rw-r--r-- 1 root root    276771 Aug 28 16:50 log4j-api-2.12.1.jar
> -rw-r--r-- 1 root root   1674433 Aug 28 16:50 log4j-core-2.12.1.jar
> -rw-r--r-- 1 root root     23518 Aug 28 16:50 log4j-slf4j-impl-2.12.1.jar
>
> Regards,
> Vijay
>
> On Tue, Oct 27, 2020 at 10:57 AM Vijayendra Yadav 
> wrote:
>
>> Hi Chesnay,
>>
>> Steps to upgrade are as follows:
>>
>> 1) Created EMR 5.31 Cluster which comes with Flink 1.11
>> 2) Copied flink-s3-fs-hadoop-1.11.0.jar to plugin folder for application.
>>
>>cd  /usr/lib/flink/
>>
>> mkdir -p  ./plugins/s3-fs-hadoop
>>
>> cp ./opt/flink-s3-fs-hadoop-1.11.0.jar ./plugins/s3-fs-hadoop/
>>
>> 3) Recompiled Application with Flink 1.11 dependency.
>> 4) Updated Graphite plugin class in config
>>
>> That is all I did.
>>
>> Regards,
>> Vijay
>>
>>
>> On Tue, Oct 27, 2020 at 10:00 AM Chesnay Schepler 
>> wrote:
>>
>>> How exactly did you do the upgrade? Did you copy some files from 1.11
>>> into an existing 1.10 distribution?
>>>
>>> The configuration is correct, but it appears as if the entire plugins
>>> directory is either a) empty or b) not shipped.
>>>
>>> On 10/27/2020 5:22 PM, Vijayendra Yadav wrote:
>>>
>>> Hi Robert and Chesnay,
>>>
>>> Only  thing changed is I upgraded from Flink 1.10 to 1.11 and to support
>>> that updated conf yaml with factory class.
>>>
>>> Here is attached Full Log with classpath etc.   (log.txt)
>>>
>>> Regards,
>>> Vijay
>>>
>>>
>>>
>>> On Tue, Oct 27, 2020 at 9:31 AM Chesnay Schepler 
>>> wrote:
>>>
>>>> Are you writing a test? (otherwise the ReporterSetupTest reporters
>>>> wouldn't be around)
>>>> Do you have a dependency on the graphite reporter?
>>>>
>>>> On 10/27/2020 8:27 AM, Robert Metzger wrote:
>>>>
>>>> Hi Vijayendra,
>>>> can you post or upload the entire logs, so that we can see the
>>>> Classpath logged on startup, as well as the effective configuration
>>>> parameters?
>>>>
>>>> On Tue, Oct 27, 2020 at 12:49 AM Vijayendra Yadav <
>>>> contact@gmail.com> wrote:
>>>>
>>>>> Hi Chesnay,
>>>>>
>>>>> Another log message:
>>>>>
>>>>> 2020-10-26 23:33:08,516 WARN
>>>>>

Re: how to enable metrics in Flink 1.11

2020-10-27 Thread Diwakar Jha
Hi Robert,
Could you please correct me? I'm not able to stop the app, even though I have
already stopped the Flink job.

sh-4.2$ yarn app -stop application_1603649952937_0002
2020-10-27 20:04:25,543 INFO client.RMProxy: Connecting to ResourceManager at ip-10-0-55-50.ec2.internal/10.0.55.50:8032
2020-10-27 20:04:25,717 INFO client.AHSProxy: Connecting to Application History server at ip-10-0-55-50.ec2.internal/10.0.55.50:10200
Exception in thread "main" java.lang.IllegalArgumentException: App admin client class name not specified for type Apache Flink
    at org.apache.hadoop.yarn.client.api.AppAdminClient.createAppAdminClient(AppAdminClient.java:76)
    at org.apache.hadoop.yarn.client.cli.ApplicationCLI.run(ApplicationCLI.java:597)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:76)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:90)
    at org.apache.hadoop.yarn.client.cli.ApplicationCLI.main(ApplicationCLI.java:126)
sh-4.2$

On Tue, Oct 27, 2020 at 9:34 AM Robert Metzger  wrote:

> Hi,
> are you intentionally not posting this response to the mailing list?
>
> As you can see from the yarn logs, log aggregation only works for finished
> applications ("End of LogType:prelaunch.out.This log file belongs to a
> running container (container_1603649952937_0002_01_02) and so may not
> be complete.")
>
> Please stop the app, then provide the logs.
>
>
> On Tue, Oct 27, 2020 at 5:11 PM Diwakar Jha 
> wrote:
>
>> Hi Robert,
>>
>> Yes, I'm using Flink on EMR with YARN. Please find attached the output of
>> yarn logs -applicationId. I also attached the hadoop-yarn-nodemanager logs.
>> Also, I followed this link below which has the same problem :
>> http://mail-archives.apache.org/mod_mbox/flink-user/202009.mbox/%3CCAGDv3o5WyJTrXs9Pg+Vy-b+LwgEE26iN54iqE0=f5t+m8vw...@mail.gmail.com%3E
>>
>> https://www.talkend.net/post/75078.html
>> Based on this I changed the log4j.properties.
>> Let me know what you think. Please also let me know if you need some
>> specific logs.  Appreciate your help.
>>
>> Best,
>> Diwakar
>>
>> On Tue, Oct 27, 2020 at 12:26 AM Robert Metzger 
>> wrote:
>>
>>> Hey Diwakar,
>>>
>>> how are you deploying Flink on EMR? Are you using YARN?
>>> If so, you could also use log aggregation to see all the logs at once
>>> (from both JobManager and TaskManagers). (yarn logs -applicationId
>>> )
>>>
>>> Could you post (or upload somewhere) all logs you have of one run? It is
>>> much easier for us to debug something if we have the full logs (the logs
>>> show for example the classpath that you are using, we would see how you are
>>> deploying Flink, etc.)
>>>
>>> From the information available, my guess is that you have modified your
>>> deployment in some way (use of a custom logging version, custom deployment
>>> method, version mixup with jars from both Flink 1.8 and 1.11, ...).
>>>
>>> Best,
>>> Robert
>>>
>>>
>>> On Tue, Oct 27, 2020 at 12:41 AM Diwakar Jha 
>>> wrote:
>>>
>>>> This is what I see on the WebUI.
>>>>
>>>> 23:19:24.263 [flink-akka.actor.default-dispatcher-1865] ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler - Failed to transfer file from TaskExecutor container_1603649952937_0002_01_04.
>>>> java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: The file LOG does not exist on the TaskExecutor.
>>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_252]
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
>>>>     at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
>>>> Caused by: org.apache.flink.util.FlinkException: The file LOG does not exist on the TaskExecutor.
>>>>     ... 5 more
>>>> 23:19:24.275 [flink-akka.actor.default-dispatcher-1865] ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler - Unhandled exception.
>>>> org.apache.flink.util.FlinkException: The file LOG does not exist on the TaskExecutor.
>>>>     at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
>>>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run
 

Re: FLINK 1.11 Graphite Metrics

2020-10-27 Thread Chesnay Schepler

So the plugins directory is completely empty?

In that case, please download the flink-metrics-graphite jar 
 
and also copy it into the plugins directory.
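
A quick way to confirm what a given installation actually has under plugins/
is to list the jars per plugin subfolder. The following is a hedged sketch in
plain Java: the class name PluginDirCheck and the default path
/usr/lib/flink/plugins (taken from the EMR paths quoted in this thread) are
assumptions, and it only inspects the file system; it does not load anything.
It assumes the one-subfolder-per-plugin layout used in the s3-fs-hadoop steps
quoted below.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical diagnostic helper; not part of Flink.
class PluginDirCheck {

    // Maps each subfolder of pluginsDir to the sorted jar file names directly inside it.
    // A missing plugins directory is reported the same way as an empty one.
    static Map<String, List<String>> listPluginJars(Path pluginsDir) throws IOException {
        Map<String, List<String>> result = new TreeMap<>();
        if (!Files.isDirectory(pluginsDir)) {
            return result;
        }
        List<Path> entries;
        try (Stream<Path> stream = Files.list(pluginsDir)) {
            entries = stream.collect(Collectors.toList());
        }
        for (Path entry : entries) {
            if (!Files.isDirectory(entry)) {
                continue; // jars placed directly in plugins/ are ignored in this sketch
            }
            List<String> jars = new ArrayList<>();
            try (Stream<Path> files = Files.list(entry)) {
                files.map(p -> p.getFileName().toString())
                        .filter(name -> name.endsWith(".jar"))
                        .sorted()
                        .forEach(jars::add);
            }
            result.put(entry.getFileName().toString(), jars);
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Paths.get(args.length > 0 ? args[0] : "/usr/lib/flink/plugins");
        System.out.println(listPluginJars(dir));
    }
}
```

An empty map printed for the plugins path would match the "completely empty"
case asked about above.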


On 10/27/2020 7:04 PM, Vijayendra Yadav wrote:
Also, you are right that the plugin did not have anything by default 
when we created EMR 5.31 with Flink 1.11.


In opt/ I see:

[hadoop@ip-10-223-71-70 flink]$ pwd
/usr/lib/flink
[hadoop@ip-10-223-71-70 flink]$ ll opt/
total 172860
-rw-r--r-- 1 root root 24029243 Sep 19 03:08 flink-azure-fs-hadoop-1.11.0.jar
-rw-r--r-- 1 root root   185395 Sep 19 03:11 flink-cep_2.11-1.11.0.jar
-rw-r--r-- 1 root root    53473 Sep 19 03:17 flink-cep-scala_2.11-1.11.0.jar
-rw-r--r-- 1 root root   640604 Sep 19 03:16 flink-gelly_2.11-1.11.0.jar
-rw-r--r-- 1 root root   764049 Sep 19 03:16 flink-gelly-scala_2.11-1.11.0.jar
-rw-r--r-- 1 root root   268951 Sep 19 03:17 flink-ml_2.11-1.11.0.jar
-rw-r--r-- 1 root root 22316430 Sep 19 03:08 flink-oss-fs-hadoop-1.11.0.jar
-rw-r--r-- 1 root root 37228704 Sep 19 03:17 flink-python_2.11-1.11.0.jar
-rw-r--r-- 1 root root    22155 Sep 19 03:16 flink-queryable-state-runtime_2.11-1.11.0.jar
-rw-r--r-- 1 root root 19985454 Sep 19 03:08 flink-s3-fs-hadoop-1.11.0.jar
-rw-r--r-- 1 root root 36173428 Sep 19 03:08 flink-s3-fs-presto-1.11.0.jar
-rw-r--r-- 1 root root   194834 Aug 28 16:51 flink-shaded-netty-tcnative-dynamic-2.0.25.Final-11.0.jar
-rw-r--r-- 1 root root  8028165 Aug 28 17:04 flink-shaded-zookeeper-3.5.6.jar
-rw-r--r-- 1 root root   544183 Sep 19 03:17 flink-sql-client_2.11-1.11.0.jar
-rw-r--r-- 1 root root   103766 Sep 19 03:17 flink-state-processor-api_2.11-1.11.0.jar
-rw-r--r-- 1 root root 26428976 Sep 19 03:08 flink-swift-fs-hadoop-1.11.0.jar
drwxr-xr-x 2 root root      134 Oct 13 18:01 python

in lib/ I see:

[hadoop@ip-10-223-71-70 flink]$ ll lib/
total 190304
-rw-r--r-- 1 root root     90784 Sep 19 03:14 flink-csv-1.11.0.jar
-rw-r--r-- 1 root root 114256876 Sep 19 03:17 flink-dist_2.11-1.11.0.jar
-rw-r--r-- 1 root root     94866 Sep 19 03:14 flink-json-1.11.0.jar
-rw-r--r-- 1 root root   7712156 Aug 28 16:51 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root  33325748 Sep 19 03:17 flink-table_2.11-1.11.0.jar
-rw-r--r-- 1 root root  37330514 Sep 19 03:17 flink-table-blink_2.11-1.11.0.jar
-rw-r--r-- 1 root root     67114 Aug 28 16:50 log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 root root    276771 Aug 28 16:50 log4j-api-2.12.1.jar
-rw-r--r-- 1 root root   1674433 Aug 28 16:50 log4j-core-2.12.1.jar
-rw-r--r-- 1 root root     23518 Aug 28 16:50 log4j-slf4j-impl-2.12.1.jar

Regards,
Vijay

On Tue, Oct 27, 2020 at 10:57 AM Vijayendra Yadav
<contact@gmail.com> wrote:


Hi Chesnay,

Steps to upgrade are as follows:

1) Created EMR 5.31 Cluster which comes with Flink 1.11
2) Copied flink-s3-fs-hadoop-1.11.0.jar to plugin folder for
application.

cd /usr/lib/flink/

mkdir -p ./plugins/s3-fs-hadoop

cp ./opt/flink-s3-fs-hadoop-1.11.0.jar ./plugins/s3-fs-hadoop/

3) Recompiled Application with Flink 1.11 dependency.
4) Updated Graphite plugin class in config

That is all I did.

Regards,
Vijay


On Tue, Oct 27, 2020 at 10:00 AM Chesnay Schepler
<ches...@apache.org> wrote:

How exactly did you do the upgrade? Did you copy some files
from 1.11 into an existing 1.10 distribution?

The configuration is correct, but it appears as if the entire
plugins directory is either a) empty or b) not shipped.

On 10/27/2020 5:22 PM, Vijayendra Yadav wrote:

Hi Robert and Chesnay,

Only  thing changed is I upgraded from Flink 1.10 to 1.11 and
to support that updated conf yaml with factory class.

Here is attached Full Log with classpath etc.  (log.txt)

Regards,
Vijay


On Tue, Oct 27, 2020 at 9:31 AM Chesnay Schepler
<ches...@apache.org> wrote:

Are you writing a test? (otherwise the ReporterSetupTest
reporters wouldn't be around)
Do you have a dependency on the graphite reporter?

On 10/27/2020 8:27 AM, Robert Metzger wrote:

Hi Vijayendra,
can you post or upload the entire logs, so that we can
see the Classpath logged on startup, as well as the
effective configuration parameters?

On Tue, Oct 27, 2020 at 12:49 AM Vijayendra Yadav
<contact@gmail.com> wrote:

Hi Chesnay,

Another log message:

2020-10-26 23:33:08,516 WARN
org.apache.flink.runtime.metrics.ReporterSetup - The
reporter factory
(org.apache.flink.metrics.graphite.GraphiteReporterFactory)
could not be found for reporter grph. Available
factories:


Re: FLINK 1.11 Graphite Metrics

2020-10-27 Thread Vijayendra Yadav
Also, you are right that the plugins directory did not contain anything by
default when we created EMR 5.31 with Flink 1.11.

In opt/ I see:

[hadoop@ip-10-223-71-70 flink]$ pwd
/usr/lib/flink
[hadoop@ip-10-223-71-70 flink]$ ll opt/
total 172860
-rw-r--r-- 1 root root 24029243 Sep 19 03:08 flink-azure-fs-hadoop-1.11.0.jar
-rw-r--r-- 1 root root   185395 Sep 19 03:11 flink-cep_2.11-1.11.0.jar
-rw-r--r-- 1 root root    53473 Sep 19 03:17 flink-cep-scala_2.11-1.11.0.jar
-rw-r--r-- 1 root root   640604 Sep 19 03:16 flink-gelly_2.11-1.11.0.jar
-rw-r--r-- 1 root root   764049 Sep 19 03:16 flink-gelly-scala_2.11-1.11.0.jar
-rw-r--r-- 1 root root   268951 Sep 19 03:17 flink-ml_2.11-1.11.0.jar
-rw-r--r-- 1 root root 22316430 Sep 19 03:08 flink-oss-fs-hadoop-1.11.0.jar
-rw-r--r-- 1 root root 37228704 Sep 19 03:17 flink-python_2.11-1.11.0.jar
-rw-r--r-- 1 root root    22155 Sep 19 03:16 flink-queryable-state-runtime_2.11-1.11.0.jar
-rw-r--r-- 1 root root 19985454 Sep 19 03:08 flink-s3-fs-hadoop-1.11.0.jar
-rw-r--r-- 1 root root 36173428 Sep 19 03:08 flink-s3-fs-presto-1.11.0.jar
-rw-r--r-- 1 root root   194834 Aug 28 16:51 flink-shaded-netty-tcnative-dynamic-2.0.25.Final-11.0.jar
-rw-r--r-- 1 root root  8028165 Aug 28 17:04 flink-shaded-zookeeper-3.5.6.jar
-rw-r--r-- 1 root root   544183 Sep 19 03:17 flink-sql-client_2.11-1.11.0.jar
-rw-r--r-- 1 root root   103766 Sep 19 03:17 flink-state-processor-api_2.11-1.11.0.jar
-rw-r--r-- 1 root root 26428976 Sep 19 03:08 flink-swift-fs-hadoop-1.11.0.jar
drwxr-xr-x 2 root root      134 Oct 13 18:01 python

in lib/ I see:

[hadoop@ip-10-223-71-70 flink]$ ll lib/
total 190304
-rw-r--r-- 1 root root     90784 Sep 19 03:14 flink-csv-1.11.0.jar
-rw-r--r-- 1 root root 114256876 Sep 19 03:17 flink-dist_2.11-1.11.0.jar
-rw-r--r-- 1 root root     94866 Sep 19 03:14 flink-json-1.11.0.jar
-rw-r--r-- 1 root root   7712156 Aug 28 16:51 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root  33325748 Sep 19 03:17 flink-table_2.11-1.11.0.jar
-rw-r--r-- 1 root root  37330514 Sep 19 03:17 flink-table-blink_2.11-1.11.0.jar
-rw-r--r-- 1 root root     67114 Aug 28 16:50 log4j-1.2-api-2.12.1.jar
-rw-r--r-- 1 root root    276771 Aug 28 16:50 log4j-api-2.12.1.jar
-rw-r--r-- 1 root root   1674433 Aug 28 16:50 log4j-core-2.12.1.jar
-rw-r--r-- 1 root root     23518 Aug 28 16:50 log4j-slf4j-impl-2.12.1.jar

Regards,
Vijay

On Tue, Oct 27, 2020 at 10:57 AM Vijayendra Yadav 
wrote:

> Hi Chesnay,
>
> Steps to upgrade are as follows:
>
> 1) Created EMR 5.31 Cluster which comes with Flink 1.11
> 2) Copied flink-s3-fs-hadoop-1.11.0.jar to plugin folder for application.
>
>cd  /usr/lib/flink/
>
> mkdir -p  ./plugins/s3-fs-hadoop
>
> cp ./opt/flink-s3-fs-hadoop-1.11.0.jar ./plugins/s3-fs-hadoop/
>
> 3) Recompiled Application with Flink 1.11 dependency.
> 4) Updated Graphite plugin class in config
>
> That is all I did.
>
> Regards,
> Vijay
>
>
> On Tue, Oct 27, 2020 at 10:00 AM Chesnay Schepler 
> wrote:
>
>> How exactly did you do the upgrade? Did you copy some files from 1.11
>> into an existing 1.10 distribution?
>>
>> The configuration is correct, but it appears as if the entire plugins
>> directory is either a) empty or b) not shipped.
>>
>> On 10/27/2020 5:22 PM, Vijayendra Yadav wrote:
>>
>> Hi Robert and Chesnay,
>>
>> Only  thing changed is I upgraded from Flink 1.10 to 1.11 and to support
>> that updated conf yaml with factory class.
>>
>> Here is attached Full Log with classpath etc.   (log.txt)
>>
>> Regards,
>> Vijay
>>
>>
>>
>> On Tue, Oct 27, 2020 at 9:31 AM Chesnay Schepler 
>> wrote:
>>
>>> Are you writing a test? (otherwise the ReporterSetupTest reporters
>>> wouldn't be around)
>>> Do you have a dependency on the graphite reporter?
>>>
>>> On 10/27/2020 8:27 AM, Robert Metzger wrote:
>>>
>>> Hi Vijayendra,
>>> can you post or upload the entire logs, so that we can see the Classpath
>>> logged on startup, as well as the effective configuration parameters?
>>>
>>> On Tue, Oct 27, 2020 at 12:49 AM Vijayendra Yadav 
>>> wrote:
>>>
>>>> Hi Chesnay,
>>>>
>>>> Another log message:
>>>>
>>>> 2020-10-26 23:33:08,516 WARN
>>>> org.apache.flink.runtime.metrics.ReporterSetup - The reporter factory
>>>> (org.apache.flink.metrics.graphite.GraphiteReporterFactory) could not be
>>>> found for reporter grph. Available factories:
>>>> [org.apache.flink.runtime.metrics.ReporterSetupTest$ConfigExposingReporterFactory,
>>>> org.apache.flink.runtime.metrics.ReporterSetupTest$TestReporterFactory,
>>>> org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporterFactory,
>>>> org.apache.flink.runtime.metrics.ReporterSetupTest$FailingFactory].
>>>> 2020-10-26 23:33:08,517 INFO
>>>> org.apache.flink.runtime.metrics.MetricRegistryImpl - No metrics reporter
>>>> configured, no metrics will be exposed/reported.
>>>> Regards,
>>>> Vijay
>>>>
>>>> On Mon, Oct 26, 2020 at 2:34 PM Vijayendra Yadav
>>>> wrote:
>>>>
>>>>> Hi Chesnay,
>>>>>
>>>>> I have the same, and I am exporting the

Re: FLINK 1.11 Graphite Metrics

2020-10-27 Thread Vijayendra Yadav
Hi Chesnay,

Steps to upgrade are as follows:

1) Created an EMR 5.31 cluster, which comes with Flink 1.11.
2) Copied flink-s3-fs-hadoop-1.11.0.jar to the plugins folder for the application:

   cd /usr/lib/flink/
   mkdir -p ./plugins/s3-fs-hadoop
   cp ./opt/flink-s3-fs-hadoop-1.11.0.jar ./plugins/s3-fs-hadoop/

3) Recompiled the application with the Flink 1.11 dependency.
4) Updated the Graphite plugin class in the config.

That is all I did.

Regards,
Vijay


On Tue, Oct 27, 2020 at 10:00 AM Chesnay Schepler 
wrote:

> How exactly did you do the upgrade? Did you copy some files from 1.11 into
> an existing 1.10 distribution?
>
> The configuration is correct, but it appears as if the entire plugins
> directory is either a) empty or b) not shipped.
>
> On 10/27/2020 5:22 PM, Vijayendra Yadav wrote:
>
> Hi Robert and Chesnay,
>
> Only  thing changed is I upgraded from Flink 1.10 to 1.11 and to support
> that updated conf yaml with factory class.
>
> Here is attached Full Log with classpath etc.   (log.txt)
>
> Regards,
> Vijay
>
>
>
> On Tue, Oct 27, 2020 at 9:31 AM Chesnay Schepler 
> wrote:
>
>> Are you writing a test? (otherwise the ReporterSetupTest reporters
>> wouldn't be around)
>> Do you have a dependency on the graphite reporter?
>>
>> On 10/27/2020 8:27 AM, Robert Metzger wrote:
>>
>> Hi Vijayendra,
>> can you post or upload the entire logs, so that we can see the Classpath
>> logged on startup, as well as the effective configuration parameters?
>>
>> On Tue, Oct 27, 2020 at 12:49 AM Vijayendra Yadav 
>> wrote:
>>
>>> Hi Chesnay,
>>>
>>> Another log message:
>>>
>>> 2020-10-26 23:33:08,516 WARN
>>> org.apache.flink.runtime.metrics.ReporterSetup - The reporter factory
>>> (org.apache.flink.metrics.graphite.GraphiteReporterFactory) could not be
>>> found for reporter grph. Available factories:
>>> [org.apache.flink.runtime.metrics.ReporterSetupTest$ConfigExposingReporterFactory,
>>> org.apache.flink.runtime.metrics.ReporterSetupTest$TestReporterFactory,
>>> org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporterFactory,
>>> org.apache.flink.runtime.metrics.ReporterSetupTest$FailingFactory].
>>> 2020-10-26 23:33:08,517 INFO
>>> org.apache.flink.runtime.metrics.MetricRegistryImpl - No metrics reporter
>>> configured, no metrics will be exposed/reported.
>>> Regards,
>>> Vijay
>>>
>>> On Mon, Oct 26, 2020 at 2:34 PM Vijayendra Yadav 
>>> wrote:
>>>
>>>> Hi Chesnay,
>>>>
>>>> I have the same, and I am exporting the flinkconf like below, where I
>>>> have flink-conf.yaml with the configuration you have given. What else
>>>> can I try?
>>>>
>>>> export FLINK_CONF_DIR=${app_install_path}/flinkconf/
>>>>
>>>> regards,
>>>> Vijay
>>>>
>>>> On Sun, Oct 25, 2020 at 8:03 AM Chesnay Schepler
>>>> wrote:
>>>>
>>>>> Ah wait, in 1.11 it should no longer be necessary to explicitly copy
>>>>> the reporter jar.
>>>>>
>>>>> Please update your reporter configuration to this:
>>>>>
>>>>> metrics.reporter.grph.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory
>>>>>
>>>>> On 10/25/2020 4:00 PM, Chesnay Schepler wrote:
>>>>>
>>>>> Have you followed the documentation, specifically this bit?
>>>>>
>>>>> > In order to use this reporter you must copy
>>>>> > /opt/flink-metrics-influxdb-1.11.2.jar into the plugins/influxdb
>>>>> > folder of your Flink distribution.
>>>>>
>>>>> On 10/24/2020 12:17 AM, Vijayendra Yadav wrote:
>>>>>
>>>>> Hi Team,
>>>>>
>>>>> for Flink 1.11 Graphite Metrics. I see the following Error in the log.
>>>>> Any suggestions?
>>>>>
>>>>> 2020-10-23 21:55:14,652 ERROR org.apache.flink.runtime.metrics.ReporterSetup - Could not instantiate metrics reporter grph. Metrics might not be exposed/reported.
>>>>> java.lang.ClassNotFoundException: org.apache.flink.metrics.graphite.GraphiteReporter
>>>>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>>>>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>>>>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>>>>   at java.lang.Class.forName0(Native Method)
>>>>>   at java.lang.Class.forName(Class.java:264)
>>>>>   at org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:313)
>>>>>   at org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:274)
>>>>>   at org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:235)
>>>>>   at org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:148)
>>>>>   at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:316)
>>>>>   at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:270)
>>>>>   at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:208)
>>>>>   at

Re: EMR Logging Woes

2020-10-27 Thread Rex Fenley
Thanks! I'll check these out.
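
For reference, Robert's tip in the quoted message below (search the top of each Flink log file for "-Dlog4j.configuration=" and open the file it names) could be sketched roughly like this. The helper name log4j_config_path and the example log path are hypothetical; only the "-Dlog4j.configuration=" marker comes from this thread.

```shell
# Hypothetical helper: print the value passed via -Dlog4j.configuration=
# in the given Flink log file (first occurrence only).
log4j_config_path() {
    log_file=$1
    # Capture everything after the marker up to the next space.
    sed -n 's/.*-Dlog4j\.configuration=\([^ ]*\).*/\1/p' "$log_file" | head -n 1
}

# Usage (the log path is an assumption, adjust it to your cluster):
#   log4j_config_path /var/log/flink/flink-jobmanager.log
```

The printed value may carry a "file:" prefix; strip it before opening the file to see which log level is really in effect.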

On Tue, Oct 27, 2020 at 12:58 AM Robert Metzger  wrote:

> Hi Rex,
>
> 1. You can also use the Flink UI for retrieving logs. That usually works
> quite fast (unless your logs are huge).
>
> 2. These are the correct configuration files for setting the log level.
> Are you running on a vanilla EMR cluster, or are there modifications? The
> "problem" is that Flink on YARN adds jar files (and other files) provided
> by the environment (YARN) to its classpath. The vanilla EMR configuration
> should be fine to not interfere with Flink's logging. But maybe there are
> some changes in your environment that cause problems?
>
> Since you are SSHing into the machines already: At the top of each Flink
> log file, we are logging the location of the log4j configuration file
> (search for "-Dlog4j.configuration="). Try to open that file to verify
> what's in there.
>
> Hope this helps!
>
> Robert
>
>
> On Tue, Oct 27, 2020 at 12:03 AM Rex Fenley  wrote:
>
>> Hello,
>>
>> After lots of testing in local environments we're now trying to get our
>> cluster running on AWS EMR. We followed much of the documentation from both
>> AWS and Flink and have gotten to the point of creating a yarn session and
>> submitting jobs. We successfully get back a Job ID and in the Yarn Timeline
>> Server UI it says our application is running. However, we are having a hard
>> time with logging.
>>
>> 2 main issues:
>> 1. Logs for the jobmanager and taskmanager seem to take a long time to
>> show up or in some cases just seem to never show up in the Yarn / Hadoop
>> UI, even though we can see them just fine when ssh'ing into the cluster's
>> nodes. Anything we can do to speed this up?
>>
>> 2. We can't seem to see anything except for WARN and ERROR logs for the
>> jobmanager and taskmanager, we need at least INFO right now to confirm
>> things are working as expected. We have been jumping through hoops going
>> through a multitude of configuration files including
>> log4j-session.properties and log4j.properties setting level to DEBUG but
>> it has not helped. Are these the correct configuration files?
>>
>> Thanks!
>>
>> --
>>
>> Rex Fenley  |  Software Engineer - Mobile and Backend
>>
>>
>> Remind.com  |  BLOG 
>>  |  FOLLOW US   |  LIKE US
>> 
>>
>

-- 

Rex Fenley  |  Software Engineer - Mobile and Backend


Remind.com  |  BLOG  |  FOLLOW US  |  LIKE US



Re: FLINK 1.11 Graphite Metrics

2020-10-27 Thread Chesnay Schepler
How exactly did you do the upgrade? Did you copy some files from 1.11 
into an existing 1.10 distribution?


The configuration is correct, but it appears as if the entire plugins 
directory is either a) empty or b) not shipped.
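
A rough way to check possibility a) on a node is sketched below. The helper name has_plugin_jar is hypothetical, /usr/lib/flink is the install path from the listings in this thread, and the idea that the Graphite reporter jar has "metrics-graphite" in its file name is an assumption that this thread does not confirm.

```shell
# Hypothetical helper: succeed if some jar under the given plugins
# directory has a file name containing the given fragment.
has_plugin_jar() {
    plugins_dir=$1
    name_part=$2
    # A missing plugins directory counts as "no such jar".
    [ -d "$plugins_dir" ] || return 1
    # grep -q . succeeds only if find printed at least one path.
    find "$plugins_dir" -type f -name "*${name_part}*.jar" | grep -q .
}

# Usage (path taken from this thread; the jar name fragment is an assumption):
#   if has_plugin_jar /usr/lib/flink/plugins metrics-graphite; then
#       echo "graphite reporter jar found"
#   else
#       echo "plugins directory is missing, empty, or lacks the reporter jar"
#   fi
```

This only inspects the local disk. It says nothing about possibility b), i.e. whether the plugins directory is actually shipped to the YARN containers.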


On 10/27/2020 5:22 PM, Vijayendra Yadav wrote:

Hi Robert and Chesnay,

Only  thing changed is I upgraded from Flink 1.10 to 1.11 and to 
support that updated conf yaml with factory class.


Here is attached Full Log with classpath etc.   (log.txt)

Regards,
Vijay


On Tue, Oct 27, 2020 at 9:31 AM Chesnay Schepler wrote:


Are you writing a test? (otherwise the ReporterSetupTest reporters
wouldn't be around)
Do you have a dependency on the graphite reporter?

On 10/27/2020 8:27 AM, Robert Metzger wrote:

Hi Vijayendra,
can you post or upload the entire logs, so that we can see the
Classpath logged on startup, as well as the effective
configuration parameters?

On Tue, Oct 27, 2020 at 12:49 AM Vijayendra Yadav wrote:

Hi Chesnay,

Another log message:

2020-10-26 23:33:08,516 WARN
org.apache.flink.runtime.metrics.ReporterSetup - The reporter
factory
(org.apache.flink.metrics.graphite.GraphiteReporterFactory)
could not be found for reporter grph. Available factories:

[org.apache.flink.runtime.metrics.ReporterSetupTest$ConfigExposingReporterFactory,
org.apache.flink.runtime.metrics.ReporterSetupTest$TestReporterFactory,

org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporterFactory,
org.apache.flink.runtime.metrics.ReporterSetupTest$FailingFactory].
2020-10-26 23:33:08,517 INFO
org.apache.flink.runtime.metrics.MetricRegistryImpl - No
metrics reporter configured, no metrics will be exposed/reported.
Regards,
Vijay

On Mon, Oct 26, 2020 at 2:34 PM Vijayendra Yadav wrote:

Hi Chesnay,

I have the same, and I am exporting the flinkconf like
below, where i have flink-conf.yaml with configuration
you have given.What else can I try ?

export FLINK_CONF_DIR=${app_install_path}/flinkconf/

regards,
Vijay

On Sun, Oct 25, 2020 at 8:03 AM Chesnay Schepler wrote:

Ah wait, in 1.11 it should no longer be necessary to
explicitly copy the reporter jar.

Please update your reporter configuration to this:

metrics.reporter.grph.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory

On 10/25/2020 4:00 PM, Chesnay Schepler wrote:

Have you followed the documentation, specifically
this bit?

> In order to use this reporter you must copy
/opt/flink-metrics-influxdb-1.11.2.jar into the
plugins/influxdb folder of your Flink distribution.

On 10/24/2020 12:17 AM, Vijayendra Yadav wrote:

Hi Team,

for Flink 1.11 Graphite Metrics. I see the
following Error in the log.
Any suggestions?

2020-10-23 21:55:14,652 ERROR org.apache.flink.runtime.metrics.ReporterSetup - Could not instantiate metrics reporter grph. Metrics might not be exposed/reported.
java.lang.ClassNotFoundException: org.apache.flink.metrics.graphite.GraphiteReporter
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:313)
    at org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:274)
    at org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:235)
    at org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:148)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:316)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:270)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:208)
    at

RE: Dependency vulnerabilities with flink 1.11.1 version

2020-10-27 Thread V N, Suchithra (Nokia - IN/Bangalore)
Thanks Robert.

Regards,
Suchithra

From: Robert Metzger 
Sent: Tuesday, October 27, 2020 9:10 PM
To: Till Rohrmann 
Cc: V N, Suchithra (Nokia - IN/Bangalore) ; 
user@flink.apache.org
Subject: Re: Dependency vulnerabilities with flink 1.11.1 version

FYI: For the sake of completeness, I have added some reasoning to all the JIRA 
tickets why we are not backporting fixes to the 1.11-line of Flink.

On Mon, Oct 26, 2020 at 4:51 PM Robert Metzger wrote:
Hey Suchithra,
thanks a lot for this report. I'm in the process of closing all the tickets 
Till has created (by pushing version upgrades to Flink).

The fixes will be released with the upcoming Flink 1.12 release.
I have decided against backporting the fixes to the 1.11 line of Flink, because 
they usually require large dependency version jumps, and none of the 
vulnerabilities reported have a confirmed case of directly affecting Flink. For 
example the issue in commons-io affects the FileNameUtils.normalize, which we 
are not using in Flink.

Best,
Robert



On Fri, Oct 23, 2020 at 10:55 AM Till Rohrmann wrote:
Hi Suchithra,

thanks for doing this analysis. I think we should try to upgrade the affected 
libraries. I have opened issues to do these changes [1, 2, 3, 4, 5]. In the 
future, it would be great if you could first reach out to 
priv...@flink.apache.org so that we can fix 
these problems without drawing attention to them.

[1] https://issues.apache.org/jira/browse/FLINK-19781
[2] https://issues.apache.org/jira/browse/FLINK-19782
[3] https://issues.apache.org/jira/browse/FLINK-19783
[4] https://issues.apache.org/jira/browse/FLINK-19784
[5] https://issues.apache.org/jira/browse/FLINK-19785

Cheers,
Till

On Thu, Oct 22, 2020 at 12:56 PM V N, Suchithra (Nokia - IN/Bangalore) wrote:

Hello,

We are using Apache Flink 1.11.1 version. During our security scans following 
issues are reported by our scan tool.

1.Package : commons_codec-1.10
Severity: Medium

Description:
Apache Commons contains a flaw that is due to the Base32 codec decoding invalid 
strings instead of rejecting them. This may allow a remote attacker to tunnel 
additional information via a base 32 string that seems valid.

Path:
/opt/flink/lib/flink-table_2.11-1.11.1.jar:commons-codec
/opt/flink/lib/flink-table-blink_2.11-1.11.1.jar:commons-codec

References:
https://issues.apache.org/jira/browse/CODEC-134
https://issues.apache.org/jira/browse/HTTPCLIENT-2018

2. Package : antlr-4.7
Severity: Medium

Description:
ANTLR contains a flaw in 
runtime/Java/src/org/antlr/v4/runtime/atn/ParserATNSimulator.java that is 
triggered as it does not catch exceptions when attempting to access the 
TURN_OFF_LR_LOOP_ENTRY_BRANCH_OPT environment variable. This may allow a 
context-dependent attacker to potentially crash a process linked against the 
library.

Path:
/opt/flink/opt/flink-python_2.11-1.11.1.jar:antlr4-runtime
References:
https://github.com/antlr/antlr4/issues/2069

3. Package : mesos-1.0.1
Severity: Medium

Description:
Apache Mesos can be configured to require authentication to call the Executor 
HTTP API using JSON Web Token (JWT). In Apache Mesos versions pre-1.4.2, 1.5.0, 
1.5.1, 1.6.0 the comparison of the generated HMAC value against the provided 
signature in the JWT implementation used is vulnerable to a timing attack 
because instead of a constant-time string comparison routine a standard `==` 
operator has been used. A malicious actor can therefore abuse the timing 
difference of when the JWT validation function returns to reveal the correct 
HMAC value.
Path:
/opt/flink/lib/flink-dist_2.11-1.11.1.jar:mesos

References:
https://nvd.nist.gov/vuln/detail/CVE-2018-8023

4. Package : okhttp-3.7.0
Severity: Medium

Description:
** DISPUTED ** CertificatePinner.java in OkHttp 3.x through 3.12.0 allows 
man-in-the-middle attackers to bypass certificate pinning by changing 
SSLContext and the boolean values while hooking the application. NOTE: This id 
is disputed because some parties don't consider this is a vulnerability. Their 
rationale can be found in https://github.com/square/okhttp/issues/4967.
Path:
/opt/flink/plugins/metrics-datadog/flink-metrics-datadog-1.11.1.jar:okhttp
References:
https://nvd.nist.gov/vuln/detail/CVE-2018-20200

5. Package : commons_io-2.4
Severity: Medium

Description:
Apache Commons IO contains a flaw that allows traversing outside of a 
restricted path. The issue is due to FileNameUtils.normalize not properly 
sanitizing user input, specifically path traversal style attacks (e.g. '../'). 
With a specially crafted request, a remote attacker can disclose arbitrary 
files.
Path:
/opt/flink/lib/flink-dist_2.11-1.11.1.jar:commons-io
/opt/flink/lib/flink-table-blink_2.11-1.11.1.jar:commons-io

References:
https://issues.apache.org/jira/browse/IO-556


Please let us know your comments on these issues and fix plans.

Regards,
Suchithra


Re: Dependency vulnerabilities with flink 1.11.1 version

2020-10-27 Thread Robert Metzger
FYI: For the sake of completeness, I have added some reasoning to all the
JIRA tickets why we are not backporting fixes to the 1.11-line of Flink.

On Mon, Oct 26, 2020 at 4:51 PM Robert Metzger  wrote:

> Hey Suchithra,
> thanks a lot for this report. I'm in the process of closing all the
> tickets Till has created (by pushing version upgrades to Flink).
>
> The fixes will be released with the upcoming Flink 1.12 release.
> I have decided against backporting the fixes to the 1.11 line of Flink,
> because they usually require large dependency version jumps, and none of
> the vulnerabilities reported have a confirmed case of directly affecting
> Flink. For example the issue in commons-io affects the
> FileNameUtils.normalize, which we are not using in Flink.
>
> Best,
> Robert
>
>
>
> On Fri, Oct 23, 2020 at 10:55 AM Till Rohrmann 
> wrote:
>
>> Hi Suchithra,
>>
>> thanks for doing this analysis. I think we should try to upgrade the
>> affected libraries. I have opened issues to do these changes [1, 2, 3, 4,
>> 5]. In the future, it would be great if you could first reach out to
>> priv...@flink.apache.org so that we can fix these problems without
>> drawing attention to them.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-19781
>> [2] https://issues.apache.org/jira/browse/FLINK-19782
>> [3] https://issues.apache.org/jira/browse/FLINK-19783
>> [4] https://issues.apache.org/jira/browse/FLINK-19784
>> [5] https://issues.apache.org/jira/browse/FLINK-19785
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 22, 2020 at 12:56 PM V N, Suchithra (Nokia - IN/Bangalore) <
>> suchithra@nokia.com> wrote:
>>
>>>
>>>
>>> Hello,
>>>
>>>
>>>
>>> We are using Apache Flink 1.11.1 version. During our security scans
>>> following issues are reported by our scan tool.
>>>
>>>
>>>
>>> *1.Package : commons_codec-1.10*
>>>
>>> *Severity: Medium*
>>>
>>>
>>>
>>> *Description: *
>>>
>>> Apache Commons contains a flaw that is due to the Base32 codec decoding
>>> invalid strings instead of rejecting them. This may allow a remote attacker
>>> to tunnel additional information via a base 32 string that seems valid.
>>>
>>>
>>>
>>> *Path:*
>>>
>>> /opt/flink/lib/flink-table_2.11-1.11.1.jar:commons-codec
>>>
>>> /opt/flink/lib/flink-table-blink_2.11-1.11.1.jar:commons-codec
>>>
>>>
>>>
>>> *References:*
>>>
>>> https://issues.apache.org/jira/browse/CODEC-134
>>>
>>> https://issues.apache.org/jira/browse/HTTPCLIENT-2018
>>>
>>>
>>>
>>> *2. Package : antlr-4.7*
>>>
>>> *Severity: Medium*
>>>
>>>
>>>
>>> *Description: *
>>>
>>> ANTLR contains a flaw in
>>> runtime/Java/src/org/antlr/v4/runtime/atn/ParserATNSimulator.java that is
>>> triggered as it does not catch exceptions when attempting to access the
>>> TURN_OFF_LR_LOOP_ENTRY_BRANCH_OPT environment variable. This may allow a
>>> context-dependent attacker to potentially crash a process linked against
>>> the library.
>>>
>>>
>>>
>>> *Path:*
>>>
>>> /opt/flink/opt/flink-python_2.11-1.11.1.jar:antlr4-runtime
>>>
>>> *References:*
>>>
>>> https://github.com/antlr/antlr4/issues/2069
>>>
>>>
>>>
>>> *3. Package : mesos-1.0.1*
>>>
>>> *Severity: Medium*
>>>
>>>
>>>
>>> *Description: *
>>>
>>> Apache Mesos can be configured to require authentication to call the
>>> Executor HTTP API using JSON Web Token (JWT). In Apache Mesos versions
>>> pre-1.4.2, 1.5.0, 1.5.1, 1.6.0 the comparison of the generated HMAC value
>>> against the provided signature in the JWT implementation used is vulnerable
>>> to a timing attack because instead of a constant-time string comparison
>>> routine a standard `==` operator has been used. A malicious actor can
>>> therefore abuse the timing difference of when the JWT validation function
>>> returns to reveal the correct HMAC value.
>>>
>>> *Path:*
>>>
>>> /opt/flink/lib/flink-dist_2.11-1.11.1.jar:mesos
>>>
>>>
>>>
>>> *References:*
>>>
>>> https://nvd.nist.gov/vuln/detail/CVE-2018-8023
>>>
>>>
>>>
>>> *4. Package : okhttp-3.7.0*
>>>
>>> *Severity: Medium*
>>>
>>>
>>>
>>> *Description: *
>>>
>>> ** DISPUTED ** CertificatePinner.java in OkHttp 3.x through 3.12.0
>>> allows man-in-the-middle attackers to bypass certificate pinning by
>>> changing SSLContext and the boolean values while hooking the application.
>>> NOTE: This id is disputed because some parties don't consider this is a
>>> vulnerability. Their rationale can be found in
>>> https://github.com/square/okhttp/issues/4967.
>>>
>>> *Path:*
>>>
>>>
>>> /opt/flink/plugins/metrics-datadog/flink-metrics-datadog-1.11.1.jar:okhttp
>>>
>>> *References:*
>>>
>>> https://nvd.nist.gov/vuln/detail/CVE-2018-20200
>>>
>>>
>>>
>>> *5. Package : commons_io-2.4*
>>>
>>> *Severity: Medium*
>>>
>>>
>>>
>>> *Description: *
>>>
>>> Apache Commons IO contains a flaw that allows traversing outside of a
>>> restricted path. The issue is due to FileNameUtils.normalize not properly
>>> sanitizing user input, specifically path traversal style attacks (e.g.
>>> '../'). With a specially crafted request, a remote attacker can disclose
>>> 

Re: Flink checkpointing state

2020-10-27 Thread Boris Lublinsky
Thanks Yun,
This refers to FLIP-144:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-144%3A+Native+Kubernetes+HA+for+Flink

The FLIP contains two parts, leader election and HA information persistence,
and offers two options.
Can you tell us what exactly will be part of 1.12?
We would be happy with the second option for now, if it's faster to implement.
 

> On Oct 27, 2020, at 1:11 AM, Yun Tang  wrote:
> 
> Hi Boris
> 
> Please refer to FLINK-12884[1] for current progress of native HA support of 
> k8s which targets for release-1.12.
> 
> [1] https://issues.apache.org/jira/browse/FLINK-12884 
> 
> 
> Best
> Yun Tang
> 
> From: Boris Lublinsky 
> Sent: Tuesday, October 27, 2020 2:56
> To: user 
> Subject: Flink checkpointing state
>  
> This is from Flink 1.8:
> 
> "Job Manager keeps some state related to checkpointing in it’s memory. This 
> state would be lost on Job Manager crashes, which is why this state is 
> persisted in ZooKeeper. This means that even though there is no real need for 
> the leader election and -discovery part of Flink’s HA mode (as is this 
> handled natively by Kubernetes), it still needs to be enabled just for 
> storing the checkpoint state.”
> 
> Was it ever fixed in Flink 1.10 or 1.11? If running Flink on K8, without HA, 
> there is no Zookeeper. And if the above is still the case, then checkpointing 
> will never pick up the right one



Re: How to understand NOW() in SQL when using Table & SQL API to develop a streaming app?

2020-10-27 Thread Till Rohrmann
Thanks for the clarification. This improvement would be helpful, I believe.

Cheers,
Till
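
To make the difference Jark describes in the quoted messages below concrete, here is a small shell sketch. It is not Flink code: CLOCK, tick, batch_now and streaming_now are made-up names, and a simple counter stands in for wall-clock time so the behaviour is repeatable.

```shell
# Fake clock: each tick advances a counter that stands in for wall-clock time.
CLOCK=0
tick() { CLOCK=$((CLOCK + 1)); }

# Batch-style NOW(): the value is fixed once, before any record is processed.
batch_now() {
    tick
    fixed=$CLOCK
    for record in "$@"; do
        echo "$record,$fixed"
    done
}

# Streaming-style NOW() as described for Flink: re-evaluated for every record.
streaming_now() {
    for record in "$@"; do
        tick
        echo "$record,$CLOCK"
    done
}
```

Calling batch_now a b c prints the same time value next to every record, while streaming_now a b c prints a later value for each successive record, which is the per-record behaviour that the phrase "not deterministic" in the documentation refers to.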

On Tue, Oct 27, 2020 at 1:19 PM Jark Wu  wrote:

> Hi Till,
>
> The documentation mentions that "this function is not deterministic" where
> the "not deterministic" means the value of this function is not
> deterministic for every record.
> However, this is not very clear for users. I think we can improve the
> documentation.
>
> Best,
> Jark
>
> On Tue, 27 Oct 2020 at 15:59, Till Rohrmann  wrote:
>
>> Quick question Jark: Is this difference in behaviour documented? I
>> couldn't find it in the docs.
>>
>> Cheers,
>> Till
>>
>> On Tue, Oct 27, 2020 at 7:30 AM Jark Wu  wrote:
>>
>>> Hi Longdexin,
>>>
>>> In traditional batch sql, NOW() is executed and determined before the
>>> job is submitted and will not change for every processed record.
>>> However, this doesn't make much sense in streaming sql, therefore, NOW()
>>> function in Flink is executed for every record.
>>>
>>> Best,
>>> Jark
>>>
>>> On Fri, 23 Oct 2020 at 16:30, Till Rohrmann 
>>> wrote:
>>>
>>>> Hi Longdexin,
>>>>
>>>> thanks for reaching out to the Flink community. I am pulling in Jark
>>>> who might be able to help you with this question.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Oct 22, 2020 at 2:56 PM Longdexin <274522...@qq.com> wrote:
>>>>
>>>>> From my point of view, the value of the NOW() function in SQL is fixed
>>>>> at the time the streaming app is launched and will not change with
>>>>> processing time. However, as a new Flink user, I'm not so sure of that.
>>>>> By the way, if my aim is to keep the time logic updating all the time,
>>>>> what should I do?
>>>>>
>>>>> --
>>>>> Sent from:
>>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>>



[ANNOUNCE] Weekly Community Update 2020/39-43

2020-10-27 Thread Konstantin Knauf
Dear community,

happy to finally share another community update with you. With Flink
Forward and the release of Flink SQL on Ververica Platform I was a bit too
busy the previous weeks. I'll try to return to the weekly cadence going
forward again. This time we have a lot of ground to cover. With the feature
freeze for Flink 1.12 around the corner, this includes some last FLIPs for
this release as well as the first FLIPs that will go into Flink 1.13.

Flink Development
=================

* [releases] It looks like the feature freeze for Flink 1.12 will be set to
Monday, 2nd November. [1]

* [releases] Stateful Functions 2.2.0 was released. [2] Please check out the
release blog post [3] for an overview of the new features.

* [releases] Gordon started a discussion on the release of Flink 1.11.3.
Gordon and Xintong will be release managers. Looks like this will happen
soonish. [4]

* [releases] flink-shaded 12.0 was released. [5]

* [sql] Jark has started the discussion on FLIP-145 to support windowing
table-valued functions. This FLIP introduces a new way to do windowing in
Flink SQL, which would eventually supersede the current grouping windows.
It adds four new windowing table-valued functions for TUMBLE, HOP, CUMULATE
and SESSION windows. These functions return a new table with the same
schema as the original table + a "window_start" and "window_end" column.
For every window that a row falls into, one row is added to the output
table. This windowing API provides a better integration with other parts of
Flink SQL and is hopefully easier to pick up for SQL users than the current
syntax. [6]

* [sql] Godfrey has shared a design document to use the multiple-input
stream operator added in FLIP-92 to replace forward shuffles in Flink SQL
by local function calls. [7]

* [sql] [connectors] Jingsong started a discussion and subsequent vote on
FLIP-146. FLIP-146 extends the new TableSource/TableSink interfaces so that
all existing table sources and sinks can be migrated to the new interfaces.
Specifically, it adds the option to create a Table Source/Sink from a
DataStream. [8]

* [sql] [connectors] Shengkai started the discussion and subsequently
the vote on FLIP-149 to support reading/writing Kafka topics in upsert mode
with Flink SQL (aka "Support for Compacted Kafka Topics"). With this FLIP
users will be able to write changelog streams to (presumably compacted)
Kafka topics and Kafka topics can be interpreted as changelog streams
(regular record=upsert, record with null value=delete). [9]

* [deployment] Robert started a discussion on dropping Mesos support in one
of the upcoming releases (earliest Flink 1.13). There is a lot of support, but
also concerns from current users. So, if you are running Flink on Mesos, please
let the community know, so that we can make an informed decision. [10]

* [connectors] Dominik Wosinksi has started a discussion on a MongoDB
(streaming) connector for Apache Flink. There have been tickets/PRs in
Flink and Bahir before and he would like to come to some kind of
conclusion. [11]

* [connectors] Darion Yaphet has asked if it were possible to provide a
connector for the Open Source distributed graph database Nebula Graph. [12]

* [connectors] Kostas proposes to remove the long time deprecated
flink-connector-filesystem. No conclusion yet. [13]

* [runtime] Yingjie Cao proposed to add a sort-merge-based blocking shuffle
to Apache Flink in addition to the hash-style blocking shuffle
implementation that Flink currently uses. The vote is currently ongoing.
[14]

* [docker] Yun Tang started a discussion on using jemalloc as the memory
allocator for Debian-based Flink Docker images. The conclusion seems to be
to make this configurable via an environment variable and to keep glibc
malloc as the default. [15]

* [development process] Aljoscha proposes to use an automatic code
formatter like Spotless going forward. Generally, there is a lot of support
for using a code formatter and no strong opinions on the actual code style to
be used. There is some discussion on whether to do this incrementally or all
at once. [16]
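
The upsert interpretation described for FLIP-149 above (regular record =
upsert, record with null value = delete) can be sketched in a few lines of
plain Python. This is only an illustration of the changelog semantics, not
Flink or Kafka code; the `materialize` function and the sample `topic` list
are made up for the example.

```python
from typing import Dict, Iterable, Optional, Tuple

# (key, value); a value of None stands for a record with a null value.
Record = Tuple[str, Optional[str]]


def materialize(records: Iterable[Record]) -> Dict[str, str]:
    """Replay an upsert changelog and return the latest value per key."""
    table: Dict[str, str] = {}
    for key, value in records:
        if value is None:
            # A record with a null value deletes the key.
            table.pop(key, None)
        else:
            # A regular record inserts the key or overwrites its old value.
            table[key] = value
    return table


# Hypothetical topic contents, in log order.
topic = [("a", "1"), ("b", "2"), ("a", "3"), ("b", None)]
print(materialize(topic))
```

Replaying the records in log order leaves only the latest value of key "a";
key "b" is removed by its null-value record.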

[1]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Release-1-12-Feature-Freeze-tp45814.html
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-Flink-Stateful-Functions-2-2-0-released-tp45304.html
[3] https://flink.apache.org/news/2020/09/28/release-statefun-2.2.0.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Releasing-Apache-Flink-1-11-3-tp45989.html
[5]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Apache-flink-shaded-12-0-released-tp45453.html
[6]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-145-Support-SQL-windowing-table-valued-function-tp45269p45270.html
[7]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Support-Multiple-Input-for-Blink-Planner-tp45674.html
[8]

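Re: flink1.11 log reporting

2020-10-27 Thread zhisheng
A humble question: roughly how many jobs does your cluster run? Users may print raw data in their logs, so the log volume can be really large. How much does it cost per month to ship all the logs to ES?

On Tue, Oct 27, 2020 at 8:37 PM Storm☀️ wrote:

> We also use kafkaappender to report logs, and then provide log search in ES.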
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.11 log reporting

2020-10-27 Thread Storm☀️
We also use kafkaappender to report logs, and then provide log search in ES.



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: How to understand NOW() in SQL when using Table & SQL API to develop a streaming app?

2020-10-27 Thread Jark Wu
Hi Till,

The documentation mentions that "this function is not deterministic" where
the "not deterministic" means the value of this function is not
deterministic for every record.
However, this is not very clear for users. I think we can improve the
documentation.

Best,
Jark

On Tue, 27 Oct 2020 at 15:59, Till Rohrmann  wrote:

> Quick question Jark: Is this difference in behaviour documented? I
> couldn't find it in the docs.
>
> Cheers,
> Till
>
> On Tue, Oct 27, 2020 at 7:30 AM Jark Wu  wrote:
>
>> Hi Longdexin,
>>
>> In traditional batch sql, NOW() is executed and determined before the job
>> is submitted and will not change for every processed record.
>> However, this doesn't make much sense in streaming sql, therefore, NOW()
>> function in Flink is executed for every record.
>>
>> Best,
>> Jark
>>
>> On Fri, 23 Oct 2020 at 16:30, Till Rohrmann  wrote:
>>
>>> Hi Longdexin,
>>>
>>> thanks for reaching out to the Flink community. I am pulling in Jark who
>>> might be able to help you with this question.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Oct 22, 2020 at 2:56 PM Longdexin <274522...@qq.com> wrote:
>>>
>>>> From my point of view, the value of the NOW() function in SQL is fixed at
>>>> the time the streaming app is launched and will not change with
>>>> processing time. However, as a new Flink user, I'm not so sure of that.
>>>> By the way, if my intent is to keep the time logic updating all the
>>>> time, what should I do?
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>>
>>>
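
The difference discussed in this thread (NOW() fixed once before the job runs
in traditional batch SQL, versus evaluated for every record in Flink
streaming SQL) can be illustrated with a small plain-Python sketch. It does
not use Flink; `batch_now`, `streaming_now` and the tick-based `make_clock`
are hypothetical names introduced only for this illustration.

```python
import itertools
from typing import Callable, Iterable, List, Tuple


def batch_now(records: Iterable[str], clock: Callable[[], int]) -> List[Tuple[str, int]]:
    """Batch-style semantics: the clock is read once, before any record is processed."""
    now = clock()
    return [(record, now) for record in records]


def streaming_now(records: Iterable[str], clock: Callable[[], int]) -> List[Tuple[str, int]]:
    """Streaming-style semantics: the clock is read again for every record."""
    return [(record, clock()) for record in records]


def make_clock(start: int = 1000) -> Callable[[], int]:
    """Hypothetical clock for the demo: every call advances by one tick."""
    counter = itertools.count(start)
    return lambda: next(counter)


rows = ["r1", "r2", "r3"]
print(batch_now(rows, make_clock()))      # every row carries the same timestamp
print(streaming_now(rows, make_clock()))  # the timestamp advances row by row
```

With the per-record variant the same query can yield a different value for
each row, which is what "not deterministic" refers to in the reply above.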


Does batch processing have backpressure?

2020-10-27 Thread 请叫我雷锋
As per the subject.

LocalBufferPoo????

2020-10-27 Thread 1548069580

??jstack??source??
"Legacy Source Thread - Source: Custom Source (1/2)" #95 prio=5 os_prio=0 
tid=0x7fafa4018000 nid=0x57d waiting on condition [0x7fb03d48a000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00074afaf508> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:241)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:210)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:151)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:122)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    - locked <0x00074a5f6a98> (a java.lang.Object)
    at com.jd.bdp.flink.sink.jimdb.common.SourceTimeMillisMock.run(SourceTimeMillisMock.java:25)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)


Memory Segmentflink uiMemory Segments 
Available

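Re: sql-client reports TTransportException when connecting to Hive

2020-10-27 Thread Rui Li
Hi, from the log I can see that the connection goes to port 1. Isn't that the HS2 port? Flink's HiveCatalog needs to connect to the HMS, so you could start an HMS and try again.

On Tue, Oct 27, 2020 at 9:57 AM RS  wrote:

> Hi, I have a question.
> I am trying to connect to Hive with sql-client. Hive itself works fine, and
> beeline -u jdbc:hive2://x.x.x.x:1 connects without problems.
>
>
> Contents of sql-client-defaults.yaml: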
> tables: []
> functions: []
> catalogs:
> - name: myhive
>   type: hive
>   hive-conf-dir: /home/hive/flink-1.11.1/conf
>   default-database: default
> execution:
>   planner: blink
>   type: streaming
>   time-characteristic: event-time
>   periodic-watermarks-interval: 200
>   result-mode: table
>   max-table-result-rows: 100
>   parallelism: 1
>   max-parallelism: 128
>   min-idle-state-retention: 0
>   max-idle-state-retention: 0
>   restart-strategy:
>     type: fallback
> deployment:
>   response-timeout: 5000
>   gateway-address: ""
>   gateway-port: 0
>
>
> Starting sql-client then fails:
> $./bin/sql-client.sh embedded
>
>
> The final error message:
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> Could not create execution context.
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:870)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> Caused by: org.apache.flink.table.catalog.exceptions.CatalogException:
> Failed to determine whether database default exists or not
> at
> org.apache.flink.table.catalog.hive.HiveCatalog.databaseExists(HiveCatalog.java:335)
> at
> org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:227)
> at
> org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:191)
> at
> org.apache.flink.table.api.internal.TableEnvironmentImpl.registerCatalog(TableEnvironmentImpl.java:337)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:627)
> at java.util.HashMap.forEach(HashMap.java:1289)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:625)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:264)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:624)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:523)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:183)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:136)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:859)
> ... 3 more
> Caused by: org.apache.thrift.transport.TTransportException
> at
> org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:132)
> at org.apache.thrift.transport.TTransport.readAll(TTransport.java:86)
> at
> org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:429)
> at
> org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:318)
> at
> org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:219)
> at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77)
> at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_database(ThriftHiveMetastore.java:1135)
> at
> org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_database(ThriftHiveMetastore.java:1122)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDatabase(HiveMetaStoreClient.java:1511)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getDatabase(HiveMetaStoreClient.java:1506)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:208)
> at com.sun.proxy.$Proxy28.getDatabase(Unknown Source)
> at
> org.apache.flink.table.catalog.hive.client.HiveMetastoreClientWrapper.getDatabase(HiveMetastoreClientWrapper.java:107)
> at
> org.apache.flink.table.catalog.hive.HiveCatalog.databaseExists(HiveCatalog.java:330)
> ... 15 more
>
>
>
>
> Appendix, the full error output:
> Searching for
> '/home/hive/flink-1.11.1/conf/sql-client-defaults.yaml'...found.
> Reading default environment from:
> file:/home/hive/flink-1.11.1/conf/sql-client-defaults.yaml
> No session environment specified.
> 2020-10-27 09:48:14,533 INFO  

Fwd: Flink memory usage monitoring

2020-10-27 Thread Matthias Pohl
I missed adding the mailing list in my previous email.

-- Forwarded message -
From: Matthias Pohl 
Date: Tue, Oct 27, 2020 at 12:39 PM
Subject: Re: Flink memory usage monitoring
To: Rajesh Payyappilly Jose 


Hi Rajesh,
thanks for reaching out to us. We worked on providing metrics for managed
memory and network memory as part of FLIP-102 [1]. It looks like these
features are going to be added to the upcoming release of Flink 1.12.

We decided to not include off-heap memory as it is not necessarily under
control of Flink (e.g. user code can allocate native memory and Flink
wouldn't be aware of it). Hence, providing numbers for off-heap memory
usage might be misleading. There will be a metric to monitor the Metaspace
usage, though.

Best,
Matthias

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-102%3A+Add+More+Metrics+to+TaskManager

On Tue, Oct 20, 2020 at 8:23 PM Rajesh Payyappilly Jose 
wrote:

> Classification: *Internal*
>
> Hi,
>
>
>
> Environment - Flink 1.11 on K8s
>
>
>
> Is there a way to monitor the usage of managed memory, off-heap memory and
> network memory?
>
>
>
> -Rajesh
>


Re: [BULK]Re: [SURVEY] Remove Mesos support

2020-10-27 Thread Oleksandr Nitavskyi
Hello Xintong,

Thanks for the insights and support.

I browsed the Mesos backlog and didn't identify anything critical left 
there.

I see that there were quite a lot of contributions to Flink Mesos in 
the recent version: https://github.com/apache/flink/commits/master/flink-mesos.
We plan to validate the current Flink master (or the release 1.12 branch) on our 
Mesos setup. In case of any issues, we will try to propose changes.
My feeling is that our test results shouldn't affect the Flink 1.12 release 
cycle. And if any potential commits land in 1.12.1, that should be 
totally fine.

In the future, we would be glad to help you guys with any maintenance-related 
questions. One of the highest priorities around this component seems to be the 
development of the full e2e test.

Kind Regards
Oleksandr Nitavskyi

From: Xintong Song 
Sent: Tuesday, October 27, 2020 7:14 AM
To: dev ; user 
Cc: Piyush Narang 
Subject: [BULK]Re: [SURVEY] Remove Mesos support

Hi Piyush,

Thanks a lot for sharing the information. It would be a great relief that you 
are good with Flink on Mesos as is.

As for the jira issues, I believe the most essential ones should have already 
been resolved. You may find some remaining open issues here [1], but not all of 
them are necessary if we decide to keep Flink on Mesos as is.

At the moment and in the near future, I think help is mostly needed with 
testing the upcoming release 1.12 with Mesos use cases. The community is 
currently actively preparing the new release, and hopefully we could come up 
with a release candidate early next month. It would be greatly appreciated if 
you folks, as experienced Flink on Mesos users, can help with verifying the 
release candidates.


Thank you~

Xintong Song

[1] 
https://issues.apache.org/jira/browse/FLINK-17402?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Deployment%20%2F%20Mesos%22%20AND%20status%20%3D%20Open

On Tue, Oct 27, 2020 at 2:58 AM Piyush Narang <p.nar...@criteo.com> wrote:

Hi Xintong,



Do you have any jiras that cover any of the items on 1 or 2? I can reach out to 
folks internally and see if I can get some folks to commit to helping out.



To cover the other qs:

  *   Yes, we’ve not got a plan at the moment to get off Mesos. We use Yarn for 
some of our Flink workloads when we can. Mesos is only used when we need streaming 
capabilities in our WW dcs (as our Yarn is centralized in one DC).
  *   We’re currently on Flink 1.9 (old planner). We have a plan to bump to 
1.11 / 1.12 this quarter.
  *   We typically upgrade once every 6 months to a year (not every release). 
We’d like to speed up the cadence but we’re not there yet.
  *   We’d largely be good with keeping Flink on Mesos as-is and functional 
while missing out on some of the newer features. We understand the pain on the 
community's side, and we can take on the work of porting it over if we see some 
fancy improvement in Flink on Yarn / K8s that we want in Mesos.



Thanks,



-- Piyush





From: Xintong Song <tonysong...@gmail.com>
Date: Sunday, October 25, 2020 at 10:57 PM
To: dev <d...@flink.apache.org>, user <user@flink.apache.org>
Cc: Lasse Nedergaard <lassenedergaardfl...@gmail.com>, <p.nar...@criteo.com>
Subject: Re: [SURVEY] Remove Mesos support



Thanks for sharing the information with us, Piyush and Lasse.



@Piyush



Thanks for offering the help. IMO, there are currently several problems that 
make supporting Flink on Mesos challenging for us.

  1.  Lack of Mesos experts. AFAIK, there are very few people (if not none) 
among the active contributors in this community that are familiar with Mesos 
and can help with development on this component.
  2.  Absence of tests. Mesos does not provide a testing cluster, like 
`MiniYARNCluster`, making it hard to test interactions between Flink and Mesos. 
We have only a few very simple e2e tests running on Mesos deployed in a docker, 
covering the most fundamental workflows. We are not sure how well those tests 
work, especially against some potential corner cases.
  3.  Divergence from other deployments. Because of 1 and 2, the new efforts 
(features, maintenance, refactors) tend to exclude Mesos if possible. When the 
new efforts have to touch the Mesos related components (e.g., changes to the 
common resource manager interfaces), we have to be very careful and 

??????LocalBufferPoo????

2020-10-27 Thread ??????
memory 
segment??channel??

1.
2.keybykey??key??keybykeyby??


| |
??
|
|
??xiongyun...@163.com
|

??  

??2020??10??27?? 18:50??1548069580 ??

??jstack??source??
"Legacy Source Thread - Source: Custom Source (1/2)" #95 prio=5 os_prio=0 
tid=0x7fafa4018000 nid=0x57d waiting on condition [0x7fb03d48a000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00074afaf508> (a java.util.concurrent.CompletableFuture$Signaller)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1693)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1729)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:241)
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:210)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.getBufferBuilder(ResultPartition.java:189)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.requestNewBufferBuilder(ChannelSelectorRecordWriter.java:103)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:151)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:122)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    - locked <0x00074a5f6a98> (a java.lang.Object)
    at com.jd.bdp.flink.sink.jimdb.common.SourceTimeMillisMock.run(SourceTimeMillisMock.java:25)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)


Memory Segmentflink uiMemory Segments 
Available

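Re: When reading a CSV source table with pyflink, how to skip the header row? How to select specific columns?

2020-10-27 Thread Xingbo Huang
Hi,
1. The CsvTableSource constructor has a parameter ignore_first_line that skips the header in the first line; take a look at it.
2. Reading only those four columns is probably not possible, mainly because they are not the leading columns. For example, with 10 columns of data, if you want the first four, that works: when a line is read in, I parse the first four columns and can leave the rest unparsed. But if your columns are 1, 3, 5, 7, 9 and you don't specify the types of columns 2, 4, 6, 8, there is no way to parse a line of data for you.

Best,
Xingbo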

On Tue, Oct 27, 2020 at 2:36 PM 洗你的头 <1264386...@qq.com> wrote:

> Dear developers,
> My source code for reading the CSV data is as follows:
> t_env.register_table_source("mySource",
>     CsvTableSource(r'data\trip\yellow_tripdata_2014-01.csv',
>         ['vendor_id','pickup_datetime','dropoff_datetime','passenger_count',
>          'trip_distance','pickup_longitude','pickup_latitude','rate_code',
>          'store_and_fwd_flag','dropoff_longitude','dropoff_latitude',
>          'payment_type','fare_amount','surcharge','mta_tax','tip_amount',
>          'tolls_amount','total_amount'],
>         [DataTypes.STRING(),DataTypes.STRING(),DataTypes.STRING(),DataTypes.BIGINT(),
>          DataTypes.FLOAT(),DataTypes.FLOAT(),DataTypes.FLOAT(),DataTypes.BIGINT(),
>          DataTypes.STRING(),DataTypes.FLOAT(),DataTypes.FLOAT(),
>          DataTypes.STRING(),DataTypes.FLOAT(),DataTypes.FLOAT(),DataTypes.FLOAT(),DataTypes.FLOAT(),
>          DataTypes.FLOAT(),DataTypes.FLOAT()])
> )
>
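> With the CsvTableSource approach I am using here, how can I skip the header row in the raw data? Also, I only want to read the four columns 'pickup_longitude','pickup_latitude','dropoff_longitude','dropoff_latitude'; how do I do that?
> How does this approach differ from connect with OldCsv and Schema? With the connect approach, how would I skip the header row and select specific columns?
> Or is the only option to remove the header row when saving the raw data table?
> Looking forward to your answer.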
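
The reasoning in point 2 of the reply (a line has to be split into all of its
columns before individual columns can be picked out) can be illustrated
outside Flink with Python's standard csv module. This is not PyFlink code:
`read_columns`, the shortened six-column schema and the sample values are
assumptions made for the illustration. The `next(reader)` call plays the role
that ignore_first_line plays for CsvTableSource.

```python
import csv
import io
from typing import Dict, List

# Shortened, hypothetical schema; the real file in the question has 18 columns.
ALL_COLUMNS = ["vendor_id", "pickup_longitude", "pickup_latitude",
               "dropoff_longitude", "dropoff_latitude", "fare_amount"]
WANTED = ["pickup_longitude", "pickup_latitude",
          "dropoff_longitude", "dropoff_latitude"]

# Made-up sample data with a header line.
SAMPLE = (
    "vendor_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,fare_amount\n"
    "CMT,-73.99,40.73,-73.98,40.75,6.5\n"
    "VTS,-73.95,40.78,-73.97,40.76,8.0\n"
)


def read_columns(text: str, all_columns: List[str], wanted: List[str]) -> List[Dict[str, float]]:
    """Skip the header, split every line into all columns, then keep the wanted ones."""
    reader = csv.reader(io.StringIO(text))
    next(reader)  # skip the header line
    rows = []
    for fields in reader:
        full_row = dict(zip(all_columns, fields))  # the whole line is split first
        rows.append({name: float(full_row[name]) for name in wanted})  # then projected
    return rows


for row in read_columns(SAMPLE, ALL_COLUMNS, WANTED):
    print(row)
```

The full column list is still needed to split each line; the projection onto
the four coordinate columns happens afterwards.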


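Re: Re: pyflink1.11.0: if the elasticsearch host requires authentication, how can the connector be given a username and password

2020-10-27 Thread Xingbo Huang
Hi,

PyFlink 1.11 does not support DataStream yet; it only arrives in 1.12.

Best,
Xingbo

On Tue, Oct 27, 2020 at 2:58 PM whh_960101 wrote:

> Is there any other way to pass a username and password? As far as I know, Java
> Flink has an entry point for username and password when accessing elasticsearch. PyFlink calls Java to execute, so it should have this entry point too, right? Could someone give me a pointer? Thanks!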
>
>
>
>
>
>
>
> At 2020-10-22 16:34:56, "Yangze Guo" wrote:
> >Setting username and password is not yet supported in 1.11; these two options were added to the new es connector in 1.12 [1]
> >
> >[1] https://issues.apache.org/jira/browse/FLINK-18361
> >
> >Best,
> >Yangze Guo
> >
> >On Thu, Oct 22, 2020 at 3:47 PM whh_960101  wrote:
> >>
> >> Hi all: if data is to be sunk to an elasticsearch host that requires
> >> authentication, how can the elasticsearch connector be given a username
> >> and password? I followed the sample format on the official website and
> >> found no options for a username and password; the program fails with
> >> Unauthorized.
> >> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html
> >> CREATE TABLE myUserTable (
> >>   user_id STRING,
> >>   user_name STRING,
> >>   uv BIGINT,
> >>   pv BIGINT,
> >>   PRIMARY KEY (user_id) NOT ENFORCED
> >> ) WITH (
> >>   'connector' = 'elasticsearch-7',
> >>   'hosts' = 'http://localhost:9200',
> >>   'index' = 'users'
> >> );
> >> Connector Options
> >> | Option | Required | Default | Type | Description |
> >> |
> >> connector
> >> | required | (none) | String | Specify what connector to use, valid
> values are:
> >> elasticsearch-6: connect to Elasticsearch 6.x cluster
> >> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
> >> |
> >> |
> >> hosts
> >> | required | (none) | String | One or more Elasticsearch hosts to
> connect to, e.g. 'http://host_name:9092;http://host_name:9093'. |
> >> |
> >> index
> >> | required | (none) | String | Elasticsearch index for every record.
> Can be a static index (e.g. 'myIndex') or a dynamic index (e.g.
> 'index-{log_ts|-MM-dd}'). See the following Dynamic Indexsection for
> more details. |
> >> |
> >> document-type
> >> | required in 6.x | (none) | String | Elasticsearch document type. Not
> necessary anymore in elasticsearch-7. |
> >> |
> >> document-id.key-delimiter
> >> | optional | _ | String | Delimiter for composite keys ("_" by
> default), e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
> >> |
> >> failure-handler
> >> | optional | fail | String | Failure handling strategy in case a
> request to Elasticsearch fails. Valid strategies are:
> >> fail: throws an exception if a request fails and thus causes a job
> failure.
> >> ignore: ignores failures and drops the request.
> >> retry_rejected: re-adds requests that have failed due to queue capacity
> saturation.
> >> custom class name: for failure handling with a
> ActionRequestFailureHandler subclass.
> >> |
> >> |
> >> sink.flush-on-checkpoint
> >> | optional | true | Boolean | Flush on checkpoint or not. When
> disabled, a sink will not wait for all pending action requests to be
> acknowledged by Elasticsearch on checkpoints. Thus, a sink does NOT provide
> any strong guarantees for at-least-once delivery of action requests. |
> >> |
> >> sink.bulk-flush.max-actions
> >> | optional | 1000 | Integer | Maximum number of buffered actions per
> bulk request. Can be set to '0' to disable it. |
> >> |
> >> sink.bulk-flush.max-size
> >> | optional | 2mb | MemorySize | Maximum size in memory of buffered
> actions per bulk request. Must be in MB granularity. Can be set to '0' to
> disable it. |
> >> |
> >> sink.bulk-flush.interval
> >> | optional | 1s | Duration | The interval to flush buffered actions.
> Can be set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and
> 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set
> allowing for complete async processing of buffered actions. |
> >> |
> >> sink.bulk-flush.backoff.strategy
> >> | optional | DISABLED | String | Specify how to perform retries if any
> flush actions failed due to a temporary request error. Valid strategies are:
> >> DISABLED: no retry performed, i.e. fail after the first request error.
> >> CONSTANT: wait for backoff delay between retries.
> >> EXPONENTIAL: initially wait for backoff delay and increase
> exponentially between retries.
> >> |
> >> |
> >> sink.bulk-flush.backoff.max-retries
> >> | optional | 8 | Integer | Maximum number of backoff retries. |
> >> |
> >> sink.bulk-flush.backoff.delay
> >> | optional | 50ms | Duration | Delay between each backoff attempt. For
> CONSTANT backoff, this is simply the delay between each retry. For
> EXPONENTIAL backoff, this is the initial base delay. |
> >> |
> >> connection.max-retry-timeout
> >> | optional | (none) | Duration | Maximum timeout between retries. |
> >> |
> >> connection.path-prefix
> >> | optional | (none) | String | Prefix string to be added to every REST
> communication, e.g., '/v1' |
> >> |
> >> format
> >> | optional | json | String | Elasticsearch connector supports to
> specify a format. The format must produce a valid json document. By default
> uses built-in 'json' format. Please refer to JSON Format page for more
> details. |
> >>
> >>
> >>
> >>
> >>
> >>
> >>
>
>
>
>
>
>


Re: adding core-site xml to flink1.11

2020-10-27 Thread Robert Metzger
Hi,

it seems that this is what you have to do for now. However, I see that it
would be nice if Flink would allow reading from multiple configuration
files, so that you can have a "common configuration" and a "per cluster"
configuration.

I filed a JIRA ticket for a feature request:
https://issues.apache.org/jira/browse/FLINK-19828


On Tue, Oct 27, 2020 at 10:54 AM Shachar Carmeli 
wrote:

> Hi,
> Thank you for your reply.
> We are deploying on Kubernetes, and the XML is part of the config map common
> to all Flink jobs we have (or at least it was for previous versions).
>
> This means that we need to duplicate the configuration in the
> flink-conf.yaml for each job
> instead of having a common configmap.
>
> Thanks,
> Shachar
>
> On 2020/10/27 08:48:17, Robert Metzger  wrote:
> > Hi Shachar,
> >
> > Why do you want to use the core-site.xml to configure the file system?
> >
> > Since we are adding the file systems as plugins, their initialization is
> > customized. It might be the case that we are intentionally ignoring xml
> > configurations from the classpath.
> > You can configure the filesystem in the flink-conf.yaml file.
> >
> >
> > On Sun, Oct 25, 2020 at 7:56 AM Shachar Carmeli 
> > wrote:
> >
> > > Hi,
> > > I'm trying to define filesystem to flink 1.11 using core-site.xml
> > > I tried adding in the flink-conf.yaml env.hadoop.conf.dir and I see it is
> > > added to the classpath
> > > also adding environment variable HADOOP_CONF_DIR didn't help
> > >
> > > The flink 1.11.2 is running on docker using kubernetes
> > >
> > > I added hadoop using plugin as mentioned in
> > > https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#hadooppresto-s3-file-systems-plugins
> > >
> > > when configure the parameters manually I can connect to the local s3a
> > > server
> > > So it looks like the flink is not reading the core-site.xml file
> > >
> > > please advise
> > >
> > > Thanks,
> > > Shachar
> > >
> >
>


Native memory allocation (mmap) failed to map 1006567424 bytes

2020-10-27 Thread Ori Popowski
After the job is running for 10 days in production, TaskManagers start
failing with:

Connection unexpectedly closed by remote task manager

Looking in the machine logs, I can see the following error:

= Java processes for user hadoop =
OpenJDK 64-Bit Server VM warning: INFO:
os::commit_memory(0x7fb4f401, 1006567424, 0) failed; error='Cannot
allocate memory' (err
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 1006567424 bytes for
committing reserved memory.
# An error report file with more information is saved as:
# /mnt/tmp/hsperfdata_hadoop/hs_err_pid6585.log
=== End java processes for user hadoop ===

In addition, the metrics for the TaskManager show very low Heap memory
consumption (20% of Xmx).

Hence, I suspect there is a memory leak in the TaskManager's Managed Memory.

This is my TaskManager's memory detail:
flink process 112g
framework.heap.size 0.2g
task.heap.size 50g
managed.size 54g
framework.off-heap.size 0.5g
task.off-heap.size 1g
network 2g
XX:MaxMetaspaceSize 1g

As you can see, the managed memory is 54g, so it's already high (my
managed.fraction is set to 0.5).

I'm running Flink 1.10. Full job details attached.

Can someone advise what would cause a managed memory leak?

Starting YARN TaskExecutor runner (Version: 1.10.0, Rev:, 
Date:)
OS current user: yarn
Current Hadoop/Kerberos user: hadoop
JVM: OpenJDK 64-Bit Server VM - Amazon.com Inc. - 1.8/25.252-b09
Maximum heap size: 52224 MiBytes
JAVA_HOME: /etc/alternatives/jre
Hadoop version: 2.8.5-amzn-6
JVM Options:
   -Xmx54760833024
   -Xms54760833024
   -XX:MaxDirectMemorySize=3758096384
   -XX:MaxMetaspaceSize=1073741824
   -XX:+UseG1GC
   -Dlog.file=/var/log/hadoop-yarn/containers/application_1600334141629_0011/container_1600334141629_0011_01_02/taskmanager.log
   -Dlog4j.configuration=file:./log4j.properties
Program Arguments:
   -D taskmanager.memory.framework.off-heap.size=536870912b
   -D taskmanager.memory.network.max=2147483648b
   -D taskmanager.memory.network.min=2147483648b
   -D taskmanager.memory.framework.heap.size=134217728b
   -D taskmanager.memory.managed.size=58518929408b
   -D taskmanager.cpu.cores=7.0
   -D taskmanager.memory.task.heap.size=54626615296b
   -D taskmanager.memory.task.off-heap.size=1073741824b
   --configDir .
   -Djobmanager.rpc.address=ip-***.us-west-2.compute.internal
   -Dweb.port=0
   -Dweb.tmpdir=/tmp/flink-web-ad601f25-685f-42e5-aa93-9658233031e4
   -Djobmanager.rpc.port=35435
   -Drest.address=ip-***.us-west-2.compute.internal
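
The figures in the startup log above can be cross-checked with a little arithmetic. The sketch below is not part of the original report. It assumes the Flink 1.10 memory model, in which -Xmx is the sum of framework heap and task heap, and -XX:MaxDirectMemorySize is the sum of framework off-heap, task off-heap and network memory. The class and method names are made up for illustration; the byte values are copied from the program arguments.

```java
// Hypothetical helper, for illustration only: re-derives the JVM limits
// from the "-D taskmanager.memory.*" program arguments shown in the log.
class TaskManagerMemoryCheck {
    // Byte values copied from the program arguments above.
    static final long FRAMEWORK_HEAP = 134_217_728L;
    static final long TASK_HEAP = 54_626_615_296L;
    static final long MANAGED = 58_518_929_408L;
    static final long FRAMEWORK_OFF_HEAP = 536_870_912L;
    static final long TASK_OFF_HEAP = 1_073_741_824L;
    static final long NETWORK = 2_147_483_648L;
    static final long METASPACE = 1_073_741_824L;

    static final long GIB = 1L << 30;

    /** JVM heap: framework heap + task heap (assumed to equal -Xmx). */
    public static long heapBytes() {
        return FRAMEWORK_HEAP + TASK_HEAP;
    }

    /** Direct memory: framework off-heap + task off-heap + network
     *  (assumed to equal -XX:MaxDirectMemorySize). */
    public static long directBytes() {
        return FRAMEWORK_OFF_HEAP + TASK_OFF_HEAP + NETWORK;
    }

    /** Sum of all explicitly configured components; JVM overhead is not included. */
    public static long configuredTotalBytes() {
        return heapBytes() + MANAGED + directBytes() + METASPACE;
    }

    static double toGiB(long bytes) {
        return (double) bytes / GIB;
    }

    public static void main(String[] args) {
        System.out.printf("heap    = %d bytes (%.3f GiB)%n", heapBytes(), toGiB(heapBytes()));
        System.out.printf("direct  = %d bytes (%.3f GiB)%n", directBytes(), toGiB(directBytes()));
        System.out.printf("managed = %d bytes (%.3f GiB)%n", MANAGED, toGiB(MANAGED));
        System.out.printf("total   = %d bytes (%.3f GiB)%n",
                configuredTotalBytes(), toGiB(configuredTotalBytes()));
    }
}
```

Under these assumptions the heap and direct sums match the -Xmx and -XX:MaxDirectMemorySize values in the JVM options. The managed memory is counted in neither limit, so heap metrics alone would not reveal growth there.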


Re: ValidationException using DataTypeHint in Scalar Function

2020-10-27 Thread Dawid Wysakowicz
Hey Steve,

You should be able to do this via the bridgedTo parameter. You can
additionally specify the serializer you want to use via the rawSerializer
parameter:

        @FunctionHint(
                input = {
                        @DataTypeHint(value = "RAW", bridgedTo = Map.class[, rawSerializer = ... ]),
                        @DataTypeHint("STRING")},
                output = @DataTypeHint("STRING")
        )
        public static String eval(final Object map, final String key)

Best,

Dawid

On 26/10/2020 16:10, Steve Whelan wrote:
> Hi,
>
> I have a column of type *RAW('java.util.Map', ?)* that I want to pass
> to a scalar function UDF. I'm using DataTypeHints but hitting an
> exception. What would be the proper DataTypeHint and data type param
> to achieve this?
>
>   @FunctionHint(
>           input = {@DataTypeHint("RAW"), @DataTypeHint("STRING")},
>           output = @DataTypeHint("STRING")
>   )
>   public static String eval(final Object map, final String key) {
>     // business logic
>   }
>
>
> Exception:
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> input arguments. Expected signatures are:
> MAP_VALUE(RAW('java.lang.Object', '...'), STRING)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
> ... 50 more
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> argument type at position 0. Data type RAW('java.lang.Object', '...')
> expected but RAW('java.util.Map', ?) passed.
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
> ... 51 more
>
>
> Thank you,
>
> Steve




Re: RestClusterClient and classpath

2020-10-27 Thread Flavio Pompermaier
In the logs I see that the jar is on the classpath (I'm trying to debug the
program from the IDE), isn't it?

Classpath: [file:/tmp/job-bundle.jar]
...

Best,
Flavio

On Tue, Oct 27, 2020 at 10:39 AM Chesnay Schepler 
wrote:

> * your JobExecutor is _not_ putting it on the classpath.
>
> On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
>
> Well it happens on the client before you even hit the RestClusterClient,
> so I assume that either your jar is not packaged correctly or your
> JobExecutor is putting it on the classpath.
>
> On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
>
> Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class
> I'm trying to use as a client towards the Flink cluster - session mode).
> it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).
>
> The code of getBatchEnv is:
>
>   @Deprecated
>   public static BatchEnv getBatchEnv() {
>     // TODO use the following when ready to convert from/to datastream
>     // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     BatchTableEnvironment ret = BatchTableEnvironment.create(env);
>     customizeEnv(ret);
>     return new BatchEnv(env, ret);
>   }
>
>   private static void customizeEnv(TableEnvironment ret) {
>     final Configuration conf = ret.getConfig().getConfiguration();
>     // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
>     conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
>     conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
>     // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
>     // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
>     // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
>     // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
>     conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
>     conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
>     conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
>     conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
>     conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
>     final List kryoSerializers = new ArrayList<>();
>     kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
>     kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
>     conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
>   }
>
> Classpath: [file:/tmp/job-bundle.jar]
>
> System.out: (none)
>
> System.err: (none)
> at
> org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
> at
> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
> at
> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
> at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
> Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
> at
> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
> at
> it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
> at
> it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at
> org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
> ... 3 more
> Caused by: java.lang.ClassNotFoundException: it/test/MyOb
> at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
> at
> org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
> at
> org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
> at
> 

Re: flink state.savepoints.dir directory configuration question

2020-10-27 Thread Congxian Qiu
Hi
   You could record this information somewhere, or at startup look up all chk-xxx directories under that jobId directory and pick a suitable one to restore from.
Best,
Congxian
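
A rough sketch of how the lookup described above might be automated. It is not from the thread. It assumes the checkpoints live on a local file system and that a chk-N directory is usable only when it contains a _metadata file. The names LatestCheckpointFinder, pickLatest and findLatestCompleted are hypothetical.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, for illustration only.
class LatestCheckpointFinder {
    private static final Pattern CHK_DIR = Pattern.compile("chk-(\\d+)");

    /** Pure selection step: of the given directory names, return the chk-N with the largest N. */
    public static Optional<String> pickLatest(Collection<String> dirNames) {
        String best = null;
        long bestId = -1;
        for (String name : dirNames) {
            Matcher m = CHK_DIR.matcher(name);
            if (m.matches()) {
                // Compare numerically: "chk-12" is newer than "chk-9".
                long id = Long.parseLong(m.group(1));
                if (id > bestId) {
                    bestId = id;
                    best = name;
                }
            }
        }
        return Optional.ofNullable(best);
    }

    /** Collects chk-* subdirectories that contain a _metadata file and picks the newest one. */
    public static Optional<Path> findLatestCompleted(Path jobCheckpointDir) throws IOException {
        List<String> completed = new ArrayList<>();
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(jobCheckpointDir, "chk-*")) {
            for (Path dir : dirs) {
                if (Files.isRegularFile(dir.resolve("_metadata"))) {
                    completed.add(dir.getFileName().toString());
                }
            }
        }
        return pickLatest(completed).map(jobCheckpointDir::resolve);
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: LatestCheckpointFinder <checkpoint-dir-of-one-job>");
            System.exit(1);
        }
        Optional<Path> latest = findLatestCompleted(Paths.get(args[0]));
        if (!latest.isPresent()) {
            System.err.println("no completed checkpoint found");
            System.exit(2);
        }
        System.out.println(latest.get());
    }
}
```

A submit script could then pass the printed path to the -s option of flink run, so the changing chk number never has to be typed by hand.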


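On Tue, Oct 27, 2020 at 4:54 PM marble.zh...@coinflex.com.INVALID wrote:

> Someone in the DingTalk group just suggested pointing the path at the jobId/chk-xx directory, and with that the restore works.
>
> But in that case the xx changes with every checkpoint, so how can the job be submitted automatically?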
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Unable to restore state from checkpoint

2020-10-27 Thread Congxian Qiu
Hi
   From the error, the path you specified is a directory that contains no _metadata file. It is not a complete checkpoint/savepoint,
so it cannot be used for recovery.
Best,
Congxian


On Tue, Oct 27, 2020 at 4:06 PM marble.zh...@coinflex.com.INVALID wrote:

> /opt/flink/bin/flink run -d -s /opt/flink/savepoints -c
> com.xxx.flink.ohlc.kafka.OrderTickCandleView
> /home/service-ohlc-*-SNAPSHOT.jar
>
> I already specified this directory when starting the job, but it fails with the following error:
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
> instantiate JobManager.
> at
>
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
> at
>
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
> ... 6 more
> Caused by: java.io.FileNotFoundException: Cannot find meta data file
> '_metadata' in directory '/opt/flink/savepoints'. Please try to load the
> checkpoint/savepoint directly from the metadata file instead of the
> directory.
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Heartbeat of TaskManager with id xxx timed out

2020-10-27 Thread Xintong Song
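TaskManager heartbeat timeouts have a few common causes:
1. Network jitter
2. The TM was lost: the process crashed, was killed, or similar
3. The JM or TM failed to process heartbeats in time, for example because of GC

I suggest checking the logs of the affected TM, as well as the GC logs of the JM and TM.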

Thank you~

Xintong Song



On Tue, Oct 27, 2020 at 1:46 PM freeza1...@outlook.com <
freeza1...@outlook.com> wrote:

> Hi all:
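> Flink standalone mode, 3 nodes, 1 master, 3 slaves. I created one job, and after it
> has run for a while, close to 5 days, it fails with
> "Heartbeat of TaskManager with id a2d4661d77371163f4c2bad51024df9e timed
> out", and then the TaskManager on one node becomes unusable. What causes this, and how can I troubleshoot it?
>
> The exception is as follows: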
> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id
> a2d4661d77371163f4c2bad51024df9e timed out.
> at
> org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
> at
> org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:517)
> at akka.actor.Actor.aroundReceive$(Actor.scala:515)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2020-10-23 15:40:23,197 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
> - Calculating tasks to restart to recover the failed task
> c59b7af815032e363d2ad399c0896984_3.
> 2020-10-23 15:40:23,197 INFO
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionStrategy
> - 8 tasks should be restarted to recover the failed task
> c59b7af815032e363d2ad399c0896984_3.
>
>
>
> freeza1...@outlook.com
>


Re: RestClusterClient and classpath

2020-10-27 Thread Chesnay Schepler

* your JobExecutor is _not_ putting it on the classpath.

On 10/27/2020 10:36 AM, Chesnay Schepler wrote:
Well it happens on the client before you even hit the 
RestClusterClient, so I assume that either your jar is not packaged 
correctly or your JobExecutor is putting it on the classpath.


On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main 
class I'm trying to use as a client towards the Flink cluster - 
session mode).

it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

The code of getBatchEnv is:

  @Deprecated
  public static BatchEnv getBatchEnv() {
    // TODO use the following when ready to convert from/to datastream
    // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    BatchTableEnvironment ret = BatchTableEnvironment.create(env);
    customizeEnv(ret);
    return new BatchEnv(env, ret);
  }

  private static void customizeEnv(TableEnvironment ret) {
    final Configuration conf = ret.getConfig().getConfiguration();
    // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);

    conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
    conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
    // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
    // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
    // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
    // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR

    conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
    conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
    conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR

    final List kryoSerializers = new ArrayList<>();
    kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));

    conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
  }

Classpath: [file:/tmp/job-bundle.jar]

System.out: (none)

System.err: (none)
at 
org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
at 
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)

at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
at 
it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
at 
it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
at 
it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)

... 3 more
Caused by: java.lang.ClassNotFoundException: it/test/MyOb
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)

at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 13 more

On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger wrote:


Hi Flavio,
can you share the full stacktrace you are seeing? I'm wondering
if the error happens on the client or server side (among other
questions I have).

On Mon, Oct 26, 2020 at 5:58 PM 

Re: RestClusterClient and classpath

2020-10-27 Thread Chesnay Schepler
Well it happens on the client before you even hit the RestClusterClient, 
so I assume that either your jar is not packaged correctly or your 
JobExecutor is putting it on the classpath.
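
The first of the two possibilities, a jar that does not actually contain the class, can be ruled out with the JDK alone. The following is a minimal sketch, not something posted in the thread. It assumes the jar path and class name mentioned later in this message (/tmp/job-bundle.jar and it/test/MyOb) as defaults, and the helper names JarClassCheck, toEntryName and jarContainsClass are invented for the example.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper, for illustration only.
class JarClassCheck {
    /** Converts a class name in dotted or slash form to its entry path inside a jar. */
    public static String toEntryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar has a .class entry for the given class at the top level of the archive. */
    public static boolean jarContainsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(toEntryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults are taken from the message; override them on the command line.
        String jarPath = args.length > 0 ? args[0] : "/tmp/job-bundle.jar";
        String className = args.length > 1 ? args[1] : "it/test/MyOb";
        boolean found = jarContainsClass(jarPath, className);
        System.out.println(toEntryName(className) + (found ? " found in " : " NOT found in ") + jarPath);
    }
}
```

If the entry is present, packaging is probably not the problem and attention can shift to which class loader is asked for the class.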


On 10/27/2020 9:42 AM, Flavio Pompermaier wrote:
Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main 
class I'm trying to use as a client towards the Flink cluster - 
session mode).

it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

The code of getBatchEnv is:

  @Deprecated
  public static BatchEnv getBatchEnv() {
    // TODO use the following when ready to convert from/to datastream
    // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    BatchTableEnvironment ret = BatchTableEnvironment.create(env);
    customizeEnv(ret);
    return new BatchEnv(env, ret);
  }

  private static void customizeEnv(TableEnvironment ret) {
    final Configuration conf = ret.getConfig().getConfiguration();
    // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);

    conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
    conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
    // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
    // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
    // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
    // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR

    conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
    conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
    conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR

    final List kryoSerializers = new ArrayList<>();
    kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));

    conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
  }

Classpath: [file:/tmp/job-bundle.jar]

System.out: (none)

System.err: (none)
at 
org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
at 
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)

at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
at 
it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
at 
it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
at 
it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at 
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)

... 3 more
Caused by: java.lang.ClassNotFoundException: it/test/MyOb
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at 
org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at 
org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)

at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 13 more

On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger wrote:


Hi Flavio,
can you share the full stacktrace you are seeing? I'm wondering if
the error happens on the client or server side (among other
questions I have).

On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

Hi to all,
I was trying to use 

Re: FLINK 1.11 Graphite Metrics

2020-10-27 Thread Chesnay Schepler
Are you writing a test? (otherwise the ReporterSetupTest reporters 
wouldn't be around)

Do you have a dependency on the graphite reporter?

On 10/27/2020 8:27 AM, Robert Metzger wrote:

Hi Vijayendra,
can you post or upload the entire logs, so that we can see the 
Classpath logged on startup, as well as the effective configuration 
parameters?


On Tue, Oct 27, 2020 at 12:49 AM Vijayendra Yadav <contact@gmail.com> wrote:


Hi Chesnay,

Another log message:

2020-10-26 23:33:08,516 WARN
org.apache.flink.runtime.metrics.ReporterSetup - The reporter
factory
(org.apache.flink.metrics.graphite.GraphiteReporterFactory) could
not be found for reporter grph. Available factories:

[org.apache.flink.runtime.metrics.ReporterSetupTest$ConfigExposingReporterFactory,
org.apache.flink.runtime.metrics.ReporterSetupTest$TestReporterFactory,

org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporterFactory,
org.apache.flink.runtime.metrics.ReporterSetupTest$FailingFactory].
2020-10-26 23:33:08,517 INFO
org.apache.flink.runtime.metrics.MetricRegistryImpl - No metrics
reporter configured, no metrics will be exposed/reported.
Regards,
Vijay

On Mon, Oct 26, 2020 at 2:34 PM Vijayendra Yadav <contact@gmail.com> wrote:

Hi Chesnay,

I have the same, and I am exporting the Flink conf directory as shown below;
the flink-conf.yaml in it contains the configuration you gave.
What else can I try?

export FLINK_CONF_DIR=${app_install_path}/flinkconf/

regards,
Vijay

On Sun, Oct 25, 2020 at 8:03 AM Chesnay Schepler <ches...@apache.org> wrote:

Ah wait, in 1.11 it should no longer be necessary to
explicitly copy the reporter jar.

Please update your reporter configuration to this:

metrics.reporter.grph.factory.class: org.apache.flink.metrics.graphite.GraphiteReporterFactory

On 10/25/2020 4:00 PM, Chesnay Schepler wrote:

Have you followed the documentation, specifically this bit?

> In order to use this reporter you must copy
> /opt/flink-metrics-influxdb-1.11.2.jar into the
> plugins/influxdb folder of your Flink distribution.

On 10/24/2020 12:17 AM, Vijayendra Yadav wrote:

Hi Team,

With Flink 1.11 Graphite metrics, I see the following
error in the log.
Any suggestions?

2020-10-23 21:55:14,652 ERROR org.apache.flink.runtime.metrics.ReporterSetup - Could not instantiate metrics reporter grph. Metrics might not be exposed/reported.
java.lang.ClassNotFoundException: org.apache.flink.metrics.graphite.GraphiteReporter
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:264)
at 
org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:313)
at 
org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:274)
at 
org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:235)
at 
org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:148)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:316)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:270)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:208)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:89)

Regards,
Vijay









Re: Question about flink-sql dimension table join

2020-10-27 Thread Jark Wu
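This looks more like a periodically scheduled batch requirement to me. With stream processing you can only keep reading the increments of the employee table; you cannot read a full snapshot every day.
Wouldn't Flink batch plus a scheduler be a better fit?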

Best,
Jark

On Tue, 27 Oct 2020 at 16:08, 夜思流年梦  wrote:

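> I'm currently preparing to build a real-time data warehouse and have run into a problem.
> For example, take a report of all performance figures for all employees. The report needs to join 1 employee dimension table with 4 performance-related stream tables.
> In ordinary SQL the join would look like this:
>
>
> dimension table left join stream table 1
> left join stream table 2
> left join stream table 3
> left join stream table 4
>
>
> Because the flink-sql temporal join does not support a dimension table on the left side left-joining a stream table,
>
>
> the only option is to join with the stream table on the left and the dimension table on the right,
> i.e.: select  * from  table a  left join dim_XXX  FOR SYSTEM_TIME AS OF
> a.proctime as c on a.memberId=c.rowkey
>
>
> The problem is that employees with no performance data today get no statistics. If only one stream table were joined, I could fill in empty rows for those employees when producing the report. But here 4 stream tables
> have to be joined, so only employees with data in all four stream tables get any output. That is the issue: the final report contains only employees that have data in all 4 stream tables.
>
>
> I asked this once before and the answer was a stream-stream join, but that has the same problem: a result is produced only when both sides have data, and besides, the employee dimension table hardly ever changes. I can't find that earlier email, so I'm asking again: is there a good solution for this scenario (dimension table on the left,
> left join stream tables)?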
>
>
>
>
>
>
>
>


Re: flink state.savepoints.dir directory configuration question

2020-10-27 Thread marble.zh...@coinflex.com.INVALID
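Someone in the DingTalk group just suggested pointing the path at the jobId/chk-xx directory, and with that the restore works.

But in that case the xx changes with every checkpoint, so how can the job be submitted automatically?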



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: adding core-site xml to flink1.11

2020-10-27 Thread Robert Metzger
Hi Shachar,

Why do you want to use the core-site.xml to configure the file system?

Since we are adding the file systems as plugins, their initialization is
customized. It might be the case that we are intentionally ignoring xml
configurations from the classpath.
You can configure the filesystem in the flink-conf.yaml file.


On Sun, Oct 25, 2020 at 7:56 AM Shachar Carmeli 
wrote:

> Hi,
> I'm trying to define filesystem to flink 1.11 using core-site.xml
> I tried adding in the flink-conf.yaml env.hadoop.conf.dir and I see it is
> added to the classpath
> also adding environment variable HADOOP_CONF_DIR didn't help
>
> The flink 1.11.2 is running on docker using kubernetes
>
> I added hadoop using plugin as mentioned in
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/s3.html#hadooppresto-s3-file-systems-plugins
>
> when configure the parameters manually I can connect to the local s3a
> server
> So it looks like the flink is not reading the core-site.xml file
>
> please advise
>
> Thanks,
> Shachar
>


Re: RestClusterClient and classpath

2020-10-27 Thread Flavio Pompermaier
Sure. Here it is (org.apache.flink.client.cli.JobExecutor is my main class
I'm trying to use as a client towards the Flink cluster - session mode).
it/test/MyOb is within the fat jar (/tmp/job-bundle.jar).

The code of getBatchEnv is:

  @Deprecated
  public static BatchEnv getBatchEnv() {
    // TODO use the following when ready to convert from/to datastream
    // return getTableEnv(EnvironmentSettings.newInstance().inBatchMode().build());
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment ret = BatchTableEnvironment.create(env);
    customizeEnv(ret);
    return new BatchEnv(env, ret);
  }

  private static void customizeEnv(TableEnvironment ret) {
    final Configuration conf = ret.getConfig().getConfiguration();
    // conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 2);
    conf.setString(CoreOptions.TMP_DIRS, FLINK_TEST_TMP_DIR);
    conf.setString(BlobServerOptions.STORAGE_DIRECTORY, FLINK_TEST_TMP_DIR);
    // conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 4); //NOSONAR
    // conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.4f);//NOSONAR
    // conf.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 32768 * 2);//NOSONAR
    // conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 32768 * 2);// NOSONAR
    conf.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 0);// NOSONAR
    conf.setString(AkkaOptions.ASK_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.TCP_TIMEOUT, "10 min");// NOSONAR
    conf.setString(AkkaOptions.STARTUP_TIMEOUT, "10 min");// NOSONAR
    conf.set(ClientOptions.CLIENT_TIMEOUT, Duration.ofMinutes(10));// NOSONAR
    final List kryoSerializers = new ArrayList<>();
    kryoSerializers.add(getKryoSerializerConfigLine(DateTime.class, JodaDateTimeSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonAtom.class, TBaseSerializer.class));
    kryoSerializers.add(getKryoSerializerConfigLine(EntitonQuad.class, TBaseSerializer.class));
    conf.set(PipelineOptions.KRYO_DEFAULT_SERIALIZERS, kryoSerializers);
  }

Classpath: [file:/tmp/job-bundle.jar]

System.out: (none)

System.err: (none)
at org.apache.flink.client.program.PackagedProgramUtils.generateException(PackagedProgramUtils.java:245)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:164)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:77)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:109)
at org.apache.flink.client.cli.JobExecutor.main(JobExecutor.java:42)
Caused by: java.lang.NoClassDefFoundError: it/test/MyOb
at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.customizeEnv(DatalinksExecutionEnvironment.java:116)
at it.okkam.datalinks.flink.DatalinksExecutionEnvironment.getBatchEnv(DatalinksExecutionEnvironment.java:95)
at it.okkam.datalinks.flink.jobs.EnsReconciliator.main(EnsReconciliator.java:73)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:150)
... 3 more
Caused by: java.lang.ClassNotFoundException: it/test/MyOb
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 13 more
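
One quick sanity check for a trace like this is to confirm that the class really is packaged in the jar listed under "Classpath". The following is only a minimal sketch in plain Java, not a Flink API: JarClassCheck is a hypothetical helper name, and the default jar path and class name are simply the ones that appear in the trace. It only reports whether the .class entry exists in the jar; it says nothing about which class loader is used when the job graph is built.

```java
import java.io.IOException;
import java.util.jar.JarFile;

// Hypothetical helper (not part of Flink): checks whether a jar contains a class.
final class JarClassCheck {

    /** Converts a class name in dotted or slash form to its jar entry path. */
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Returns true if the jar at jarPath has a .class entry for className. */
    static boolean containsClass(String jarPath, String className) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getJarEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Defaults are taken from the stack trace; pass your own values as arguments.
        String jarPath = args.length > 0 ? args[0] : "/tmp/job-bundle.jar";
        String className = args.length > 1 ? args[1] : "it/test/MyOb";
        System.out.println(className + " present in " + jarPath + ": "
                + containsClass(jarPath, className));
    }
}
```

If the entry is present, the jar itself is probably fine and the question becomes which class loader is asked for the class on the client side.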

On Tue, Oct 27, 2020 at 9:32 AM Robert Metzger  wrote:

> Hi Flavio,
> can you share the full stacktrace you are seeing? I'm wondering if the
> error happens on the client or server side (among other questions I have).
>
> On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier 
> wrote:
>
>> Hi to all,
>> I was trying to use the RestClusterClient to submit my job to the Flink
>> cluster.
>> However when I submit the job Flink cannot find the classes contained in
>> the "fat" jar..what should I do? Am I missing something in my code?
>> This is the current client code I'm testing:
>>
>> public static void main(String[] args) throws MalformedURLException {
>> final 

Re: HA on AWS EMR

2020-10-27 Thread Averell
Hello Robert,

Thanks for the info. That makes sense. I will save and cancel my jobs with
1.10, upgrade to 1.11, and restore the jobs from the savepoints.

Thanks and regards,
Averell



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: RestClusterClient and classpath

2020-10-27 Thread Robert Metzger
Hi Flavio,
can you share the full stacktrace you are seeing? I'm wondering if the
error happens on the client or server side (among other questions I have).

On Mon, Oct 26, 2020 at 5:58 PM Flavio Pompermaier 
wrote:

> Hi to all,
> I was trying to use the RestClusterClient to submit my job to the Flink
> cluster.
> However when I submit the job Flink cannot find the classes contained in
> the "fat" jar..what should I do? Am I missing something in my code?
> This is the current client code I'm testing:
>
> public static void main(String[] args) throws MalformedURLException {
>   final Configuration flinkConf = new Configuration();
>   flinkConf.set(RestOptions.ADDRESS, "localhost");
>   flinkConf.set(RestOptions.PORT, 8081);
>
>   final File jarFile = new File("/tmp/job-bundle.jar");
>   final String jobClass = "it.flink.MyJob";
>
>   try {
>     final RestClusterClient<StandaloneClusterId> client =
>         new RestClusterClient<>(flinkConf, StandaloneClusterId.getInstance());
>
>     final PackagedProgram packagedProgram = PackagedProgram.newBuilder()//
>         .setJarFile(jarFile)//
>         // .setUserClassPaths(userClassPaths)
>         .setEntryPointClassName(jobClass).setConfiguration(flinkConf)//
>         .build();
>
>     final JobGraph jobGraph =
>         PackagedProgramUtils.createJobGraph(packagedProgram, flinkConf, 1, true);
>
>     final DetachedJobExecutionResult jobExecutionResult =
>         client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get();
>
>     System.out.println(jobExecutionResult.getJobID());
>   } catch (Exception ex) {
>     ex.printStackTrace();
>     System.exit(1);
>   }
> }
>
> Best,
> Flavio
>


Re: [BULK]Re: Support of composite data types in flink-parquet

2020-10-27 Thread Jon Alberdi
Indeed, thanks Andrey!

From: Andrey Zagrebin 
Date: Tuesday, October 20, 2020 at 6:03 PM
To: Jon Alberdi 
Cc: user@flink.apache.org 
Subject: [BULK]Re: Support of composite data types in flink-parquet
Hi Jon,

I have found this ticket [1]. It looks like what you are looking for.

Best,
Andrey

[1] 
https://issues.apache.org/jira/browse/FLINK-17782

On Tue, Oct 20, 2020 at 4:50 PM Jon Alberdi wrote:
Hello, as stated at 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/connectors/formats/parquet.html
“Attention: Composite data type: Array, Map and Row are not supported”.

I could not find a JIRA related to that issue, is there any?
If not, could I create one to continue the discussion there?

Regards



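Question about dimension table joins in flink-sql

2020-10-27 Thread 夜思流年梦
I'm currently preparing to build a real-time data warehouse and have run into a problem.
For example, to produce a report of all performance figures for all employees, the report
needs to join 1 employee dimension table with 4 performance-related stream tables.
In ordinary SQL the join would look like this:


dimension table left join stream table 1
left join stream table 2
left join stream table 3
left join stream table 4


Because the flink-sql temporal join does not support a dimension table on the left side
left-joining a stream table,


the only option is to put the stream table on the left and the dimension table on the right,
i.e.: select  * from  table a  left join dim_XXX  FOR SYSTEM_TIME AS OF a.proctime
as c on a.memberId=c.rowkey


The problem with this is that employees with no performance records today get no
statistics. If I were joining only one stream table, I could fill in empty rows for the
employees without data when producing the report. Here, however, 4 stream tables have to
be joined, so only employees with data in all four stream tables get a row. That is the
problem: the final report only contains employees that have data in all 4 stream tables.


I asked this once before, and the answer was to use a regular stream-stream join. A
stream-stream join has the same problem: a final result is only produced when both sides
have data, and besides, the employee dimension table rarely changes. I can't find that
earlier email, so I'm asking again: is there a good solution for this scenario (dimension
table on the left, left join stream tables)?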









Re: Re: Unable to restore state from checkpoint

2020-10-27 Thread marble.zh...@coinflex.com.INVALID
/opt/flink/bin/flink run -d -s /opt/flink/savepoints -c
com.xxx.flink.ohlc.kafka.OrderTickCandleView
/home/service-ohlc-*-SNAPSHOT.jar

When starting the job I already specify this directory, but I get the following error:
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
instantiate JobManager.
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
... 6 more
Caused by: java.io.FileNotFoundException: Cannot find meta data file
'_metadata' in directory '/opt/flink/savepoints'. Please try to load the
checkpoint/savepoint directly from the metadata file instead of the
directory.
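
The error message asks for the path of a concrete savepoint rather than the parent directory. As a rough illustration only, the sketch below lists the immediate subdirectories of a savepoint root that contain a `_metadata` file, so that one of them can be passed to `-s` instead. It is plain Java, not a Flink API; SavepointLocator is a hypothetical name, and the assumption that each savepoint lives in its own subdirectory directly under the configured root should be checked against your own setup.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper (not part of Flink): finds directories holding a _metadata file.
final class SavepointLocator {

    /** Returns the immediate subdirectories of root that contain a _metadata file, sorted by path. */
    static List<Path> findSavepoints(Path root) {
        try (Stream<Path> children = Files.list(root)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(dir -> Files.isRegularFile(dir.resolve("_metadata")))
                    .sorted()
                    .collect(Collectors.toList());
        } catch (IOException e) {
            // Surface I/O problems (e.g. missing root, no permission) as unchecked.
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Default root is the directory from the error message above.
        Path root = Paths.get(args.length > 0 ? args[0] : "/opt/flink/savepoints");
        for (Path candidate : findSavepoints(root)) {
            System.out.println(candidate);
        }
    }
}
```

If the list comes back empty inside the JobManager container, the directory seen by that container may not be the one the savepoint was written to.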




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink state.savepoints.dir directory configuration question

2020-10-27 Thread marble.zh...@coinflex.com.INVALID
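Thanks, setting the permissions of this folder to 755 fixed it.

But now I've hit another problem. In my current environment I used docker to create one
jobmanager and two taskmanagers. All three containers map the same path on the host,
which is used for checkpoints/savepoints, so in theory all three containers can access it.

However, when I try to restore state and start the job with this command, I get the error below:
/opt/flink/bin/flink run -d -s /opt/flink/savepoints -c
com.coinflex.flink.ohlc.kafka.OrderTickCandleView
/home/service-ohlc-*-SNAPSHOT.jar

The job fails to start.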


Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not
instantiate JobManager.
at
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:398)
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
... 6 more
Caused by: java.io.FileNotFoundException: Cannot find meta data file
'_metadata' in directory '/opt/flink/savepoints'. Please try to load the
checkpoint/savepoint directly from the metadata file instead of the
directory.





Re: How to understand NOW() in SQL when using Table & SQL API to develop a streaming app?

2020-10-27 Thread Till Rohrmann
Quick question Jark: Is this difference in behaviour documented? I couldn't
find it in the docs.

Cheers,
Till

On Tue, Oct 27, 2020 at 7:30 AM Jark Wu  wrote:

> Hi Longdexin,
>
> In traditional batch sql, NOW() is executed and determined before the job
> is submitted and will not change for every processed record.
> However, this doesn't make much sense in streaming sql, therefore, NOW()
> function in Flink is executed for every record.
>
> Best,
> Jark
>
> On Fri, 23 Oct 2020 at 16:30, Till Rohrmann  wrote:
>
>> Hi Longdexin,
>>
>> thanks for reaching out to the Flink community. I am pulling in Jark who
>> might be able to help you with this question.
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 22, 2020 at 2:56 PM Longdexin <274522...@qq.com> wrote:
>>
>>> From my point of view, the value of the NOW() function in SQL is fixed at
>>> the time the streaming app is launched and does not change with
>>> processing time. However, as a new Flink user, I'm not so sure of that.
>>> By the way, if my intent is for the time logic to keep updating
>>> continuously, what should I do?
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>
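
To make the two behaviours described by Jark concrete, here is a small sketch in plain Java. It is not Flink code and does not call any Flink API: NowSemantics and both method names are made up for illustration, and the clock is injected so the difference is visible without waiting for real time to pass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Illustration only: contrasts "evaluate NOW() once" with "evaluate NOW() per record".
final class NowSemantics {

    /** Batch-style: the clock is read once up front, so every record sees the same value. */
    static List<Long> evaluateOncePerJob(int records, LongSupplier clock) {
        long now = clock.getAsLong();
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            out.add(now);
        }
        return out;
    }

    /** Streaming-style, as described for Flink above: the clock is read for every record. */
    static List<Long> evaluatePerRecord(int records, LongSupplier clock) {
        List<Long> out = new ArrayList<>();
        for (int i = 0; i < records; i++) {
            out.add(clock.getAsLong());
        }
        return out;
    }

    public static void main(String[] args) {
        // Fake clocks that advance by 1000 ms on every read.
        AtomicLong first = new AtomicLong();
        AtomicLong second = new AtomicLong();
        System.out.println(evaluateOncePerJob(3, () -> first.addAndGet(1000)));  // [1000, 1000, 1000]
        System.out.println(evaluatePerRecord(3, () -> second.addAndGet(1000))); // [1000, 2000, 3000]
    }
}
```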


Re: EMR Logging Woes

2020-10-27 Thread Robert Metzger
Hi Rex,

1. You can also use the Flink UI for retrieving logs. That usually works
quite fast (unless your logs are huge).

2. These are the correct configuration files for setting the log level. Are
you running on a vanilla EMR cluster, or are there modifications? The
"problem" is that Flink on YARN adds jar files (and other files) provided
by the environment (YARN) to its classpath. The vanilla EMR configuration
should be fine to not interfere with Flink's logging. But maybe there are
some changes in your environment that cause problems?

Since you are SSHing into the machines already: At the top of each Flink
log file, we are logging the location of the log4j configuration file
(search for "-Dlog4j.configuration="). Try to open that file to verify
what's in there.
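
If you would rather script that lookup than search by eye, a sketch along these lines could pull the value out of a log line. It is plain Java with a hypothetical class name (Log4jConfigFinder), and the sample line in main is invented for illustration rather than copied from a real EMR log. The optional "File" suffix in the pattern is there because Log4j 2 reads log4j.configurationFile.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the log4j configuration location from a JVM option string.
final class Log4jConfigFinder {

    // Matches -Dlog4j.configuration=<value> and -Dlog4j.configurationFile=<value>.
    private static final Pattern OPTION =
            Pattern.compile("-Dlog4j\\.configuration(?:File)?=(\\S+)");

    /** Returns the configured location if the line contains the option, else empty. */
    static Optional<String> configLocation(String logLine) {
        Matcher m = OPTION.matcher(logLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Invented sample line; real log lines will differ.
        String sample = "    -Dlog4j.configuration=file:/etc/flink/conf/log4j.properties";
        System.out.println(configLocation(sample).orElse("(not found)"));
    }
}
```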

Hope this helps!

Robert


On Tue, Oct 27, 2020 at 12:03 AM Rex Fenley  wrote:

> Hello,
>
> After lots of testing in local environments we're now trying to get our
> cluster running on AWS EMR. We followed much of the documentation from both
> AWS and Flink and have gotten to the point of creating a yarn session and
> submitting jobs. We successfully get back a Job ID and in the Yarn Timeline
> Server UI it says our application is running. However, we are having a hard
> time with logging.
>
> 2 main issues:
> 1. Logs for the jobmanager and taskmanager seem to take a long time to
> show up or in some cases just seem to never show up in the Yarn / Hadoop
> UI, even though we can see them just fine when ssh'ing into the cluster's
> nodes. Anything we can do to speed this up?
>
> 2. We can't seem to see anything except for WARN and ERROR logs for the
> jobmanager and taskmanager, we need at least INFO right now to confirm
> things are working as expected. We have been jumping through hoops going
> through a multitude of configuration files including
> log4j-session.properties and log4j.properties setting level to DEBUG but
> it has not helped. Are these the correct configuration files?
>
> Thanks!
>
> --
>
> Rex Fenley  |  Software Engineer - Mobile and Backend
>
>
>


Working with bounded Datastreams - Flink 1.11.1

2020-10-27 Thread s_penakalap...@yahoo.com
Hi Team,
I want to use the Flink DataStream API for batch operations that involve huge
data. I tried to calculate count and average on the whole DataStream without
using a window function.

Approach I tried to calculate the count on the DataStream:
1> Read data from a table (say the past 2 days of data) as a DataStream
2> Apply a key operation on the DataStream
3> Then use a reduce function to find count, sum and average.

I have written the output to a file and also inserted it into a table. Sample
data from the file is:
vehicleId=aa, count=1, fuel=10, avgFuel=0.0
vehicleId=dd, count=1, fuel=7, avgFuel=0.0
vehicleId=dd, count=2, fuel=22, avgFuel=11.0
vehicleId=dd, count=3, fuel=42, avgFuel=14.0
vehicleId=ee, count=1, fuel=0, avgFuel=0.0

When there are multiple records with the same vehicle ID, only the final
record has the correct values (like vehicleId=dd). Is there any way to get
only one final record for each vehicle, as shown below?
vehicleId=aa, count=1, fuel=10, avgFuel=0.0
vehicleId=dd, count=3, fuel=42, avgFuel=14.0
vehicleId=ee, count=1, fuel=0, avgFuel=0.0

I would also like some help on how to sort the whole DataStream based on one
attribute. Say we have x records in one batch job; I would like to sort and
fetch the record at position x-2 per vehicle.

Regards,
Sunitha.
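
For readers following the thread, the difference between the rolling output and the desired final output can be sketched in plain Java. This is not Flink code and not an answer from the list: FinalPerKey and its methods are made-up names, and the input values are inferred from the sample (fuel readings 10 for aa; 7, 15, 20 for dd; 0 for ee). In Flink itself, emitting only the last value per key needs some signal that the bounded input has ended, which this sketch does not show. Note also that the sketch computes avgFuel as fuel/count for every key, so single-record keys print a different average than the 0.0 in the sample.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: rolling per-key aggregate versus one final record per key.
final class FinalPerKey {

    /** Running count and fuel sum for one vehicle. */
    static final class Stats {
        long count;
        long fuel;

        double avgFuel() {
            return count == 0 ? 0.0 : (double) fuel / count;
        }
    }

    static String format(String id, Stats s) {
        return "vehicleId=" + id + ", count=" + s.count + ", fuel=" + s.fuel
                + ", avgFuel=" + s.avgFuel();
    }

    /** Emits one line after every input record, like the rolling output in the mail. */
    static List<String> rolling(String[] ids, long[] fuels) {
        Map<String, Stats> state = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            Stats s = state.computeIfAbsent(ids[i], k -> new Stats());
            s.count++;
            s.fuel += fuels[i];
            out.add(format(ids[i], s));
        }
        return out;
    }

    /** Consumes the whole bounded input first, then emits one line per key. */
    static List<String> finalOnly(String[] ids, long[] fuels) {
        Map<String, Stats> state = new LinkedHashMap<>();
        for (int i = 0; i < ids.length; i++) {
            Stats s = state.computeIfAbsent(ids[i], k -> new Stats());
            s.count++;
            s.fuel += fuels[i];
        }
        List<String> out = new ArrayList<>();
        state.forEach((id, s) -> out.add(format(id, s)));
        return out;
    }

    public static void main(String[] args) {
        String[] ids = {"aa", "dd", "dd", "dd", "ee"};
        long[] fuels = {10, 7, 15, 20, 0};
        rolling(ids, fuels).forEach(System.out::println);
        System.out.println("--- final only ---");
        finalOnly(ids, fuels).forEach(System.out::println);
    }
}
```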


Re: FLINK 1.11 Graphite Metrics

2020-10-27 Thread Robert Metzger
Hi Vijayendra,
can you post or upload the entire logs, so that we can see the Classpath
logged on startup, as well as the effective configuration parameters?

On Tue, Oct 27, 2020 at 12:49 AM Vijayendra Yadav 
wrote:

> Hi Chesnay,
>
> Another log message:
>
> 2020-10-26 23:33:08,516 WARN
> org.apache.flink.runtime.metrics.ReporterSetup - The reporter factory
> (org.apache.flink.metrics.graphite.GraphiteReporterFactory) could not be
> found for reporter grph. Available factories:
> [org.apache.flink.runtime.metrics.ReporterSetupTest$ConfigExposingReporterFactory,
> org.apache.flink.runtime.metrics.ReporterSetupTest$TestReporterFactory,
> org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporterFactory,
> org.apache.flink.runtime.metrics.ReporterSetupTest$FailingFactory].
> 2020-10-26 23:33:08,517 INFO
> org.apache.flink.runtime.metrics.MetricRegistryImpl - No metrics reporter
> configured, no metrics will be exposed/reported.
>
> Regards,
> Vijay
>
> On Mon, Oct 26, 2020 at 2:34 PM Vijayendra Yadav 
> wrote:
>
>> Hi Chesnay,
>>
>> I have the same, and I am exporting the Flink conf directory as below, where I
>> have flink-conf.yaml with the configuration you have given. What else can I try?
>>
>> export FLINK_CONF_DIR=${app_install_path}/flinkconf/
>>
>> regards,
>> Vijay
>>
>> On Sun, Oct 25, 2020 at 8:03 AM Chesnay Schepler 
>> wrote:
>>
>>> Ah wait, in 1.11 it should no longer be necessary to explicitly copy
>>> the reporter jar.
>>>
>>> Please update your reporter configuration to this:
>>>
>>> metrics.reporter.grph.factory.class: 
>>> org.apache.flink.metrics.graphite.GraphiteReporterFactory
>>>
>>> On 10/25/2020 4:00 PM, Chesnay Schepler wrote:
>>>
>>> Have you followed the documentation, specifically this bit?
>>>
>>> > In order to use this reporter you must copy
>>> /opt/flink-metrics-influxdb-1.11.2.jar into the plugins/influxdb folder
>>> of your Flink distribution.
>>>
>>> On 10/24/2020 12:17 AM, Vijayendra Yadav wrote:
>>>
>>> Hi Team,
>>>
>>> for Flink 1.11 Graphite Metrics. I see the following Error in the log.
>>> Any suggestions?
>>>
>>> 2020-10-23 21:55:14,652 ERROR org.apache.flink.runtime.metrics.ReporterSetup
>>> - Could not instantiate metrics reporter grph. Metrics might not be exposed/reported.
>>> java.lang.ClassNotFoundException: org.apache.flink.metrics.graphite.GraphiteReporter
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:264)
>>> at org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:313)
>>> at org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:274)
>>> at org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:235)
>>> at org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:148)
>>> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createMetricRegistry(ClusterEntrypoint.java:316)
>>> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:270)
>>> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:208)
>>> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:169)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
>>> at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:168)
>>> at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
>>> at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:89)
>>>
>>>
>>> Regards,
>>> Vijay
>>>
>>>
>>>
>>>


Re: how to enable metrics in Flink 1.11

2020-10-27 Thread Robert Metzger
Hey Diwakar,

how are you deploying Flink on EMR? Are you using YARN?
If so, you could also use log aggregation to see all the logs at once (from
both JobManager and TaskManagers). (yarn logs -applicationId )

Could you post (or upload somewhere) all logs you have of one run? It is
much easier for us to debug something if we have the full logs (the logs
show for example the classpath that you are using, we would see how you are
deploying Flink, etc.)

From the information available, my guess is that you have modified your
deployment in some way (use of a custom logging version, custom deployment
method, version mixup with jars from both Flink 1.8 and 1.11, ...).

Best,
Robert


On Tue, Oct 27, 2020 at 12:41 AM Diwakar Jha  wrote:

> This is what I see on the WebUI.
>
> 23:19:24.263 [flink-akka.actor.default-dispatcher-1865] ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
> - Failed to transfer file from TaskExecutor
> container_1603649952937_0002_01_04.
> java.util.concurrent.CompletionException:
> org.apache.flink.util.FlinkException: The file LOG does not exist on the
> TaskExecutor.
> at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_252]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
> Caused by: org.apache.flink.util.FlinkException: The file LOG does not exist on the
> TaskExecutor.
> ... 5 more
> 23:19:24.275 [flink-akka.actor.default-dispatcher-1865] ERROR
> org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler
> - Unhandled exception.
> org.apache.flink.util.FlinkException: The file LOG does not exist on the
> TaskExecutor.
> at org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$requestFileUploadByFilePath$25(TaskExecutor.java:1742) ~[flink-dist_2.12-1.11.0.jar:1.11.0]
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_252]
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_252]
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_252]
> at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_252]
>
> Appreciate if anyone has any pointer for this.
>
> On Mon, Oct 26, 2020 at 10:45 AM Chesnay Schepler 
> wrote:
>
>> Flink 1.11 uses slf4j 1.7.15; the easiest way to check the log files is
>> usually via the WebUI.
>>
>> On 10/26/2020 5:30 PM, Diwakar Jha wrote:
>>
>> I think my problem is with the Slf4j library. I'm using Slf4j 1.7 with Flink
>> 1.11. If that's correct, I'd appreciate it if someone could point me to the
>> exact Slf4j library that I should use with Flink 1.11.
>>
>> Flink = 1.11.x;
>> Slf4j = 1.7;
>>
>>
>> On Sun, Oct 25, 2020 at 8:00 PM Diwakar Jha 
>> wrote:
>>
>>> Thanks for checking my configurations. Could you also point me where I
>>> can see the log files? Just to give more details. I'm trying to access
>>> these logs in AWS cloudwatch.
>>>
>>> Best,
>>> Diwakar
>>>
>>> On Sun, Oct 25, 2020 at 2:16 PM Chesnay Schepler 
>>> wrote:
>>>
>>>> With Flink 1.11 reporters were refactored to plugins, and are now
>>>> accessible by default (so you no longer have to bother with copying jars
>>>> around).
>>>>
>>>> Your configuration appears to be correct, so I suggest taking a look
>>>> at the log files.
>>>>
>>>> On 10/25/2020 9:52 PM, Diwakar Jha wrote:
>>>>
>>>> Hello Everyone,
>>>>
>>>> I'm new to Flink and I'm trying to upgrade from Flink 1.8 to Flink 1.11
>>>> on an EMR cluster. After upgrading to Flink 1.11, one of the differences
>>>> I see is that I don't get any metrics. I found out that Flink 1.11 does
>>>> not have the org.apache.flink.metrics.statsd.StatsDReporterFactory jar in
>>>> /usr/lib/flink/opt, which was the case for Flink 1.8. Does anyone have a
>>>> pointer on where to find the
>>>> org.apache.flink.metrics.statsd.StatsDReporterFactory jar, or on how to
>>>> use metrics in Flink 1.11?
 Things 

[Flink::Test] access registered accumulators via harness

2020-10-27 Thread Sharipov, Rinat
Hi mates !

I guess that I'm doing something wrong, but I couldn't find a way to access
registered accumulators and their values via the
org.apache.flink.streaming.util.ProcessFunctionTestHarness function
wrapper that I'm using to test my functions.

While reading the code I found that the required data is stored in the
org.apache.flink.runtime.metrics.groups.AbstractMetricGroup#metrics field,
which is private and not accessible from tests. Flink evidently accesses
this field somehow and exposes the counters in its Web UI.

Could someone help me add a check for metric counters to my unit tests?
If there is no such ability yet, I'm ready to help implement it if the
community considers this acceptable.

Thx !


Re: HA on AWS EMR

2020-10-27 Thread Robert Metzger
Hey Averell,

to clarify: You should be able to migrate using a savepoint from 1.10 to
1.11. Restoring from the state stored in Zookeeper (for HA) with a newer
Flink version won't work.

On Mon, Oct 26, 2020 at 5:05 PM Robert Metzger  wrote:

> Hey Averell,
>
> you should be able to migrate savepoints from Flink 1.10 to 1.11.
>
> Is there a simple way for me to reproduce this issue locally? This seems
> to be a rare, but probably valid issue. Are you using any special
> operators? (like the new source API?)
>
> Best,
> Robert
>
> On Wed, Oct 21, 2020 at 11:07 AM Averell  wrote:
>
>> Hello Roman,
>>
>> Thanks for the answer.
>> I have already had that high-availability.storageDir configured to an S3
>> location. Our service is not critical enough, so to save the cost, we are
>> using the single-master EMR setup. I understand that we'll not get YARN HA
>> in that case, but what I expect here is the ability to quickly restore the
>> service in the case of failure. Without Zookeeper, when such failure
>> happens, I'll need to find the last checkpoint of each job and restore
>> from
>> there. With the help of HA feature, I can just start a new
>> flink-yarn-session, then all jobs will be restored.
>>
>> I tried to change zookeeper dataDir config to an EFS location which both
>> the
>> old and new EMR clusters could access, and that worked.
>>
>> However, now I have a new question: is restoring to a newer version of
>> Flink expected to work (e.g. saved with Flink 1.10 and restored to
>> Flink 1.11)? I tried and got the error messages attached below. I'm not
>> sure whether that's a bug or expected behaviour.
>>
>> Thanks and best regards,
>> Averell
>>
>> 
>> 07:39:33.906 [main-EventThread] ERROR org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState - Authentication failed
>> 07:40:11.585 [flink-akka.actor.default-dispatcher-2] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred in the cluster entrypoint.
>> org.apache.flink.runtime.dispatcher.DispatcherException: Could not start recovered job 6e5c12f1c352dd4e6200c40aebb65745.
>> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$handleRecoveredJobStartError$0(Dispatcher.java:222) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836) ~[?:1.8.0_265]
>> at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811) ~[?:1.8.0_265]
>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_265]
>> at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575) ~[?:1.8.0_265]
>> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:753) ~[?:1.8.0_265]
>> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_265]
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.0.jar:1.11.0]
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.0.jar:1.11.0]
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.0.jar:1.11.0]
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.0.jar:1.11.0]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.0.jar:1.11.0]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.0.jar:1.11.0]
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.0.jar:1.11.0]
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.0.jar:1.11.0]
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.0.jar:1.11.0]
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.0.jar:1.11.0]
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.0.jar:1.11.0]
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.0.jar:1.11.0]
>>   

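Re:Re: pyflink1.11.0: how to pass username and password in the connector when the elasticsearch host requires authentication

2020-10-27 Thread whh_960101
Is there any other way to pass username and password? As far as I know, the Java
Flink Elasticsearch connector does have an entry point for username and password,
and since pyflink calls into Java for execution, there should be such an entry
point. Could anyone give me some pointers? Thanks!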







On 2020-10-22 16:34:56, "Yangze Guo" wrote:
>Setting username and password is not yet supported in 1.11; these two options were added to the new es connector in 1.12 [1]
>
>[1] https://issues.apache.org/jira/browse/FLINK-18361
>
>Best,
>Yangze Guo
>
>On Thu, Oct 22, 2020 at 3:47 PM whh_960101  wrote:
>>
>> Hi all: if I want to sink data to an elasticsearch host that requires
>> authentication, how do I pass the username and password in the elasticsearch
>> connector? I followed the example format from the official docs but could not
>> find options for username and password, and the program fails with Unauthorized.
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html
>>
>> CREATE TABLE myUserTable (
>>   user_id STRING,
>>   user_name STRING,
>>   uv BIGINT,
>>   pv BIGINT,
>>   PRIMARY KEY (user_id) NOT ENFORCED
>> ) WITH (
>>   'connector' = 'elasticsearch-7',
>>   'hosts' = 'http://localhost:9200',
>>   'index' = 'users'
>> );
>>
>> Connector Options
>> | Option | Required | Default | Type | Description |
>> |
>> connector
>> | required | (none) | String | Specify what connector to use, valid values 
>> are:
>> elasticsearch-6: connect to Elasticsearch 6.x cluster
>> elasticsearch-7: connect to Elasticsearch 7.x and later versions cluster
>> |
>> |
>> hosts
>> | required | (none) | String | One or more Elasticsearch hosts to connect 
>> to, e.g. 'http://host_name:9092;http://host_name:9093'. |
>> |
>> index
>> | required | (none) | String | Elasticsearch index for every record. Can be 
>> a static index (e.g. 'myIndex') or a dynamic index (e.g. 
>> 'index-{log_ts|-MM-dd}'). See the following Dynamic Indexsection for 
>> more details. |
>> |
>> document-type
>> | required in 6.x | (none) | String | Elasticsearch document type. Not 
>> necessary anymore in elasticsearch-7. |
>> |
>> document-id.key-delimiter
>> | optional | _ | String | Delimiter for composite keys ("_" by default), 
>> e.g., "$" would result in IDs "KEY1$KEY2$KEY3"." |
>> |
>> failure-handler
>> | optional | fail | String | Failure handling strategy in case a request to 
>> Elasticsearch fails. Valid strategies are:
>> fail: throws an exception if a request fails and thus causes a job failure.
>> ignore: ignores failures and drops the request.
>> retry_rejected: re-adds requests that have failed due to queue capacity 
>> saturation.
>> custom class name: for failure handling with a ActionRequestFailureHandler 
>> subclass.
>> |
>> |
>> sink.flush-on-checkpoint
>> | optional | true | Boolean | Flush on checkpoint or not. When disabled, a 
>> sink will not wait for all pending action requests to be acknowledged by 
>> Elasticsearch on checkpoints. Thus, a sink does NOT provide any strong 
>> guarantees for at-least-once delivery of action requests. |
>> |
>> sink.bulk-flush.max-actions
>> | optional | 1000 | Integer | Maximum number of buffered actions per bulk 
>> request. Can be set to '0' to disable it. |
>> |
>> sink.bulk-flush.max-size
>> | optional | 2mb | MemorySize | Maximum size in memory of buffered actions 
>> per bulk request. Must be in MB granularity. Can be set to '0' to disable 
>> it. |
>> |
>> sink.bulk-flush.interval
>> | optional | 1s | Duration | The interval to flush buffered actions. Can be 
>> set to '0' to disable it. Note, both 'sink.bulk-flush.max-size' and 
>> 'sink.bulk-flush.max-actions' can be set to '0' with the flush interval set 
>> allowing for complete async processing of buffered actions. |
>> |
>> sink.bulk-flush.backoff.strategy
>> | optional | DISABLED | String | Specify how to perform retries if any flush 
>> actions failed due to a temporary request error. Valid strategies are:
>> DISABLED: no retry performed, i.e. fail after the first request error.
>> CONSTANT: wait for backoff delay between retries.
>> EXPONENTIAL: initially wait for backoff delay and increase exponentially 
>> between retries.
>> |
>> |
>> sink.bulk-flush.backoff.max-retries
>> | optional | 8 | Integer | Maximum number of backoff retries. |
>> |
>> sink.bulk-flush.backoff.delay
>> | optional | 50ms | Duration | Delay between each backoff attempt. For 
>> CONSTANT backoff, this is simply the delay between each retry. For 
>> EXPONENTIAL backoff, this is the initial base delay. |
>> |
>> connection.max-retry-timeout
>> | optional | (none) | Duration | Maximum timeout between retries. |
>> |
>> connection.path-prefix
>> | optional | (none) | String | Prefix string to be added to every REST 
>> communication, e.g., '/v1' |
>> |
>> format
>> | optional | json | String | The Elasticsearch connector supports specifying 
>> a format. The format must produce a valid JSON document. By default, the 
>> built-in 'json' format is used. Please refer to the JSON Format page for 
>> more details. |
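
The difference between the CONSTANT and EXPONENTIAL backoff strategies in the table above can be sketched as a schedule of waits. This is only an illustration, not the connector's implementation: the function name `backoff_delays` and the doubling `factor` are assumptions, and the real Elasticsearch client may grow the delay differently. The defaults mirror the table (50 ms delay, 8 retries).

```python
def backoff_delays(strategy, delay_ms=50, max_retries=8, factor=2):
    """Return the wait in milliseconds before each retry.

    `factor` is a hypothetical growth rate chosen for illustration;
    it is not a documented connector option.
    """
    if strategy == "DISABLED":
        # No retries: the first request error fails the flush.
        return []
    if strategy == "CONSTANT":
        # The same delay before every retry.
        return [delay_ms] * max_retries
    if strategy == "EXPONENTIAL":
        # Start at the base delay and grow it on every retry.
        return [delay_ms * factor ** attempt for attempt in range(max_retries)]
    raise ValueError(f"unknown strategy: {strategy}")


for name in ("DISABLED", "CONSTANT", "EXPONENTIAL"):
    print(name, backoff_delays(name, delay_ms=50, max_retries=4))
```

With an exponential schedule the total wait grows quickly, so 'sink.bulk-flush.backoff.max-retries' effectively bounds how long a flush can stall.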

pyflink????csv??????????????????????????????????????????

2020-10-27 Thread ????????
,
??csv??
t_env.register_table_source(
    "mySource",
    CsvTableSource(
        r'data\trip\yellow_tripdata_2014-01.csv',
        ['vendor_id', 'pickup_datetime', 'dropoff_datetime', 'passenger_count',
         'trip_distance', 'pickup_longitude', 'pickup_latitude', 'rate_code',
         'store_and_fwd_flag', 'dropoff_longitude', 'dropoff_latitude',
         'payment_type', 'fare_amount', 'surcharge', 'mta_tax', 'tip_amount',
         'tolls_amount', 'total_amount'],
        [DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.BIGINT(),
         DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.BIGINT(),
         DataTypes.STRING(), DataTypes.FLOAT(), DataTypes.FLOAT(),
         DataTypes.STRING(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(),
         DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT()]
    )
)
CsvTableSource'pickup_longitude','pickup_latitude','dropoff_longitude','dropoff_latitude'
?? 
connect??OldCsv??Schemaconnect
??
??

Re: ValidationException using DataTypeHint in Scalar Function

2020-10-27 Thread Jark Wu
Hi Steve,

Thanks for reaching out to the Flink community. I am pulling in Timo who
might be able to help you with this question.

Best,
Jark


On Mon, 26 Oct 2020 at 23:10, Steve Whelan  wrote:

> Hi,
>
> I have a column of type *RAW('java.util.Map', ?)* that I want to pass to
> a scalar function UDF. I'm using DataTypeHints but hitting an exception.
> What would be the proper DataTypeHint and data type param to achieve this?
>
>   @FunctionHint(
>   input = {@DataTypeHint("RAW"), @DataTypeHint("STRING")},
>   output = @DataTypeHint("STRING")
>   )
>   public static String eval(final Object map, final String key) {
> // business logic
>   }
>
>
> *Exception:*
>
> Caused by: org.apache.flink.table.api.ValidationException: Invalid input
> arguments. Expected signatures are:
> MAP_VALUE(RAW('java.lang.Object', '...'), STRING)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidInputException(TypeInferenceUtil.java:190)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:131)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypes(TypeInferenceOperandChecker.java:89)
> ... 50 more
> Caused by: org.apache.flink.table.api.ValidationException: Invalid
> argument type at position 0. Data type RAW('java.lang.Object', '...')
> expected but RAW('java.util.Map', ?) passed.
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:137)
> at
> org.apache.flink.table.types.inference.TypeInferenceUtil.adaptArguments(TypeInferenceUtil.java:102)
> at
> org.apache.flink.table.planner.functions.inference.TypeInferenceOperandChecker.checkOperandTypesOrError(TypeInferenceOperandChecker.java:126)
> ... 51 more
>
>
> Thank you,
>
> Steve
>


????????checkpoint????????23????????????????????????????kafka?????????????????????????? ??????????23??????????????

2020-10-27 Thread sun
checkpoint23kafka??
 ??23??




nohup /opt/flink-1.10.2/bin/flink run -c 
com.toonyoo.app.data.realcomputation.Application  
/home/test/ty-app-data-real-computation-1.0.jar -s 
/opt/flink-1.10.2/checkpoints/f1e8c6bfca15c041c7b3dcf99abfffea/chk-1/_metadata 


Re: How to understand NOW() in SQL when using Table & SQL API to develop a streaming app?

2020-10-27 Thread Jark Wu
Hi Longdexin,

In traditional batch SQL, NOW() is evaluated once, before the job is
submitted, and its value does not change from record to record.
That doesn't make much sense in streaming SQL, so the NOW() function
in Flink is evaluated for every record.
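
The two evaluation styles can be contrasted with a small simulation. This is a plain Python sketch, not Flink code: `make_clock`, `batch_style` and `streaming_style` are hypothetical names, and the fake clock simply advances by one step per call so the output is deterministic.

```python
from datetime import datetime, timedelta


def make_clock(start, step):
    """Hypothetical deterministic clock: every call advances time by `step`."""
    state = {"now": start}

    def clock():
        current = state["now"]
        state["now"] = current + step
        return current

    return clock


def batch_style(records, clock):
    # NOW() is evaluated once, before any record is processed.
    now = clock()
    return [(record, now) for record in records]


def streaming_style(records, clock):
    # NOW() is evaluated again for every record.
    return [(record, clock()) for record in records]


records = ["a", "b", "c"]
start = datetime(2020, 10, 27, 12, 0, 0)
step = timedelta(seconds=1)

batch = batch_style(records, make_clock(start, step))
stream = streaming_style(records, make_clock(start, step))
print(batch)
print(stream)
```

In the batch style every record carries the same timestamp, while in the streaming style each record gets the time at which it was processed.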

Best,
Jark

On Fri, 23 Oct 2020 at 16:30, Till Rohrmann  wrote:

> Hi Longdexin,
>
> thanks for reaching out to the Flink community. I am pulling in Jark who
> might be able to help you with this question.
>
> Cheers,
> Till
>
> On Thu, Oct 22, 2020 at 2:56 PM Longdexin <274522...@qq.com> wrote:
>
>> From my point of view, the value of the NOW() function in SQL is fixed at
>> the time the streaming app is launched and will not change with the
>> processing time. However, as a new Flink user, I'm not so sure of that.
>> By the way, if my intent is to keep the time logic updating all the time,
>> what should I do?
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


Re: Rocksdb - Incremental vs full checkpoints

2020-10-27 Thread Yun Tang
Hi Sudharsan

Once incremental checkpointing is enabled, the reported delta size is the size 
of the newly uploaded sst files. That size is not necessarily the same for 
every checkpoint, since it depends on RocksDB's compression ratio, compaction 
timing and flush timing. If you really want to check the details, you can log 
in to the machine, locate the state dir, and see how the sst files are stored 
for each checkpoint when local recovery is enabled [1].

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery

Best
Yun Tang

From: Sudharsan R 
Sent: Monday, October 26, 2020 10:38
To: Yun Tang 
Cc: user@flink.apache.org 
Subject: Re: Rocksdb - Incremental vs full checkpoints

Hi Yun,
Sorry for the late reply - I was doing some reading. As far as I understand, 
when incremental checkpointing is enabled, the reported checkpoint 
size (metrics/UI) is only the size of the deltas and not the full state size. I 
understand that compaction may not get triggered. But if we are creating a 
fixed amount of state every checkpoint interval, shouldn't the reported 
checkpoint size remain the same (as it is a delta)?


Thanks

Sudharsan

On Tue, Oct 13, 2020 at 11:34 PM Yun Tang <myas...@live.com> wrote:
Hi

The difference in data size between incremental and full checkpoints is due to 
their different implementations.
The incremental checkpoint strategy uploads binary sst files, while the full 
checkpoint strategy scans the DB and writes all kv entries to the external DFS.

As your state size is really small (only 200 KB), I think your RocksDB has 
never triggered a compaction to reduce the sst files, which is why the size 
constantly increases.
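
To make the distinction concrete, here is a toy model of the two strategies. It is not Flink or RocksDB code: the function names, file names and sizes are all made up for illustration, and it only shows that an incremental delta is measured in newly created sst files while a full checkpoint is measured in live key/value entries.

```python
def incremental_delta(previous_files, current_files):
    """Return the sst files (name -> size in bytes) that must be uploaded:
    those present now but absent from the previous checkpoint."""
    return {name: size for name, size in current_files.items()
            if name not in previous_files}


def full_snapshot_size(live_entries):
    """A full checkpoint writes every live key/value pair, so its size
    depends only on the current entries, not on the sst file layout."""
    return sum(len(key) + len(value) for key, value in live_entries.items())


# Two checkpoints over the same two keys. The second flush adds a new sst
# file even though the set of live keys did not grow, and without compaction
# the older file stays around as well.
chk1_files = {"000001.sst": 4096}
chk2_files = {"000001.sst": 4096, "000002.sst": 4096}
live = {b"key-a": b"v2", b"key-b": b"v2"}

delta = incremental_delta(chk1_files, chk2_files)
print(sorted(delta))             # files uploaded for checkpoint 2
print(sum(delta.values()))       # reported incremental size in bytes
print(full_snapshot_size(live))  # bytes a full checkpoint would write
```

For very small state, the per-file overhead of sst files can dominate, so the incremental number can exceed the full checkpoint size even when little data changed.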

Best
Yun Tang

From: sudranga <sud.r...@gmail.com>
Sent: Wednesday, October 14, 2020 10:40
To: user@flink.apache.org
Subject: Rocksdb - Incremental vs full checkpoints

Hi,
I have an event-window pipeline which handles a fixed number of messages per
second for a fixed number of keys. When I have rocksdb as the state backend
with incremental checkpoints, I see the delta checkpoint size constantly
increase. Please see


I turned off incremental checkpoints and all the checkpoints are 64 KB (there
appears to be no state leak in user code or otherwise). It is not clear why
the incremental checkpoints keep increasing in size. Perhaps the
incremental checkpoints are not incremental (for this small state size) and
are simply full state appended to full state and so on...

From some posts on this forum, I understand that incremental checkpoints are
designed for cases where the state size is fairly large (GBs-TBs) and
where the changes in state are minimal across checkpoints. However, does
this mean that we should not enable incremental checkpointing for use cases
where the state size is much smaller? Would the 'constantly' increasing
snapshot delta size reduce at some point? I don't see any compaction runs
happening
(taskmanager_job_task_operator_column_family_rocksdb.num-running-compactions).
Not sure if that is what I am missing...

Thanks
Sudharsan



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [SURVEY] Remove Mesos support

2020-10-27 Thread Xintong Song
Hi Piyush,

Thanks a lot for sharing the information. It is a great relief that
you are good with Flink on Mesos as is.

As for the jira issues, I believe the most essential ones should have
already been resolved. You may find some remaining open issues here [1],
but not all of them are necessary if we decide to keep Flink on Mesos as is.

At the moment and in the near future, I think help is mostly needed with
testing the upcoming release 1.12 with Mesos use cases. The community is
currently actively preparing the new release, and hopefully we can come
up with a release candidate early next month. It would be greatly
appreciated if you folks, as experienced Flink on Mesos users, could help with
verifying the release candidates.

Thank you~

Xintong Song


[1]
https://issues.apache.org/jira/browse/FLINK-17402?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Deployment%20%2F%20Mesos%22%20AND%20status%20%3D%20Open

On Tue, Oct 27, 2020 at 2:58 AM Piyush Narang  wrote:

> Hi Xintong,
>
>
>
> Do you have any jiras that cover any of the items on 1 or 2? I can reach
> out to folks internally and see if I can get some folks to commit to
> helping out.
>
>
>
> To cover the other qs:
>
>- Yes, we’ve not got a plan at the moment to get off Mesos. We use
>Yarn for some of our Flink workloads when we can. Mesos is only used when we
>need streaming capabilities in our WW DCs (as our Yarn is centralized in
>one DC).
>- We’re currently on Flink 1.9 (old planner). We have a plan to bump
>to 1.11 / 1.12 this quarter.
>- We typically upgrade once every 6 months to a year (not every
>release). We’d like to speed up the cadence but we’re not there yet.
>- We’d largely be good with keeping Flink on Mesos as-is and
>functional while missing out on some of the newer features. We understand
>the pain on the community's side, and if we see some fancy improvement in
>Flink on Yarn / K8s that we want in Mesos, we can take on the work of
>putting in the request to port it over.
>
>
>
> Thanks,
>
>
>
> -- Piyush
>
>
>
>
>
> *From: *Xintong Song 
> *Date: *Sunday, October 25, 2020 at 10:57 PM
> *To: *dev , user 
> *Cc: *Lasse Nedergaard , <
> p.nar...@criteo.com>
> *Subject: *Re: [SURVEY] Remove Mesos support
>
>
>
> Thanks for sharing the information with us, Piyush and Lasse.
>
>
>
> @Piyush
>
>
>
> Thanks for offering the help. IMO, there are currently several problems
> that make supporting Flink on Mesos challenging for us.
>
>1. *Lack of Mesos experts.* AFAIK, there are very few people (if not
>none) among the active contributors in this community that are
>familiar with Mesos and can help with development on this component.
>2. *Absence of tests.* Mesos does not provide a testing cluster, like
>`MiniYARNCluster`, making it hard to test interactions between Flink and
>Mesos. We have only a few very simple e2e tests running on Mesos deployed
>in a docker, covering the most fundamental workflows. We are not sure how
>well those tests work, especially against some potential corner cases.
>3. *Divergence from other deployments.* Because of 1 and 2, the new
>efforts (features, maintenance, refactors) tend to exclude Mesos if
>possible. When the new efforts have to touch the Mesos related components
>(e.g., changes to the common resource manager interfaces), we have to be
>very careful and make as few changes as possible, to avoid accidentally
>breaking anything that we are not familiar with. As a result, the component
>diverges a lot from other deployment components (K8s/Yarn), which makes it
>harder to maintain.
>
> It would be greatly appreciated if you could help with any of the above
> issues.
>
>
>
> Additionally, I have a few questions concerning your use cases at Criteo.
> IIUC, you are going to stay on Mesos in the foreseeable future, while
> keeping the Flink version up-to-date? What Flink version are you currently
> using? How often do you upgrade (e.g., every release)? Would you be good
> with keeping the Flink on Mesos component as it is (meaning that deployment
> and resource management improvements may not be ported to Mesos), while
> keeping other components up-to-date (e.g., improvements from programming
> APIs, operators, state backends, etc.)?
>
>
>
> Thank you~
>
> Xintong Song
>
>
>
>
>
> On Sat, Oct 24, 2020 at 2:48 AM Lasse Nedergaard <
> lassenedergaardfl...@gmail.com> wrote:
>
> Hi
>
>
>
> At Trackunit we have been using Mesos for a long time but have now moved to
> k8s.
>
> Med venlig hilsen / Best regards
>
> Lasse Nedergaard
>
>
>
>
>
> Den 23. okt. 2020 kl. 17.01 skrev Robert Metzger :
>
> 
>
> Hey Piyush,
>
> thanks a lot for raising this concern. I believe we should keep Mesos in
> Flink then in the foreseeable future.
>
> Your offer to help is much appreciated. We'll let you know once there is
> something.
>
>
>
> On Fri, Oct 23, 2020 at 4:28 PM Piyush Narang  wrote:
>
> Thanks Kostas. If 

Re: Flink checkpointing state

2020-10-27 Thread Yun Tang
Hi Boris

Please refer to FLINK-12884 [1] for the current progress of native HA support 
on K8s, which is targeted for release 1.12.

[1] https://issues.apache.org/jira/browse/FLINK-12884

Best
Yun Tang


From: Boris Lublinsky 
Sent: Tuesday, October 27, 2020 2:56
To: user 
Subject: Flink checkpointing state

This is from Flink 1.8:

"Job Manager keeps some state related to checkpointing in its memory. This 
state would be lost on Job Manager crashes, which is why this state is 
persisted in ZooKeeper. This means that even though there is no real need for 
the leader election and -discovery part of Flink's HA mode (as this is handled 
natively by Kubernetes), it still needs to be enabled just for storing the 
checkpoint state."

Was it ever fixed in Flink 1.10 or 1.11? If running Flink on K8s without HA, 
there is no ZooKeeper. And if the above is still the case, then checkpointing 
will never pick up the right one