RE: NoClassDefFoundError in failing-restarting job that uses url classloader

2019-08-09 Thread Subramanyam Ramanathan
Hi.


1)  The url pattern example : 
file:///root/flink-test/lib/dependency.jar

2)  I’m trying to simulate the same issue on a separate flink installation
with a sample job so that I can share the logs. (However, so far I’ve been
unable to simulate it, though on our product setup it can be reproduced quite
frequently.)

3)  The job is running in standalone mode. We have separate k8s pods with 
our own images which incorporate the taskmanager and jobmanager for our 
product. A 3rd pod connects using k8s and submits the job

4)  Per job mode

I’m trying to simulate the issue on a separate flink installation outside of
our product env. I’ll update as soon as I have results.

Thanks,
Subbu

From: Zhu Zhu [mailto:reed...@gmail.com]
Sent: Friday, August 9, 2019 7:43 AM
To: Subramanyam Ramanathan 
Cc: user@flink.apache.org
Subject: Re: NoClassDefFoundError in failing-restarting job that uses url 
classloader

Hi Subramanyam,

Could you share more information? including:
1. the URL pattern
2. the detailed exception and the log around it
3. the cluster the job is running on, e.g. standalone, yarn, k8s
4. whether it's session mode or per job mode

This information would be helpful to identify the failure cause.

Thanks,
Zhu Zhu

Subramanyam Ramanathan <subramanyam.ramanat...@microfocus.com> wrote on Fri, Aug 9, 2019 at 1:45 AM:

Hello,

I'm currently using flink 1.7.2.

I'm trying to run a job that's submitted programmatically using the 
ClusterClient API.
   public JobSubmissionResult run(PackagedProgram prog, int parallelism)


The job makes use of some jars which I add to the packaged program through the
PackagedProgram constructor, along with the jar file:
   public PackagedProgram(File jarFile, List<URL> classpaths, String... args)
Normally, this works perfectly and the job runs fine.
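
For reference, that submission path looks roughly like the sketch below (assuming Flink 1.7.x; the job jar path, program arguments and parallelism are made up, and obtaining the ClusterClient instance is omitted):

    import org.apache.flink.api.common.JobSubmissionResult;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.client.program.PackagedProgram;

    import java.io.File;
    import java.net.URL;
    import java.util.Collections;
    import java.util.List;

    public class SubmitWithExtraJars {

        public static JobSubmissionResult submit(ClusterClient<?> client) throws Exception {
            // Extra jars that end up on the job's user-code (URL) classloader.
            List<URL> classpaths =
                    Collections.singletonList(new URL("file:///root/flink-test/lib/dependency.jar"));

            // Program jar plus the additional classpath entries and program arguments.
            PackagedProgram program = new PackagedProgram(
                    new File("/root/flink-test/job.jar"), classpaths, "--some-arg", "value");

            // Submit with the desired parallelism.
            return client.run(program, 4);
        }
    }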

However, if there's an error in the job and it goes into a failing state,
continuously trying to restart for an hour or so, I notice a NoClassDefFoundError
for some classes in the jars that I load using the URL classloader, and the job
never recovers after that, even if the root cause of the issue was fixed (I had a
kafka source/sink in my job, and kafka was down temporarily and was brought up
after that).
The jar is still available at the path referenced by the URL classloader and has
not been tampered with.

Could anyone please give me some pointers on why this could happen, what I could
be missing here, or how I can debug further?

thanks
Subbu




Re: Strange DataSet behavior when using custom FileInputFormat

2019-08-09 Thread Zhu Zhu
Hi Hynek,

In execution, matrices.first(60000).print() is different from
matrices.print().
It adds a reducer operator to the job which only collects the first
60000 records from the source.
So if your InputFormat can generate more than 60000 records (which would be
unexpected, though), and the trailing data are somehow corrupted, this case
may happen.
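
A quick way to check whether the custom InputFormat really emits more records than expected is to count them. A sketch (the generateSequence source below is only a stand-in for the custom MNIST format):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class InputFormatCountCheck {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for the custom format: replace with env.createInput(new YourMnistInputFormat(...)).
            DataSet<Long> matrices = env.generateSequence(1, 60000);

            // count() forces a full read of the source, just like print()/collect() do ...
            long total = matrices.count();
            System.out.println("records emitted = " + total);

            // ... whereas first(60000) only collects a bounded prefix and can hide corrupt trailing records.
            matrices.first(60000).print();
        }
    }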

Do you have detailed error info on why your program exits?
That would be helpful to identify the root cause.

Thanks,
Zhu Zhu


Hynek Noll wrote on Fri, Aug 9, 2019 at 8:59 PM:

> Hi,
> I'm trying to implement a custom FileInputFormat (to read the MNIST
> Dataset).
> The creation of Flink DataSet (DataSet matrices) seems to be OK,
> but when I try to print it using either
> matrices.print();
> or
> matrices.collect();
>
> It finishes with exit code -17.
> (Before, I compiled using Java 11 and aside from a reflection warning,
> this approach caused the program to run indefinitely. Now I use JDK 8)
> The total number of elements is 60 000. Now the strange thing is that when
> I run
>
> matrices.first(60000).print();
>
> it does print the elements just fine. But my understanding is that these
> two approaches should work the same way, if there are exactly 60 000
> records.
>
> Is this a bug? Or something that can be explained by my extension of
> FileInputFormat (I might very well not use it correctly)?
>
> Best regards,
> Hynek
>


Fwd: [VOTE] Apache Flink Release 1.9.0, release candidate #2

2019-08-09 Thread Tzu-Li (Gordon) Tai
Hi!

Voting on RC2 for Apache Flink 1.9.0 has started:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Apache-Flink-Release-1-9-0-release-candidate-2-td31542.html

Please check this out if you want to verify your applications against this
new Flink release.

Best,
Gordon

-- Forwarded message -
From: Tzu-Li (Gordon) Tai 
Date: Fri, Aug 9, 2019 at 6:17 PM
Subject: [VOTE] Apache Flink Release 1.9.0, release candidate #2
To: dev 


Hi all,

Release candidate #2 for Apache Flink 1.9.0 is now ready for your review.
This is the first voting candidate for 1.9.0, following the preview
candidates RC0 and RC1.

Please review and vote on release candidate #2 for version 1.9.0, as
follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release and binary convenience releases to be
deployed to dist.apache.org [2], which are signed with the key with
fingerprint 1C1E2394D3194E1944613488F320986D35C33D6A [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag “release-1.9.0-rc2” [5].

Robert is also preparing a pull request for the announcement blog post and
will update this voting thread with a link to the pull request shortly.

The vote will be open for *at least 72 hours*.
Please cast your votes before *Aug. 14th (Wed.) 2019, 17:00 CET*. It is
adopted by majority approval, with at least 3 PMC affirmative votes.

Thanks,
Gordon

[1]
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.9.0-rc2/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1234
[5]
https://gitbox.apache.org/repos/asf?p=flink.git;a=tag;h=refs/tags/release-1.9.0-rc2


Re: Capping RocksDb memory usage

2019-08-09 Thread Yu Li
bq. Yes, we recompiled Flink with rocksdb to have JNI, to enable the
write_buffer_manager after we read that Jira.
I see, then which way are you using to limit the rocksdb memory? Setting
write buffer and block cache size separately or with the "cost memory used
in memtable into block cache" [1] feature? If the latter one, please make
sure you also have this PR [2] in your customized rocksdb.
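
For context, wiring a write buffer manager in through a custom options factory looks roughly like the sketch below. This assumes the Flink 1.7/1.8-style OptionsFactory interface and a RocksJava build that already exposes WriteBufferManager and DBOptions#setWriteBufferManager (which, as discussed in this thread, vanilla Flink releases of this era do not bundle); the sizes are made up.

    import org.apache.flink.contrib.streaming.state.OptionsFactory;
    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.DBOptions;
    import org.rocksdb.LRUCache;
    import org.rocksdb.WriteBufferManager;

    public class CappedRocksDbOptions implements OptionsFactory {

        // Static so the cache is shared by all RocksDB instances in the TaskManager JVM.
        // With the "cost memtables into block cache" feature, this cache also bounds write buffer memory.
        private static final LRUCache CACHE = new LRUCache(512 * 1024 * 1024);
        private static final WriteBufferManager WRITE_BUFFER_MANAGER =
                new WriteBufferManager(256 * 1024 * 1024, CACHE);

        @Override
        public DBOptions createDBOptions(DBOptions currentOptions) {
            // Route memtable accounting through the shared manager.
            return currentOptions.setWriteBufferManager(WRITE_BUFFER_MANAGER);
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
            // Point every column family at the shared block cache.
            return currentOptions.setTableFormatConfig(
                    new BlockBasedTableConfig().setBlockCache(CACHE));
        }
    }

The factory would then be registered via RocksDBStateBackend#setOptions(new CappedRocksDbOptions()) or the "state.backend.rocksdb.options-factory" configuration key mentioned later in this thread.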

bq. I noticed that our disk usage (SSD) for RocksDb always stays around
2% (or 2.2 GB), which was not the case before we enabled the RocksDb state backend
As state data is ingested and checkpoints are executed, the RocksDB state
backend flushes sst files onto local disk (and uploads files to HDFS when
checkpointing). For the heap backend, all data resides in memory and is written
directly to HDFS when a checkpoint is triggered, so there is no local disk space
usage.

What's more, note that if you enable local recovery (check whether
"state.backend.local-recovery" is set to true in your configuration; by default
it's false), there will be more disk space usage, but in that case both the heap
and rocksdb backends pay the cost.

[1]
https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager#cost-memory-used-in-memtable-to-block-cache
[2] https://github.com/facebook/rocksdb/pull/4695

Best Regards,
Yu


On Fri, 9 Aug 2019 at 15:10, Cam Mach  wrote:

> Hi Yu,
>
> Yes, we recompiled Flink with rocksdb to have JNI, to enable the
> write_buffer_manager after we read that Jira.
> One quick question: I noticed that our disk usage (SSD) for RocksDb
> always stays around 2% (or 2.2 GB), which was not the case before we enabled
> the RocksDb state backend. So we're wondering what is stopping it?
>
> Thanks,
> Cam
>
>
>
> On Fri, Aug 9, 2019 at 12:21 AM Yu Li  wrote:
>
>> Hi Cam,
>>
>> Which flink version are you using?
>>
>> Actually I don't think any existing flink release could take usage of the
>> write buffer manager natively through some configuration magic, but
>> requires some "developing" efforts, such as manually building flink with a
>> higher version rocksdb to have the JNI interface to set write buffer
>> manager, and set the write buffer manager into rocksdb's DBOptions with a
>> custom options factory. More details please refer to this comment [1] in
>> FLINK-7289.
>>
>> As mentioned in another thread [2], we are now working on removing all
>> these "manual steps" and making the state backend memory management "hands
>> free", which is also part of the FLIP-49 work. Hopefully we could get this
>> done in 1.10 release, let's see (smile).
>>
>> [1] https://s.apache.org/5ay97
>> [2] https://s.apache.org/ej2zn
>>
>> Best Regards,
>> Yu
>>
>>
>> On Fri, 9 Aug 2019 at 03:53, Congxian Qiu  wrote:
>>
>>> Hi
>>> Maybe FLIP-49[1] "Unified Memory Configuration for TaskExecutors" can
>>> give some information here
>>>
>>> [1]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-49-Unified-Memory-Configuration-for-TaskExecutors-td31436.html
>>> Best,
>>> Congxian
>>>
>>>
>>> Cam Mach wrote on Fri, Aug 9, 2019 at 4:59 AM:
>>>
 Hi Biao, Yun and Ning.

 Thanks for your response and pointers. Those are very helpful!

 So far, we have tried with some of those parameters
 (WriterBufferManager, write_buffer_size, write_buffer_count, ...), but
 still continuously having issues with memory.
 Here are our cluster configurations:

- 1 Job Controller (32 GB RAM and 8 cores)
- 10 Task Managers: (32 GB RAM, 8 cores CPU, and 300GB SSD
configured for RocksDB, and we set 10GB heap for each)
   - Running under Kubernetes

 We have a pipeline that reads/transfers 500 million records (around 1kb
 each), and writes to our sink. Our total data is around 1.2 Terabytes. Our
 pipeline configurations are as follows:

- 13 operators - some of them (around 6) are stateful
- Parallelism: 60
- Task slots: 6

 We have run several tests and observed that memory just keeps growing
 while our TMs' CPU stays around 10-15% usage. We are now focusing on
 limiting memory usage from Flink and RocksDB, so Kubernetes won't kill it.

 Any recommendations or advice are greatly appreciated!

 Thanks,




 On Thu, Aug 8, 2019 at 6:57 AM Yun Tang  wrote:

> Hi Cam
>
> I think FLINK-7289 [1] might offer you some insights to control
> RocksDB memory, especially the idea using write buffer manager [2] to
> control the total write buffer memory. If you do not have too many sst
> files, write buffer memory usage would consume much more space than index
> and filter usage. Since Flink uses one column family per state, the number of
> write buffers increases as more column families are created.
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-7289
> [2] https://github.com/dataArtisans/frocksdb/pull/4
>
> Best
> Yun Tang
>
>
> 

Status of the Integration of Flink with Hive

2019-08-09 Thread David Morin
Hi,

I want to connect my Flink streaming job to Hive.
At the moment, what is the best way to connect to Hive?
Some features seem to be in development.
Some really cool features have been described here: 
https://fr.slideshare.net/BowenLi9/integrating-flink-with-hive-xuefu-zhang-and-bowen-li-seattle-flink-meetup-feb-2019
My first need is to read and update Hive metadata.
Concerning the Hive data, I can store it directly in HDFS (in ORC format) as a
first step.
Thanks.

David



How to implement different Join strategies for a Flink stream application?

2019-08-09 Thread Felipe Gutierrez
Hi,
First I was trying to mimic the implementation of the IntervalJoinOperator
which uses a KeyedStream > connect > keyBy > and transform. It was a little
confusing to me because I was not sure if I had to use a window
transformation after transform() or not.

Then I found this answer (https://stackoverflow.com/a/46052661/2096986)
which uses keyBy > connect(keyBy()) > process(CoProcessFunction()) with
some example here (
https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/process/EventTimeJoinExercise.java
)
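
For reference, that keyBy > connect > process pattern looks roughly like the sketch below (assuming Tuple2<key, value> inputs; the keyed state that would make it an actual hash join is left out):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    import org.apache.flink.util.Collector;

    public class KeyedCoProcessJoin {

        public static DataStream<String> join(
                DataStream<Tuple2<String, Integer>> left,
                DataStream<Tuple2<String, Integer>> right) {

            return left.keyBy(0)
                    .connect(right.keyBy(0))
                    .process(new CoProcessFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {

                        @Override
                        public void processElement1(Tuple2<String, Integer> value, Context ctx, Collector<String> out) {
                            // buffer `value` in keyed state here and probe the other side's state
                        }

                        @Override
                        public void processElement2(Tuple2<String, Integer> value, Context ctx, Collector<String> out) {
                            // symmetric to processElement1
                        }
                    });
        }
    }

A broadcast-join variant would instead broadcast the smaller side (e.g. via the broadcast state pattern) and keep only the larger side keyed.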

If I want to decide whether I use BroadcastJoin or HashJoin or any other
Join algorithm, which way do you think is better? Is there any other
example code that I could borrow?

Thanks,
Felipe
--
Felipe Gutierrez
skype: felipe.o.gutierrez
https://felipeogutierrez.blogspot.com


Re: Capping RocksDb memory usage

2019-08-09 Thread Cam Mach
Hi Yu,

Yes, we recompiled Flink with rocksdb to have JNI, to enable the
write_buffer_manager after we read that Jira.
One quick question: I noticed that our disk usage (SSD) for RocksDb
always stays around 2% (or 2.2 GB), which was not the case before we enabled
the RocksDb state backend. So we're wondering what is stopping it?

Thanks,
Cam



On Fri, Aug 9, 2019 at 12:21 AM Yu Li  wrote:

> Hi Cam,
>
> Which flink version are you using?
>
> Actually I don't think any existing flink release could take usage of the
> write buffer manager natively through some configuration magic, but
> requires some "developing" efforts, such as manually building flink with a
> higher version rocksdb to have the JNI interface to set write buffer
> manager, and set the write buffer manager into rocksdb's DBOptions with a
> custom options factory. More details please refer to this comment [1] in
> FLINK-7289.
>
> As mentioned in another thread [2], we are now working on removing all
> these "manual steps" and making the state backend memory management "hands
> free", which is also part of the FLIP-49 work. Hopefully we could get this
> done in 1.10 release, let's see (smile).
>
> [1] https://s.apache.org/5ay97
> [2] https://s.apache.org/ej2zn
>
> Best Regards,
> Yu
>
>
> On Fri, 9 Aug 2019 at 03:53, Congxian Qiu  wrote:
>
>> Hi
>> Maybe FLIP-49[1] "Unified Memory Configuration for TaskExecutors" can
>> give some information here
>>
>> [1]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-49-Unified-Memory-Configuration-for-TaskExecutors-td31436.html
>> Best,
>> Congxian
>>
>>
>> Cam Mach wrote on Fri, Aug 9, 2019 at 4:59 AM:
>>
>>> Hi Biao, Yun and Ning.
>>>
>>> Thanks for your response and pointers. Those are very helpful!
>>>
>>> So far, we have tried with some of those parameters
>>> (WriterBufferManager, write_buffer_size, write_buffer_count, ...), but
>>> still continuously having issues with memory.
>>> Here are our cluster configurations:
>>>
>>>- 1 Job Controller (32 GB RAM and 8 cores)
>>>- 10 Task Managers: (32 GB RAM, 8 cores CPU, and 300GB SSD
>>>configured for RocksDB, and we set 10GB heap for each)
>>>   - Running under Kubernetes
>>>
>>> We have a pipeline that read/transfer 500 million records (around 1kb
>>> each), and write to our sink. Our total data is around 1.2 Terabytes. Our
>>> pipeline configurations are as follows:
>>>
>>>- 13 operators - some of them (around 6) are stateful
>>>- Parallelism: 60
>>>- Task slots: 6
>>>
>>> We have run several tests and observed that memory just keep growing
>>> while our TM's CPU stay around 10 - 15% usage. We are now just focusing
>>> limiting memory usage from Flink and RocksDB, so Kubernetes won't kill it.
>>>
>>> Any recommendations or advices are greatly appreciated!
>>>
>>> Thanks,
>>>
>>>
>>>
>>>
>>> On Thu, Aug 8, 2019 at 6:57 AM Yun Tang  wrote:
>>>
 Hi Cam

 I think FLINK-7289 [1] might offer you some insights to control RocksDB
 memory, especially the idea using write buffer manager [2] to control the
 total write buffer memory. If you do not have too many sst files, write
 buffer memory usage would consume much more space than index and filter
 usage. Since Flink would use per state per column family, and the write
 buffer number increase when more column families created.


 [1] https://issues.apache.org/jira/browse/FLINK-7289
 [2] https://github.com/dataArtisans/frocksdb/pull/4

 Best
 Yun Tang


 --
 *From:* Cam Mach 
 *Sent:* Thursday, August 8, 2019 21:39
 *To:* Biao Liu 
 *Cc:* miki haiat ; user 
 *Subject:* Re: Capping RocksDb memory usage

 Thanks for your response, Biao.



 On Wed, Aug 7, 2019 at 11:41 PM Biao Liu  wrote:

 Hi Cam,

 AFAIK, that's not an easy thing. Actually it's more like a Rocksdb
 issue. There is a document explaining the memory usage of Rocksdb [1]. It
 might be helpful.

 You could define your own option to tune Rocksdb through
 "state.backend.rocksdb.options-factory" [2]. However I would suggest not to
 do this unless you are fully experienced of Rocksdb. IMO it's quite
 complicated.

 Meanwhile I can share a bit experience of this. We have tried to put
 the cache and filter into block cache before. It's useful to control the
 memory usage. But the performance might be affected at the same time.
 Anyway you could try and tune it. Good luck!

 1. https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
 2.
 https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#tuning-rocksdb

 Thanks,
 Biao /'bɪ.aʊ/



 On Thu, Aug 8, 2019 at 11:44 AM Cam Mach  wrote:

 Yes, that is correct.
 Cam Mach
 Software Engineer
 E-mail: cammac...@gmail.com
 Tel: 206 972 2768



 On Wed, Aug 7, 2

Strange DataSet behavior when using custom FileInputFormat

2019-08-09 Thread Hynek Noll
Hi,
I'm trying to implement a custom FileInputFormat (to read the MNIST
Dataset).
The creation of Flink DataSet (DataSet matrices) seems to be OK,
but when I try to print it using either
matrices.print();
or
matrices.collect();

It finishes with exit code -17.
(Before, I compiled using Java 11 and aside from a reflection warning, this
approach caused the program to run indefinitely. Now I use JDK 8)
The total number of elements is 60 000. Now the strange thing is that when
I run

matrices.first(60000).print();

it does print the elements just fine. But my understanding is that these
two approaches should work the same way, if there are exactly 60 000
records.

Is this a bug? Or something that can be explained by my extension of
FileInputFormat (I might very well not use it correctly)?

Best regards,
Hynek


Re: Flink Table API schema doesn't support nested object in ObjectArray

2019-08-09 Thread Fabian Hueske
Great, thank you!

On Thu., Aug 8, 2019 at 02:15, Jacky Du wrote:

> thanks Fabian , I created a Jira ticket with a code sample .
>
>
> https://issues.apache.org/jira/projects/FLINK/issues/FLINK-13603?filter=allopenissues
>
> I think if the root cause I found is correct, fixing this issue could be
> pretty simple.
>
> Thanks
> Jacky Du
>
> Fabian Hueske wrote on Fri, Aug 2, 2019 at 12:07 PM:
>
>> Thanks for the bug report Jacky!
>>
>> Would you mind opening a Jira issue, preferably with a code snippet that
>> reproduces the bug?
>>
>> Thank you,
>> Fabian
>>
>> On Fri., Aug 2, 2019 at 16:01, Jacky Du <jacky.du0...@gmail.com> wrote:
>>
>>> Hi, All
>>>
>>> I just found that the Flink Table API has some issues if a nested object is
>>> defined in an object array. It will give a column-not-found exception if a table
>>> schema is defined like below:
>>>
>>> payload : Row(arraylist : ObjectArrayTypeInfo>> String))>)
>>>
>>> but the Table API works fine if we don't have a nested object in the array, so
>>> the one below works:
>>>
>>> payload : Row(arraylist : ObjectArrayTypeInfo)
>>>
>>>
>>> This issue happens on 1.6.x, 1.7.x and 1.8.x, but works on 1.5.x.
>>>
>>>
>>> Thanks
>>> Jacky Du
>>>
>>
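
For reference, the kind of schema described above can be expressed with the Table API type helpers roughly as in this sketch (field names are made up):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.types.Row;

    public class NestedArrayTypes {

        // Works: an object array of a primitive type inside a row.
        public static TypeInformation<Row> flatArrayPayload() {
            return Types.ROW_NAMED(
                    new String[] {"arraylist"},
                    Types.OBJECT_ARRAY(Types.STRING));
        }

        // Reported as failing on 1.6.x-1.8.x: an object array whose elements are themselves rows.
        public static TypeInformation<Row> nestedArrayPayload() {
            return Types.ROW_NAMED(
                    new String[] {"arraylist"},
                    Types.OBJECT_ARRAY(
                            Types.ROW_NAMED(new String[] {"name"}, Types.STRING)));
        }
    }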


Re: Testing with ProcessingTime and FromElementsFunction with TestEnvironment

2019-08-09 Thread Michal Klempa
Hi guys,
I have the opposite issue :-) I would like to unit test negative behavior
- that the event-time timer is not fired when no further event arrives
(which would advance the watermarks).
But because the StreamSource fires a Long.MAX_VALUE watermark after the enclosed
finite FromElementsFunction's run method finishes, I get all timers
fired in the tested operator (and ProcessFunction).
Is anybody aware of a way to test, or design the SourceFunction, to
prevent the watermark from advancing, so that the timers are not fired (which
is the desired behavior - to be guarded by unit tests)?
Thanks.
Michal
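
One way to get that behavior is a test source that emits its elements and then blocks instead of returning, so the enclosing StreamSource never emits the final Long.MAX_VALUE watermark. A minimal sketch (class name is made up; the elements must be serializable):

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    import java.util.List;

    public class NonTerminatingListSource<T> implements SourceFunction<T> {

        private final List<T> elements;
        private volatile boolean running = true;

        public NonTerminatingListSource(List<T> elements) {
            this.elements = elements;
        }

        @Override
        public void run(SourceContext<T> ctx) throws Exception {
            for (T element : elements) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(element);
                }
            }
            // Do not return: idle until cancelled, so no final Long.MAX_VALUE watermark is emitted.
            while (running) {
                Thread.sleep(50);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

Note that env.execute() will then not return on its own; the test has to cancel the job (or assert on results from another thread) before shutting down.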

On Thu, May 9, 2019 at 9:02 AM Till Rohrmann  wrote:
>
> Hi Steve,
>
> afaik there is no such thing in Flink. I agree that Flink's testing utilities 
> should be improved. If you implement such a source, then you might be able to 
> contribute it back to the community. That would be super helpful.
>
> Cheers,
> Till
>
> On Wed, May 8, 2019 at 6:40 PM Steven Nelson  wrote:
>>
>>
>> That’s what I figured was happening :( Your explanation is a lot better than 
>> what I gave to my team, so that will help a lot, thank you!
>>
>> Is there a testing source already created that does this sort of thing? The 
>> Flink-testing library seems a bit sparse.
>>
>> -Steve
>>
>> Sent from my iPhone
>>
>> On May 8, 2019, at 9:33 AM, Till Rohrmann  wrote:
>>
>> Hi Steve,
>>
>> I think the reason for the different behaviour is due to the way event time 
>> and processing time are implemented.
>>
>> When you are using event time, watermarks need to travel through the 
>> topology denoting the current event time. When you source terminates, the 
>> system will send a watermark with Long.MAX_VALUE through the topology. This 
>> will effectively trigger the completion of all pending event time operations.
>>
>> In the case of processing time, Flink does not do this. Instead it simply 
>> relies on the processing time clocks on each machine. Hence, there is no way 
>> for Flink to tell the different machines that their respective processing 
>> time clocks should proceed to a certain time in case of a shutdown. Instead 
>> you should make sure that you don't terminate the job before a certain time 
>> (processing time) has passed. You could do this by adding a sleep to your 
>> source function after you've output all records and just before leaving the 
>> source loop.
>>
>> Cheers,
>> Till
>>
>> On Tue, May 7, 2019 at 11:49 PM Steven Nelson  
>> wrote:
>>>
>>> Hello!
>>>
>>> I am trying to write a test that runs in the TestEnviroment. I create a 
>>> process that uses ProcessingTime, has a source constructed from a 
>>> FromElementsFunction and runs data through a Keyed Stream into a 
>>> ProcessingTimeSessionWindows.withGap().
>>>
>>> The problem is that it appears that the env.execute method returns 
>>> immediately after the session closes, not allowing the events to be 
>>> released from the window before shutdown occurs. This used to work when I 
>>> used EventTime.
>>>
>>> Thoughts?
>>> -Steve


Re: **RegistrationTimeoutException** after TaskExecutor successfully registered at resource manager

2019-08-09 Thread Victor Wong
Hi Biao,

Thanks for your reply, I will give it a try (1.8+)!

Best,
Victor

From: Biao Liu 
Date: Friday, August 9, 2019 at 5:45 PM
To: Victor Wong 
Cc: "user@flink.apache.org" 
Subject: Re: **RegistrationTimeoutException** after TaskExecutor successfully 
registered at resource manager

Hi Victor,

There used to be several relevant issues reported [1] [2] [3]. I guess you have 
encountered the same problem.
This issue has been fixed in 1.8 [4]. Could you try it on a later version 
(1.8+)?

1. https://issues.apache.org/jira/browse/FLINK-11137
2. https://issues.apache.org/jira/browse/FLINK-11215
3. https://issues.apache.org/jira/browse/FLINK-11708
4. https://issues.apache.org/jira/browse/FLINK-11718

Thanks,
Biao /'bɪ.aʊ/



On Fri, Aug 9, 2019 at 4:01 PM Victor Wong <jiasheng.w...@outlook.com> wrote:
Hi,
I’m using Flink version 1.7.1, and I encountered this exception which was a 
little weird from my point of view;
TaskManager successfully registered at resource manager, however after 5 
minutes (which is the default value of taskmanager.registration.timeout config) 
it threw out RegistrationTimeoutException;

Here is the related logs of TM:
2019-08-09 01:30:24,061 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting to 
ResourceManager 
akka.tcp://flink@xxx/user/resourcemanager().
2019-08-09 01:30:24,296 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Resolved 
ResourceManager address, beginning registration
2019-08-09 01:30:24,296 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Registration at 
ResourceManager attempt 1 (timeout=100ms)
2019-08-09 01:30:24,379 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Successful 
registration at resource manager akka.tcp://flink@xxx/user/resourcemanager 
under registration id 4535dea14648f6de68f32fb1a375806e.
2019-08-09 01:30:24,404 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Receive slot 
request AllocationID{372d1e10019c93c6c41d52b449cea5f2} for job 
e7b86795178efe43d7cac107c6cb8c33 from resource manager with leader id 
.
…
2019-08-09 01:30:33,590 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Un-registering 
task and sending final execution state FINISHED to JobManager for task Source: 
 ; // I don’t know if this is related, so I add it here in case;  This 
Flink Kafka source just finished because it consumed no Kafka partitions (Flink 
Kafka parallelism > Kafka topic partitions)
…
2019-08-09 01:35:24,753 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Fatal error 
occurred in TaskExecutor akka.tcp://flink@xxx/user/taskmanager_0.
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: 
Could not register at the ResourceManager within the specified maximum 
registration duration 300000 ms. This indicates a problem with this instance.
Terminating now.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1037)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$3(TaskExecutor.java:1023)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks,
Victor


Re: **RegistrationTimeoutException** after TaskExecutor successfully registered at resource manager

2019-08-09 Thread Biao Liu
Hi Victor,

There used to be several relevant issues reported [1] [2] [3]. I guess you
have encountered the same problem.
This issue has been fixed in 1.8 [4]. Could you try it on a later version
(1.8+)?

1. https://issues.apache.org/jira/browse/FLINK-11137
2. https://issues.apache.org/jira/browse/FLINK-11215
3. https://issues.apache.org/jira/browse/FLINK-11708
4. https://issues.apache.org/jira/browse/FLINK-11718

Thanks,
Biao /'bɪ.aʊ/



On Fri, Aug 9, 2019 at 4:01 PM Victor Wong 
wrote:

> Hi,
>
> I’m using Flink version *1.7.1*, and I encountered this exception which
> was a little weird from my point of view;
>
> TaskManager successfully registered at resource manager, however after 5
> minutes (which is the default value of taskmanager.registration.timeout
> config) it threw out RegistrationTimeoutException;
>
>
>
> Here is the related logs of TM:
>
> 2019-08-09 01:30:24,061 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting
> to ResourceManager akka.tcp://flink@xxx
> /user/resourcemanager().
>
> 2019-08-09 01:30:24,296 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Resolved
> ResourceManager address, beginning registration
>
> 2019-08-09 01:30:24,296 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Registration at ResourceManager attempt 1 (timeout=100ms)
>
> 2019-08-09 01:30:24,379 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- *Successful
> registration at resource manager* akka.tcp://flink@xxx/user/resourcemanager
> under registration id 4535dea14648f6de68f32fb1a375806e.
>
> 2019-08-09 01:30:24,404 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Receive
> slot request AllocationID{372d1e10019c93c6c41d52b449cea5f2} for job
> e7b86795178efe43d7cac107c6cb8c33 from resource manager with leader id
> .
>
> …
>
> 2019-08-09 01:30:33,590 INFO
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Un-registering task and sending final execution state FINISHED to
> JobManager for task Source:  ; // *I don’t know if this is related,
> so I add it here in case;  This Flink Kafka source just finished because it
> consumed no Kafka partitions (Flink Kafka parallelism > Kafka topic
> partitions)*
>
> …
>
> 2019-08-09 01:35:24,753 ERROR
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Fatal error
> occurred in TaskExecutor akka.tcp://flink@xxx/user/taskmanager_0.
>
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
> Could not register at the ResourceManager within the specified maximum
> registration duration 300000 ms. This indicates a problem with this
> instance. Terminating now.
>
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1037)
>
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$3(TaskExecutor.java:1023)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>
>at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>
> at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
> Thanks,
>
> Victor
>


Re: some slots are not be available,when job is not running

2019-08-09 Thread Zhu Zhu
Hi pengchengling,

Does this issue happen before you submit any job to the cluster or
after some jobs are terminated?
If it's the latter case, did you wait for a while to see if the
unavailable slots became available again?

Thanks,
Zhu Zhu

pengcheng...@bonc.com.cn wrote on Fri, Aug 9, 2019 at 4:55 PM:

> Hi,
>
> Why are some slots unavailable?
>
> My cluster mode is standalone, and the high-availability mode is ZooKeeper.
> task.cancellation.timeout: 0
> Some slots are not available when no job is running.
>
>
>
> --
> pengcheng...@bonc.com.cn
>


Re: some slots are not be available,when job is not running

2019-08-09 Thread Zili Chen
Hi,

Could you attach the stack trace in exception or relevant logs?

Best,
tison.


pengcheng...@bonc.com.cn wrote on Fri, Aug 9, 2019 at 4:55 PM:

> Hi,
>
> Why are some slots unavailable?
>
> My cluster mode is standalone, and the high-availability mode is ZooKeeper.
> task.cancellation.timeout: 0
> Some slots are not available when no job is running.
>
>
>
> --
> pengcheng...@bonc.com.cn
>


some slots are not be available,when job is not running

2019-08-09 Thread pengcheng...@bonc.com.cn
Hi,

Why are some slots unavailable?

My cluster mode is standalone, and the high-availability mode is ZooKeeper.
task.cancellation.timeout: 0
Some slots are not available when no job is running.





pengcheng...@bonc.com.cn


Re: How to make two SQLs use the same KafkaTableSource?

2019-08-09 Thread Tony Wei
Hi Zhenghua,

Blink planner support lazy translation for multiple SQLs, and the common
> nodes will be reused in a single job.
>

It is very helpful, and thanks for your clarification.


> The only thing you need note here is the unified TableEnvironmentImpl do
> not support conversions between Table(s) and Stream(s).
> U must use pure SQL api (DDL/DML by sqlUpdate, DQL by sqlQuery).


Does this mean that only common nodes generated from the pure SQL API can
be reused, and operator nodes
created from the DataStream API will not be recognized by the Blink planner? If
this is the case, it is fine with me. My
original question focused on reusing nodes in the SQL API, and it seems the Blink
planner is what I need. Thanks
for your help again.
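
For what it's worth, the pure-SQL path with the Blink planner looks roughly like the sketch below (assuming Flink 1.9; the table names are made up and the DDL bodies are placeholders for the real schema and Kafka connector properties):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class TwoSqlSharedSource {
        public static void main(String[] args) throws Exception {
            // Unified TableEnvironment with the Blink planner; no Table <-> DataStream conversions.
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            TableEnvironment tEnv = TableEnvironment.create(settings);

            // Placeholder DDL: one Kafka source table and two sink tables.
            tEnv.sqlUpdate("CREATE TABLE events (...) WITH ('connector.type' = 'kafka', ...)");
            tEnv.sqlUpdate("CREATE TABLE sink_a (...) WITH (...)");
            tEnv.sqlUpdate("CREATE TABLE sink_b (...) WITH (...)");

            // Two DML statements over the same source table; with lazy translation the planner
            // can reuse the common scan node, so both queries read through a single source operator.
            tEnv.sqlUpdate("INSERT INTO sink_a SELECT user_id, COUNT(*) FROM events GROUP BY user_id");
            tEnv.sqlUpdate("INSERT INTO sink_b SELECT user_id, amount FROM events WHERE amount > 100");

            tEnv.execute("two-sql-shared-source");
        }
    }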

Best,
Tony Wei

Zhenghua Gao wrote on Fri, Aug 9, 2019 at 1:54 PM:

> Blink planner support lazy translation for multiple SQLs, and the common
> nodes will be reused in a single job.
> The only thing you need note here is the unified TableEnvironmentImpl do
> not support conversions between Table(s) and Stream(s).
> U must use pure SQL api (DDL/DML by sqlUpdate, DQL by sqlQuery).
>
> *Best Regards,*
> *Zhenghua Gao*
>
>
> On Fri, Aug 9, 2019 at 12:38 PM Tony Wei  wrote:
>
>> forgot to send to user mailing list.
>>
>> Tony Wei wrote on Fri, Aug 9, 2019 at 12:36 PM:
>>
>>> Hi Zhenghua,
>>>
>>> I didn't get your point. It seems that `isEagerOperationTranslation` is
>>> always return false. Is that
>>> means even I used Blink planner, the sql translation is still in a lazy
>>> manner?
>>>
>>> Or do you mean Blink planner will recognize and link two SQLs to the
>>> same kafka source, if
>>> they both use the same kafka table, even if the translation is lazy?
>>>
>>> I'm not familiar with the details of the translation process, but I guess
>>> that translating eagerly is not
>>> the only solution. If the translation of the second SQL can reuse the
>>> operators from the first SQL,
>>> then it is possible to link them to the same kafka source operator.
>>>
>>> Best,
>>> Tony Wei
>>>
>>> Zhenghua Gao wrote on Fri, Aug 9, 2019 at 11:57 AM:
>>>
 This needs EagerOperationTranslation [1] support. You can try the Blink planner in 1.9.0.

 [1]
 https://github.com/apache/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java#L413

 *Best Regards,*
 *Zhenghua Gao*


 On Fri, Aug 9, 2019 at 10:37 AM Tony Wei 
 wrote:

> Hi,
>
> I used `flinkTableEnv.connect(new
> Kafka()...).registerTableSource(...)` to register my kafka table.
> However, I found that because SQL is a lazy operation, it will convert
> to DataStream under some
> criteria. For example, `Table#toRetractStream`.
>
> So, when I used two SQLs in one application job, the same kafka table
> will be constructed twice. It
> is not a problem from flink side, because two operators held their own
> state for offsets. But from
> kafka side, they will have the same group_id.
>
> I want to make sure that only one kafka source will commit group_id's
> offsets back to kafka. A
> workaround might be registering the same kafka topic twice with
> different name, group_id for
> two SQLs. But I would still like to know if there is any way to make
> two SQLs just read from the
> same KafkaTableSource? Thanks in advance.
>
> Best,
> Tony Wei
>



**RegistrationTimeoutException** after TaskExecutor successfully registered at resource manager

2019-08-09 Thread Victor Wong
Hi,
I’m using Flink version 1.7.1, and I encountered this exception which was a 
little weird from my point of view;
TaskManager successfully registered at resource manager, however after 5 
minutes (which is the default value of taskmanager.registration.timeout config) 
it threw out RegistrationTimeoutException;

Here is the related logs of TM:
2019-08-09 01:30:24,061 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Connecting to 
ResourceManager 
akka.tcp://flink@xxx/user/resourcemanager().
2019-08-09 01:30:24,296 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Resolved 
ResourceManager address, beginning registration
2019-08-09 01:30:24,296 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Registration at 
ResourceManager attempt 1 (timeout=100ms)
2019-08-09 01:30:24,379 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Successful 
registration at resource manager akka.tcp://flink@xxx/user/resourcemanager 
under registration id 4535dea14648f6de68f32fb1a375806e.
2019-08-09 01:30:24,404 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Receive slot 
request AllocationID{372d1e10019c93c6c41d52b449cea5f2} for job 
e7b86795178efe43d7cac107c6cb8c33 from resource manager with leader id 
.
…
2019-08-09 01:30:33,590 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor- Un-registering 
task and sending final execution state FINISHED to JobManager for task Source: 
 ; // I don’t know if this is related, so I add it here in case;  This 
Flink Kafka source just finished because it consumed no Kafka partitions (Flink 
Kafka parallelism > Kafka topic partitions)
…
2019-08-09 01:35:24,753 ERROR 
org.apache.flink.runtime.taskexecutor.TaskExecutor- Fatal error 
occurred in TaskExecutor akka.tcp://flink@xxx/user/taskmanager_0.
org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException: 
Could not register at the ResourceManager within the specified maximum 
registration duration 300000 ms. This indicates a problem with this instance.
Terminating now.
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1037)
at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$3(TaskExecutor.java:1023)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Thanks,
Victor


Re: Capping RocksDb memory usage

2019-08-09 Thread Yu Li
Hi Cam,

Which flink version are you using?

Actually I don't think any existing flink release could take usage of the
write buffer manager natively through some configuration magic, but
requires some "developing" efforts, such as manually building flink with a
higher version rocksdb to have the JNI interface to set write buffer
manager, and set the write buffer manager into rocksdb's DBOptions with a
custom options factory. More details please refer to this comment [1] in
FLINK-7289.

As mentioned in another thread [2], we are now working on removing all
these "manual steps" and making the state backend memory management "hands
free", which is also part of the FLIP-49 work. Hopefully we could get this
done in 1.10 release, let's see (smile).

[1] https://s.apache.org/5ay97
[2] https://s.apache.org/ej2zn

Best Regards,
Yu


On Fri, 9 Aug 2019 at 03:53, Congxian Qiu  wrote:

> Hi
> Maybe FLIP-49[1] "Unified Memory Configuration for TaskExecutors" can give
> some information here
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-49-Unified-Memory-Configuration-for-TaskExecutors-td31436.html
> Best,
> Congxian
>
>
> Cam Mach wrote on Fri, Aug 9, 2019 at 4:59 AM:
>
>> Hi Biao, Yun and Ning.
>>
>> Thanks for your response and pointers. Those are very helpful!
>>
>> So far, we have tried with some of those parameters (WriterBufferManager,
>> write_buffer_size, write_buffer_count, ...), but still continuously having
>> issues with memory.
>> Here are our cluster configurations:
>>
>>- 1 Job Controller (32 GB RAM and 8 cores)
>>- 10 Task Managers: (32 GB RAM, 8 cores CPU, and 300GB SSD configured
>>for RocksDB, and we set 10GB heap for each)
>>   - Running under Kubernetes
>>
>> We have a pipeline that read/transfer 500 million records (around 1kb
>> each), and write to our sink. Our total data is around 1.2 Terabytes. Our
>> pipeline configurations are as follows:
>>
>>- 13 operators - some of them (around 6) are stateful
>>- Parallelism: 60
>>- Task slots: 6
>>
>> We have run several tests and observed that memory just keep growing
>> while our TM's CPU stay around 10 - 15% usage. We are now just focusing
>> limiting memory usage from Flink and RocksDB, so Kubernetes won't kill it.
>>
>> Any recommendations or advices are greatly appreciated!
>>
>> Thanks,
>>
>>
>>
>>
>> On Thu, Aug 8, 2019 at 6:57 AM Yun Tang  wrote:
>>
>>> Hi Cam
>>>
>>> I think FLINK-7289 [1] might offer you some insights to control RocksDB
>>> memory, especially the idea using write buffer manager [2] to control the
>>> total write buffer memory. If you do not have too many sst files, write
>>> buffer memory usage would consume much more space than index and filter
>>> usage. Since Flink would use per state per column family, and the write
>>> buffer number increase when more column families created.
>>>
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-7289
>>> [2] https://github.com/dataArtisans/frocksdb/pull/4
>>>
>>> Best
>>> Yun Tang
>>>
>>>
>>> --
>>> *From:* Cam Mach 
>>> *Sent:* Thursday, August 8, 2019 21:39
>>> *To:* Biao Liu 
>>> *Cc:* miki haiat ; user 
>>> *Subject:* Re: Capping RocksDb memory usage
>>>
>>> Thanks for your response, Biao.
>>>
>>>
>>>
>>> On Wed, Aug 7, 2019 at 11:41 PM Biao Liu  wrote:
>>>
>>> Hi Cam,
>>>
>>> AFAIK, that's not an easy thing. Actually it's more like a Rocksdb
>>> issue. There is a document explaining the memory usage of Rocksdb [1]. It
>>> might be helpful.
>>>
>>> You could define your own option to tune Rocksdb through
>>> "state.backend.rocksdb.options-factory" [2]. However I would suggest not to
>>> do this unless you are fully experienced of Rocksdb. IMO it's quite
>>> complicated.
>>>
>>> Meanwhile I can share a bit experience of this. We have tried to put the
>>> cache and filter into block cache before. It's useful to control the memory
>>> usage. But the performance might be affected at the same time. Anyway you
>>> could try and tune it. Good luck!
>>>
>>> 1. https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB
>>> 2.
>>> https://ci.apache.org/projects/flink/flink-docs-master/ops/state/large_state_tuning.html#tuning-rocksdb
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Thu, Aug 8, 2019 at 11:44 AM Cam Mach  wrote:
>>>
>>> Yes, that is correct.
>>> Cam Mach
>>> Software Engineer
>>> E-mail: cammac...@gmail.com
>>> Tel: 206 972 2768
>>>
>>>
>>>
>>> On Wed, Aug 7, 2019 at 8:33 PM Biao Liu  wrote:
>>>
>>> Hi Cam,
>>>
>>> Do you mean you want to limit the memory usage of RocksDB state backend?
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Thu, Aug 8, 2019 at 2:23 AM miki haiat  wrote:
>>>
>>> I think using metrics exporter is the easiest way
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#rocksdb
>>>
>>>
>>> On Wed, Aug 7, 2019, 20:28 Cam Mach  wrote:
>>>
>>> Hello everyone,
>>>
>>> What is the most easy and efficiently way to cap RocksDb's