Re: File Naming Pattern from HadoopOutputFormat

2019-07-01 Thread Haibo Sun
Hi, Andreas


I think the following things may be what you want.


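1. For writing Avro, I think you can extend AvroOutputFormat and override the
getDirectoryFileName() method to customize the file name, as shown below.
getDirectoryFileName() is only used when the output path is treated as a
directory (parallelism > 1, or OutputDirectoryMode.ALWAYS), which is why the
example sets ALWAYS in open().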
The javadoc of AvroOutputFormat: 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/formats/avro/AvroOutputFormat.html


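import java.io.IOException;
import java.util.UUID;

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroOutputFormat;

public class CustomAvroOutputFormat<E> extends AvroOutputFormat<E> {
    public CustomAvroOutputFormat(Path filePath, Class<E> type) {
        super(filePath, type);
    }

    public CustomAvroOutputFormat(Class<E> type) {
        super(type);
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // Always write into a directory, so that getDirectoryFileName() is
        // used even when the sink runs with parallelism 1.
        this.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
        super.open(taskNumber, numTasks);
    }

    @Override
    protected String getDirectoryFileName(int taskNumber) {
        // Returns a custom file name: the default name (taskNumber + 1)
        // followed by a random UUID, e.g. "1-<uuid>.avro".
        return (taskNumber + 1) + "-" + UUID.randomUUID() + ".avro";
    }
}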


2. For writing Parquet, you can refer to ParquetStreamingFileSinkITCase,
StreamingFileSink#forBulkFormat and DateTimeBucketAssigner. You can create a
class that implements the BucketAssigner interface and return a custom name
from the getBucketId() method. Note that the value returned by getBucketId()
is used as the bucket name, i.e. the subdirectory under the base path that the
part files are written into, rather than as the part file name itself.
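A minimal sketch of the naming logic such a getBucketId() could delegate to, written in plain Java so it runs without Flink. It assumes an hourly bucket in the "yyyy-MM-dd--HH" pattern that DateTimeBucketAssigner uses by default (evaluated in UTC here for reproducibility; DateTimeBucketAssigner defaults to the system time zone), suffixed with a UUID. The class UuidBucketNaming is hypothetical, and the actual BucketAssigner wiring (including getSerializer()) is omitted.

```java
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.UUID;

// Hypothetical helper holding the logic a custom BucketAssigner#getBucketId()
// could delegate to; the Flink interface itself is not implemented here.
public class UuidBucketNaming implements Serializable {
    private static final long serialVersionUID = 1L;

    // Same pattern as DateTimeBucketAssigner's default format; UTC is an
    // assumption made here for reproducible output.
    private static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneId.of("UTC"));

    // Optional fixed id (handy for tests); null means "generate one".
    private final String fixedId;

    // Not serialized: each deserialized copy generates its own id on first use.
    private transient String instanceId;

    public UuidBucketNaming() {
        this(null);
    }

    public UuidBucketNaming(UUID fixedId) {
        this.fixedId = (fixedId == null) ? null : fixedId.toString();
    }

    // Bucket id "<yyyy-MM-dd--HH>_<uuid>", which a StreamingFileSink would
    // turn into a subdirectory of its base path.
    public String bucketId(long processingTimeMillis) {
        if (instanceId == null) {
            instanceId = (fixedId != null) ? fixedId : UUID.randomUUID().toString();
        }
        return HOURLY.format(Instant.ofEpochMilli(processingTimeMillis)) + "_" + instanceId;
    }
}
```

Generating the UUID lazily matters: Flink serializes the sink once on the client and deserializes a copy for each parallel subtask, so a UUID created in the constructor would be shared by all subtasks, while a fresh UUID per element would scatter every record into its own bucket.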


ParquetStreamingFileSinkITCase:  
https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java


StreamingFileSink#forBulkFormat: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java


DateTimeBucketAssigner: 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java




Best,
Haibo

At 2019-07-02 04:15:07, "Hailu, Andreas"  wrote:


Hello Flink team,

I’m writing Avro and Parquet files to HDFS, and I’d like to include a
UUID as a part of the file name.

Our files in HDFS currently follow this pattern:

tmp-r-1.snappy.parquet
tmp-r-2.snappy.parquet
...

I’m using a custom output format which extends a RichOutputFormat - is this
something which is natively supported? If so, could you please recommend how
this could be done, or share the relevant document?

Best,
Andreas




Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices


Re: Batch mode with Flink 1.8 unstable?

2019-07-01 Thread Ken Krugler
Hi Stephan,

Thanks for responding, comments inline below…

Regards,

— Ken

> On Jun 26, 2019, at 7:50 AM, Stephan Ewen  wrote:
> 
> Hi Ken!
> 
> Sorry to hear you are going through this experience. The major focus on 
> streaming so far means that the DataSet API has stability issues at scale.
> So, yes, batch mode in the current Flink version can be somewhat tricky.
> 
> A big focus of Flink 1.9 is to finally fix batch mode, by addressing 
> batch-specific scheduling, recovery, and shuffle issues.
> 
> Let me go through the issues you found:
> 
> (1) Input splits and oversized RPC
> 
> Your explanation seems correct: the timeout is caused by an oversized RPC 
> message being dropped.
> 
> I don't quite understand how exactly that happens, because the size limit is 
> 10 MB and input splits should be rather small in most cases.
> Are you running custom sources which put large data into splits? Maybe 
> accidentally, by having a large serialized closure in the splits?

As per my email to Till, I don’t feel like I’m doing anything tricky, though I 
am reading Hadoop sequence files that contain Cascading Tuple/Tuple key/value 
data.

> The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399 
>   
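To check whether the splits are unexpectedly large, one can measure their Java-serialized size against the 10 MB limit Stephan mentions (Flink's default akka.framesize is 10485760b). The plain-Java sketch below assumes that Java serialization of a split approximates the RPC payload (the real message adds some overhead); DemoSplit is a hypothetical stand-in for a real InputSplit.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SplitSizeCheck {
    // Flink's default akka.framesize (10485760b = 10 MiB).
    public static final long DEFAULT_MAX_FRAME_BYTES = 10L * 1024 * 1024;

    // Number of bytes Java serialization produces for the object.
    public static int serializedSize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static boolean fitsInFrame(Serializable split) {
        return serializedSize(split) < DEFAULT_MAX_FRAME_BYTES;
    }

    // Hypothetical split that accidentally carries a large payload,
    // e.g. a big object captured in a serialized closure.
    public static class DemoSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        final String path;
        final byte[] payload;

        public DemoSplit(String path, int payloadBytes) {
            this.path = path;
            this.payload = new byte[payloadBytes];
        }
    }
}
```

In a real job, something like calling serializedSize() on each element returned by the input format's createInputSplits(...) would show whether any split comes close to the limit.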
> 
> (2) TM early release
> 
> The 1.8 version had a fix that should work for regular cases without 
> fine-grained failure recovery.
> 1.9 should have a more general fix that also works for fine-grained recovery.
> 
> Are you trying to use the finer-grained failover with the batch job?

No, or at least I’m not doing anything special to enable it.

Is there something I need to do to explicitly _disable_ it?

> The finer-grained failover does not work in batch mode in 1.8, which is why it is 
> not an advertised feature (it only works for streaming so far).
> 
> The goal is that this works in the 1.9 release (aka the batch fixup release).
> 
> (3) Hang in Processing
> 
> I think a thread dump (jstack) from the TMs would be helpful to diagnose that.
> There are known issues with the current batch shuffle implementation, which 
> is why 1.9 is getting a new bounded-blocking stream shuffle implementation.

Next time it happens, I’ll dump the threads.

I should have done it this time, but was in a hurry to kill the EMR cluster as 
it had been costing money all night long :(



> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler wrote:
> Hi all,
> 
> I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink 
> 1.8.0, and it regularly fails, but for varying reasons.
> 
> Has anyone else had stability with 1.8.0 in batch mode and non-trivial 
> workflows?
> 
> Thanks,
> 
> — Ken
> 
> 1. TimeoutException getting input splits
> 
> The batch job starts by processing a lot of files that live in S3. During 
> this phase, I sometimes see:
> 
> 2019-06-20 01:20:22,659 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN 
> DataSource (at createInput(ExecutionEnvironment.java:549) 
> (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad 
> dailies) -> Filter (Filter at 
> createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at 
> createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at 
> createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key Extractor) 
> -> Combine (Reduce at createWorkflow(AdvertiserSimilarityWorkflow.java:41)) 
> (31/32) (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Could not retrieve next input split.
>   at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:367)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:160)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: 
> org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: 
> Requesting the next input split failed.
>   at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>   at 
> org.apache.flink.runtime.operators.DataSourceTask$1.hasNext(DataSourceTask.java:365)
>   ... 3 more
> Caused by: java.util.concurrent.TimeoutException
>   at 
> java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1771)
>   at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>   at 
> org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>   ... 4 more
> 2019-06-20 01:20:22,664 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Flink 
> Java Job at Thu Jun 20 01:11:28 UTC 2019 (5564b8980f40d788d7ef312318709e4d) 
> switched from state RUNNING to FAILING.
> java.lang.RuntimeException: Could not retrieve next input split.
>   at 
> org.apache.flin

Random errors reading binary files in batch workflow

2019-07-01 Thread Ken Krugler
Hi all,

My latest issue is that I regularly (but not always) get a
java.io.UTFDataFormatException when trying to read in serialized records.

I can re-run the exact same workflow, on the same cluster, with the same input 
data, and sometimes it works.

It seems like the higher the parallelism, the more likely that an error happens.

The fact that it sometimes works suggests that it’s not a problem with corrupted
records (previously written out by an upstream workflow), as that should cause
a consistent failure.

The error occurs when reading from both S3 and HDFS.

When the error occurs, it looks like this (fails on deserializing the first 
field in the POJO):

2019-07-01 22:12:02,542 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- DataSource 
(feature vector source) (36/64) (577f1375e15df4a5352a405fb8b21204) switched 
from RUNNING to FAILED.
java.io.UTFDataFormatException: malformed input around byte 2
at java.io.DataInputStream.readUTF(DataInputStream.java:634)
at java.io.DataInputStream.readUTF(DataInputStream.java:564)
at 
com.adbeat.similarity.FeatureVectorWithCountry.read(FeatureVectorWithCountry.java:47)
at 
org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:39)
at 
org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:32)
at 
org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:305)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

FeatureVectorWithCountry is a POJO that implements the IOReadableWritable
interface.

It also sometimes fails while reading a different POJO, which is in a different 
input DataSet in the same workflow:

2019-07-01 00:39:05,829 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- DataSource (at 
createWorkflow(AdvertiserSimilarityWorkflow.java:88) 
(org.apache.flink.api.common.io.SerializedInputFormat)) (17/48) 
(021bc0011dd523a4314d4e52f97a2486) switched from RUNNING to FAILED.
java.io.UTFDataFormatException: malformed input around byte 50
at java.io.DataInputStream.readUTF(DataInputStream.java:656)
at java.io.DataInputStream.readUTF(DataInputStream.java:564)
at com.adbeat.similarity.advertiser.AdText.read(AdText.java:170)
at 
org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:39)
at 
org.apache.flink.api.common.io.SerializedInputFormat.deserialize(SerializedInputFormat.java:32)
at 
org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:305)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

I don’t see any preceding errors in the logs.

It seems like the calculation of valid starting offsets in a split is
sometimes wrong, and thus it starts trying to read a record from an incorrect
location.
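Even though the intermittent failures point away from the records themselves, a local round-trip test is a cheap way to rule out an asymmetric read()/write() pair: a read() that consumes a different number of bytes than write() produced would misalign every record that follows. The sketch uses plain java.io (Flink's DataOutputView/DataInputView extend DataOutput/DataInput); the fields of FeatureRecord are hypothetical, since the real FeatureVectorWithCountry isn't shown.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical record with the same read()/write() shape as an
// IOReadableWritable POJO; the fields are made up for illustration.
public class FeatureRecord {
    public String country;
    public float[] vector;

    public FeatureRecord() {
        // Empty instance, filled in by read().
    }

    public FeatureRecord(String country, float[] vector) {
        this.country = country;
        this.vector = vector;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(country); // 2-byte length + modified UTF-8 bytes
        out.writeInt(vector.length);
        for (float v : vector) {
            out.writeFloat(v);
        }
    }

    public void read(DataInput in) throws IOException {
        // Must mirror write() field by field, in the same order.
        country = in.readUTF();
        int n = in.readInt();
        vector = new float[n];
        for (int i = 0; i < n; i++) {
            vector[i] = in.readFloat();
        }
    }

    // Writes the records back to back, then reads them back in order.
    public static FeatureRecord[] roundTrip(FeatureRecord... records) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            for (FeatureRecord r : records) {
                r.write(out);
            }
            out.flush();
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            FeatureRecord[] result = new FeatureRecord[records.length];
            for (int i = 0; i < records.length; i++) {
                result[i] = new FeatureRecord();
                result[i].read(in);
            }
            // Leftover bytes mean read() consumed less than write() produced.
            if (in.available() != 0) {
                throw new IllegalStateException(in.available() + " unread bytes left");
            }
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If such a test passes with realistic data, the split-offset hypothesis above becomes more likely.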

Has anyone else run into this?

Thanks,

— Ken

--
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra



Re: Batch mode with Flink 1.8 unstable?

2019-07-01 Thread Ken Krugler
Hi Till,

Thanks for following up.

I’ve got answers to other emails on this thread pending, but wanted to respond 
to this one now.

> On Jul 1, 2019, at 7:20 AM, Till Rohrmann  wrote:
> 
> Quick addition for problem (1): The AkkaRpcActor should serialize the 
> response if it is a remote RPC and send an AkkaRpcException if the response's 
> size exceeds the maximum frame size. This should be visible on the call site 
> since the future should be completed with this exception. I'm wondering why 
> you don't see this exception. 
> 
> It could be helpful to better understand the input splits your program is 
> generating. Is it simply a `FileInputSplit` or did you implement a custom 
> InputSplitAssigner with custom InputSplits?

I’m reading from about 10K files stored in S3.

These are files created using Cascading, so it’s a Hadoop SequenceFile 
containing a (Cascading) Tuple for the key, and a Tuple for the value.

Removing some logic cruft, it looks like…

Job job = Job.getInstance();
job.getConfiguration().set("io.serializations", 
"cascading.tuple.hadoop.TupleSerialization");
FileInputFormat.addInputPath(job, new Path(inputDir));

HadoopInputFormat inputFormat = 
HadoopInputs.createHadoopInput(new SequenceFileInputFormat(), 
Tuple.class, Tuple.class, job);

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
inputFormat.configure(parameters);

DataSet adDailies = env.createInput(inputFormat)
.map(new CreateAdDaily())
.name("ad dailies");

Thanks again,

— Ken



> 
> On Mon, Jul 1, 2019 at 11:57 AM Till Rohrmann wrote:
> Hi Ken,
> 
> in order to further debug your problems it would be helpful if you could 
> share the log files on DEBUG level with us.
> 
> For problem (2), I suspect that it has been caused by Flink releasing TMs too 
> early. This should be fixed with FLINK-10941, which is part of Flink 1.8.1. 
> The 1.8.1 release should be out very soon. It would be great if you 
> could try your program with this version, or even the 1.8.1 RC, to see whether 
> the problem still occurs. But it could also be caused by using fine-grained 
> recovery, so it might be worth disabling this feature if you turned it 
> on.
> 
> Thanks a lot!
> 
> Cheers,
> Till
> 
> On Thu, Jun 27, 2019 at 8:30 AM Biao Liu wrote:
> Hi Ken again,
> 
> In regard to the TimeoutException, I just realized that there is no 
> akka.remote.OversizedPayloadException in your log file, so something else 
> might have caused this.
> 1. Have you tried increasing the configuration option "akka.ask.timeout"? 
> 2. Have you checked the garbage collection of the JM/TM? You may need to 
> enable GC logging first.
> 
> 
> On Thu, Jun 27, 2019 at 11:38 AM Biao Liu wrote:
> Hi Ken,
> 
> Oversized input splits are a case I didn't expect. However, this should 
> definitely be fixed, since input splits can be user-defined; we should not 
> assume they are small.
> I agree with Stephan that maybe something unexpected is included in 
> the input splits.
> Until this is fixed, there is also a workaround: increase the RPC message 
> size limit by explicitly configuring "akka.framesize" in flink-conf.yaml.
> 
> @Qi, I'm also sorry to hear about your bad experience. I'll take this issue, but I'm 
> not sure I can make it in time for the 1.9 release. Hope things go well.
> 
> 
> On Wed, Jun 26, 2019 at 10:50 PM Stephan Ewen wrote:
> Hi Ken!
> 
> Sorry to hear you are going through this experience. The major focus on 
> streaming so far means that the DataSet API has stability issues at scale.
> So, yes, batch mode in the current Flink version can be somewhat tricky.
> 
> A big focus of Flink 1.9 is to finally fix batch mode, by addressing 
> batch-specific scheduling, recovery, and shuffle issues.
> 
> Let me go through the issues you found:
> 
> (1) Input splits and oversized RPC
> 
> Your explanation seems correct: the timeout is caused by an oversized RPC 
> message being dropped.
> 
> I don't quite understand how exactly that happens, because the size limit is 
> 10 MB and input splits should be rather small in most cases.
> Are you running custom sources which put large data into splits? Maybe 
> accidentally, by having a large serialized closure in the splits?
> 
> The fix would be this issue: https://issues.apache.org/jira/browse/FLINK-4399 
>   
> 
> (2) TM early release
> 
> The 1.8 version had a fix that should work for regular cases without 
> fine-grained failure recovery.
> 1.9 should have a more general fix that also works for fine-grained recovery.
> 
> Are you trying to use the finer-grained failover with the batch job?
> The finer-grained failover is not working in batc

File Naming Pattern from HadoopOutputFormat

2019-07-01 Thread Hailu, Andreas
Hello Flink team,

I'm writing Avro and Parquet files to HDFS, and I'd like to include a
UUID as a part of the file name.

Our files in HDFS currently follow this pattern:

tmp-r-1.snappy.parquet
tmp-r-2.snappy.parquet
...

I'm using a custom output format which extends a RichOutputFormat - is this 
something which is natively supported? If so, could you please recommend how 
this could be done, or share the relevant document?

Best,
Andreas





How does Flink recover uncommitted Kafka offsets in AsyncIO?

2019-07-01 Thread wang xuchen
Hi Flink experts,

I am prototyping a real-time system that reads from a Kafka source with Flink
and calls out to an external system as part of the event processing. One of
the most important requirements is that reading from Kafka should NEVER stall,
even when some async external calls are slow while holding certain
Kafka offsets. At-least-once processing is good enough.

Currently, I am using AsyncIO with a thread pool of size 20. My
understanding is that if I use orderedWait with a large 'capacity', consumption
from Kafka should continue even if some external calls are slow
(holding the offsets), as long as the capacity is not exhausted.

(From my own reading of the Flink source code, the capacity of orderedWait
translates to the size of the OrderedStreamElementQueue.)
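A simplified, single-threaded model of the behavior described above, not Flink's actual OrderedStreamElementQueue: a bounded queue accepts new requests until it is full, but emits results only in input order, so one slow call holds back the completed results behind it while new input keeps flowing until the capacity runs out. OrderedQueueModel and its methods are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model of an ordered async result queue.
public class OrderedQueueModel<T> {
    private final int capacity;
    private final Deque<CompletableFuture<T>> queue = new ArrayDeque<>();

    public OrderedQueueModel(int capacity) {
        this.capacity = capacity;
    }

    // Accepts a new in-flight request unless the queue is full. Returning
    // false is the point where the real operator would block, and
    // backpressure would propagate to the source.
    public boolean tryAdd(CompletableFuture<T> pending) {
        if (queue.size() >= capacity) {
            return false;
        }
        queue.addLast(pending);
        return true;
    }

    // Emits results from the head only while the head has completed, so
    // output order equals input order. join() rethrows if a call failed.
    public List<T> pollCompleted() {
        List<T> emitted = new ArrayList<>();
        while (!queue.isEmpty() && queue.peekFirst().isDone()) {
            emitted.add(queue.pollFirst().join());
        }
        return emitted;
    }

    public int size() {
        return queue.size();
    }
}
```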

I expect that while the external calls are stuck, the stream source should
keep pulling from Kafka as long as there is still capacity, but offsets
after the stuck record should NOT be committed back to Kafka (and the
checkpoint should also stall to accommodate the stalled offsets?).

What I observe is that if I set the capacity large enough (max_int / 100, for
instance), consumption does not stall (which is good), but the offsets
past the stalled records are all committed back to Kafka, all
checkpoints succeed, and no backpressure is incurred.

In this case, if some machines crash, how does Flink recover the stalled
offsets? Which checkpoint does Flink roll back to? I understand that
committing offsets back to Kafka merely shows progress to external
monitoring tools, but I hope Flink does bookkeeping somewhere to record that
async call xyz has not returned and should be retried during recovery.

==

I've done some more experiments, and it looks like Flink is able to recover the
record that I failed with completeExceptionally, even if I use 'unorderedWait' on
the async stream.

This is in line with Fabian's earlier comment: 'Flink does not rely on Kafka
consumer offset to recover, committing offset to Kafka is merely to show
progress to external monitoring tools'.

I couldn't pinpoint the code Flink uses to achieve this. Maybe
in-flight async invocations in the 'UnorderedStreamElementQueue' are part of
the checkpoint, and Flink saves the actual payload for later replay?

Can anyone shed some light on this?
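The replay idea guessed at above can be modeled as follows: at checkpoint time, the inputs of all async calls that have not returned yet are copied into the snapshot, and on restore they are re-issued, which gives at-least-once behavior for the in-flight records. This is a hypothetical, single-threaded sketch with illustrative names, not Flink's AsyncWaitOperator code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: in-flight async requests are remembered by their input,
// so a checkpoint can capture them and recovery can re-issue them.
public class InFlightSnapshotModel {
    // requestId -> input record, in insertion order
    private final Map<Long, String> inFlight = new LinkedHashMap<>();
    private long nextId = 0;

    // Starts an async call for the given input and returns its request id.
    public long start(String input) {
        long id = nextId++;
        inFlight.put(id, input);
        return id;
    }

    // The external call returned a result; it no longer needs replay.
    public void complete(long id) {
        inFlight.remove(id);
    }

    // Checkpoint: copy the inputs of all calls that have not returned yet.
    public List<String> snapshot() {
        return new ArrayList<>(inFlight.values());
    }

    // Recovery: re-issue every snapshotted input. A call that had already
    // reached the external system before the failure is sent again,
    // hence at-least-once rather than exactly-once.
    public static InFlightSnapshotModel restore(List<String> snapshot) {
        InFlightSnapshotModel model = new InFlightSnapshotModel();
        for (String input : snapshot) {
            model.start(input);
        }
        return model;
    }

    public int pending() {
        return inFlight.size();
    }
}
```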




Re: [FLINK-10868] job cannot be exited immediately if job manager is timed out for some reason

2019-07-01 Thread Peter Huang
Hi Anyang,

Thanks for raising the question. I didn't test the PR in batch mode; your
observation will help me improve the implementation. From my understanding,
if the heartbeat between the RM and a job manager times out, the job manager
connection will be closed, so it will not be reconnected. Are you running the
batch job in a per-job cluster or a session cluster? To temporarily mitigate
the issue you are facing, you can probably tune heartbeat.timeout (default
50 s) to a larger value.


Best Regards
Peter Huang

On Mon, Jul 1, 2019 at 7:50 AM Till Rohrmann  wrote:

> Hi Anyang,
>
> as far as I can tell, FLINK-10868 has not been merged into Flink yet.
> Thus, I cannot tell much about how well it works. The case you are
> describing should be properly handled in the version that gets merged,
> though. I guess what needs to happen is that once the JM reconnects to the
> RM, it should synchronize the pending slot requests with the registered slot
> requests on the RM. But this should be a follow-up issue to FLINK-10868,
> because it would widen the scope too much.
>
> Cheers,
> Till
>
> On Wed, Jun 26, 2019 at 10:52 AM Anyang Hu  wrote:
>
>> Hi ZhenQiu && Rohrmann:
>>
>> I have backported FLINK-10868 to flink-1.5. Most of my jobs (all
>> batch jobs) exit immediately once the number of failed container
>> requests reaches the upper limit, but some jobs still cannot exit
>> immediately. From the logs, I observed that in these jobs the job manager
>> timed out first, for unknown reasons. Code segment 1 is executed after
>> the job manager timed out but before it reconnected, so I suspect that
>> the job manager is out of sync and that the notifyAllocationFailure()
>> call inside code segment 2 is not executed.
>>
>>
>> I'm wondering whether you have encountered similar problems, and whether
>> there is a solution. To make such jobs exit immediately, we are currently
>> considering calling onFatalError() when (jobManagerRegistration == null)
>> to exit the process immediately. It is not yet clear whether this drastic
>> approach has any side effects.
>>
>>
>> Thanks,
>> Anyang
>>
>>
>> code segment 1 in ResourceManager.java:
>>
>> private void cancelAllPendingSlotRequests(Exception cause) {
>>slotManager.cancelAllPendingSlotRequests(cause);
>> }
>>
>>
>> code segment 2 in ResourceManager.java:
>>
>> @Override
>> public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, 
>> Exception cause) {
>>validateRunsInMainThread();
>>log.info("Slot request with allocation id {} for job {} failed.", 
>> allocationId, jobId, cause);
>>
>>JobManagerRegistration jobManagerRegistration = 
>> jobManagerRegistrations.get(jobId);
>>if (jobManagerRegistration != null) {
>>   
>> jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId,
>>  cause);
>>}
>> }
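A minimal model of the workaround proposed above: when no JobManagerRegistration exists for the job, escalate to a fatal-error handler instead of silently dropping the allocation failure. The class, the map of callbacks, and the handler are hypothetical stand-ins for the ResourceManager internals; whether escalating through the real onFatalError() is safe is exactly the open question in the thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical stand-in for the ResourceManager logic in code segment 2.
public class AllocationFailureModel {
    // jobId -> callback standing in for JobManagerGateway#notifyAllocationFailure
    private final Map<String, BiConsumer<String, Exception>> jobManagerRegistrations = new HashMap<>();
    // Stand-in for onFatalError(), which would terminate the process.
    private final Consumer<Throwable> fatalErrorHandler;

    public AllocationFailureModel(Consumer<Throwable> fatalErrorHandler) {
        this.fatalErrorHandler = fatalErrorHandler;
    }

    public void registerJobManager(String jobId, BiConsumer<String, Exception> gateway) {
        jobManagerRegistrations.put(jobId, gateway);
    }

    // E.g. after a heartbeat timeout closed the JM connection.
    public void unregisterJobManager(String jobId) {
        jobManagerRegistrations.remove(jobId);
    }

    public void notifyAllocationFailure(String jobId, String allocationId, Exception cause) {
        BiConsumer<String, Exception> gateway = jobManagerRegistrations.get(jobId);
        if (gateway != null) {
            gateway.accept(allocationId, cause);
        } else {
            // Proposed workaround: without a registered JM the failure would
            // be lost, so escalate instead.
            fatalErrorHandler.accept(new IllegalStateException(
                    "No JobManager registered for job " + jobId
                            + ", cannot forward allocation failure " + allocationId, cause));
        }
    }
}
```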
>>
>>


Re: [FLINK-10868] job cannot be exited immediately if job manager is timed out for some reason

2019-07-01 Thread Till Rohrmann
Hi Anyang,

as far as I can tell, FLINK-10868 has not been merged into Flink yet. Thus,
I cannot tell much about how well it works. The case you are describing
should be properly handled in the version that gets merged, though. I guess
what needs to happen is that once the JM reconnects to the RM, it should
synchronize the pending slot requests with the registered slot requests on
the RM. But this should be a follow-up issue to FLINK-10868, because it
would widen the scope too much.
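The synchronization suggested above, reconciling the JobManager's pending slot requests with those registered on the ResourceManager after a reconnect, could be sketched as two set differences. Representing requests as plain allocation-ID strings, and the names below, are assumptions for illustration; this is not code from FLINK-10868.

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical reconciliation step after a JM reconnects to the RM.
public class SlotRequestReconciliation {
    public static final class Result {
        // Pending on the JM but unknown to the RM: must be requested again
        // (or failed back to the JM).
        public final Set<String> missingOnRm;
        // Registered on the RM but no longer pending on the JM: stale.
        public final Set<String> staleOnRm;

        Result(Set<String> missingOnRm, Set<String> staleOnRm) {
            this.missingOnRm = Collections.unmodifiableSet(missingOnRm);
            this.staleOnRm = Collections.unmodifiableSet(staleOnRm);
        }
    }

    public static Result reconcile(Set<String> jmPending, Set<String> rmRegistered) {
        Set<String> missing = new TreeSet<>(jmPending);
        missing.removeAll(rmRegistered);
        Set<String> stale = new TreeSet<>(rmRegistered);
        stale.removeAll(jmPending);
        return new Result(missing, stale);
    }
}
```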

Cheers,
Till

On Wed, Jun 26, 2019 at 10:52 AM Anyang Hu  wrote:

> Hi ZhenQiu && Rohrmann:
>
> I have backported FLINK-10868 to flink-1.5. Most of my jobs (all
> batch jobs) exit immediately once the number of failed container
> requests reaches the upper limit, but some jobs still cannot exit
> immediately. From the logs, I observed that in these jobs the job manager
> timed out first, for unknown reasons. Code segment 1 is executed after
> the job manager timed out but before it reconnected, so I suspect that
> the job manager is out of sync and that the notifyAllocationFailure()
> call inside code segment 2 is not executed.
>
>
> I'm wondering whether you have encountered similar problems, and whether
> there is a solution. To make such jobs exit immediately, we are currently
> considering calling onFatalError() when (jobManagerRegistration == null)
> to exit the process immediately. It is not yet clear whether this drastic
> approach has any side effects.
>
>
> Thanks,
> Anyang
>
>
> code segment 1 in ResourceManager.java:
>
> private void cancelAllPendingSlotRequests(Exception cause) {
>slotManager.cancelAllPendingSlotRequests(cause);
> }
>
>
> code segment 2 in ResourceManager.java:
>
> @Override
> public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, 
> Exception cause) {
>validateRunsInMainThread();
>log.info("Slot request with allocation id {} for job {} failed.", 
> allocationId, jobId, cause);
>
>JobManagerRegistration jobManagerRegistration = 
> jobManagerRegistrations.get(jobId);
>if (jobManagerRegistration != null) {
>   
> jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId,
>  cause);
>}
> }
>
>


Re: [FLINK-10868] the job cannot be exited immediately if job manager is timed out

2019-07-01 Thread Till Rohrmann
Hi Young,

as far as I can tell, FLINK-10868 has not been merged into Flink yet. Thus,
I cannot tell much about how well it works. The case you are describing
should be properly handled in the version that gets merged, though. I guess
what needs to happen is that once the JM reconnects to the RM, it should
synchronize the pending slot requests with the registered slot requests on
the RM. But this should be a follow-up issue to FLINK-10868, because it
would widen the scope too much.

Cheers,
Till

On Fri, Jun 21, 2019 at 2:43 PM Young  wrote:

> Hi ZhenQiu & trohrmann:
>
> I have backported FLINK-10868 to flink-1.5. Most of my jobs (all
> batch jobs) exit immediately once the number of failed container
> requests reaches the upper limit, but some jobs still cannot exit
> immediately. From the logs, I observed that in these jobs the job manager
> timed out first, for unknown reasons, and code segment 1 was executed
> after the job manager timed out but before it reconnected. So I suspect
> that the job manager is out of sync and that the notifyAllocationFailure()
> call inside code segment 2 is not executed.
>
> I'm wondering whether you have encountered similar problems. Is there a
> solution? To make such jobs exit immediately, we are currently considering
> calling onFatalError() when (jobManagerRegistration == null) to exit the
> process immediately, but it is not yet clear whether this drastic approach
> has any side effects.
>
> Thanks,
> Young
>
> code segment 1  in ResourceManager.java:
>
> private void cancelAllPendingSlotRequests(Exception cause) {
>slotManager.cancelAllPendingSlotRequests(cause);
> }
>
>
> code segment 2  in ResourceManager.java:
>
> public void notifyAllocationFailure(JobID jobId, AllocationID allocationId, 
> Exception cause) {
>validateRunsInMainThread();
>log.info("Slot request with allocation id {} for job {} failed.", 
> allocationId, jobId, cause);
>
>JobManagerRegistration jobManagerRegistration = 
> jobManagerRegistrations.get(jobId);
>if (jobManagerRegistration != null) {
>   
> jobManagerRegistration.getJobManagerGateway().notifyAllocationFailure(allocationId,
>  cause);
>}
> }
>
>


Re: Batch mode with Flink 1.8 unstable?

2019-07-01 Thread Till Rohrmann
Quick addition for problem (1): The AkkaRpcActor should serialize the
response if it is a remote RPC and send an AkkaRpcException if the
response's size exceeds the maximum frame size. This should be visible on
the call site since the future should be completed with this exception. I'm
wondering why you don't see this exception.

It could be helpful to better understand the input splits your program is
generating. Is it simply a `FileInputSplit` or did you implement a custom
InputSplitAssigner with custom InputSplits?
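The behavior described above, serializing a remote RPC response and completing the caller's future exceptionally when it exceeds the maximum frame size, can be modeled in plain Java. The class name and the IllegalStateException (standing in for AkkaRpcException) are assumptions for illustration; this is not Flink's AkkaRpcActor code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of a size check on a remote RPC response.
public class FrameSizeGuard {
    private final long maxFrameBytes;

    public FrameSizeGuard(long maxFrameBytes) {
        this.maxFrameBytes = maxFrameBytes;
    }

    // Serializes the response and completes the caller's future with the
    // bytes, or exceptionally if they would not fit into one frame.
    public CompletableFuture<byte[]> reply(Serializable response) {
        CompletableFuture<byte[]> future = new CompletableFuture<>();
        try {
            byte[] bytes = serialize(response);
            if (bytes.length > maxFrameBytes) {
                // Stand-in for AkkaRpcException: the caller sees a failure
                // instead of waiting until its ask timeout fires.
                future.completeExceptionally(new IllegalStateException(
                        "Response of " + bytes.length + " bytes exceeds the maximum frame size of "
                                + maxFrameBytes + " bytes"));
            } else {
                future.complete(bytes);
            }
        } catch (IOException e) {
            future.completeExceptionally(e);
        }
        return future;
    }

    private static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }
}
```

Without such a check, a message dropped by the transport for being oversized would leave the caller with nothing but a TimeoutException, which matches the symptom in Ken's log.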

Cheers,
Till

On Mon, Jul 1, 2019 at 11:57 AM Till Rohrmann  wrote:

> Hi Ken,
>
> in order to further debug your problems it would be helpful if you could
> share the log files on DEBUG level with us.
>
> For problem (2), I suspect that it has been caused by Flink releasing TMs
> too early. This should be fixed with FLINK-10941, which is part of Flink
> 1.8.1. The 1.8.1 release should be out very soon. It would be great
> if you could try your program with this version, or even the 1.8.1 RC, to see
> whether the problem still occurs. But it could also be caused by using
> fine-grained recovery, so it might be worth disabling this feature if you
> turned it on.
>
> Thanks a lot!
>
> Cheers,
> Till
>
> On Thu, Jun 27, 2019 at 8:30 AM Biao Liu  wrote:
>
>> Hi Ken again,
>>
>> In regard to the TimeoutException, I just realized that there is no
>> akka.remote.OversizedPayloadException in your log file, so something else
>> might have caused this.
>> 1. Have you tried increasing the configuration option "akka.ask.timeout"?
>> 2. Have you checked the garbage collection of the JM/TM? You may need
>> to enable GC logging first.
>>
>>
>> On Thu, Jun 27, 2019 at 11:38 AM Biao Liu  wrote:
>>
>>> Hi Ken,
>>>
>>> Oversized input splits are a case I didn't expect. However, this
>>> should definitely be fixed, since input splits can be user-defined; we
>>> should not assume they are small.
>>> I agree with Stephan that maybe something unexpected is included
>>> in the input splits.
>>> Until this is fixed, there is also a workaround: increase the RPC
>>> message size limit by explicitly configuring "akka.framesize" in
>>> flink-conf.yaml.
>>>
>>> @Qi, I'm also sorry to hear about your bad experience. I'll take this issue,
>>> but I'm not sure I can make it in time for the 1.9 release. Hope things go well.
>>>
>>>
>>> On Wed, Jun 26, 2019 at 10:50 PM Stephan Ewen  wrote:
>>>
 Hi Ken!

 Sorry to hear you are going through this experience. The major focus on
 streaming so far means that the DataSet API has stability issues at scale.
 So, yes, batch mode in the current Flink version can be somewhat tricky.

 A big focus of Flink 1.9 is to finally fix batch mode, by addressing
 batch-specific scheduling, recovery, and shuffle issues.

 Let me go through the issues you found:

 *(1) Input splits and oversized RPC*

 Your explanation seems correct: the timeout is caused by an oversized RPC
 message being dropped.

 I don't quite understand how exactly that happens, because the size
 limit is 10 MB and input splits should be rather small in most cases.
 Are you running custom sources that put large data into splits? Maybe
 accidentally, by having a large serialized closure in the splits?

 The fix would be this issue:
 https://issues.apache.org/jira/browse/FLINK-4399

 *(2) TM early release*

 The 1.8 version had a fix that should work for regular cases without
 fine-grained failure recovery.
 1.9 should have a more general fix that also works for fine-grained
 recovery.

 Are you trying to use the finer-grained failover with the batch job?
 The finer-grained failover does not work in batch mode in 1.8, which is why
 it is not an advertised feature (it only works for streaming so far).

 The goal is that this works in the 1.9 release (aka the batch fixup
 release).

 *(3) Hang in processing*

 I think a thread dump (jstack) from the TMs would be helpful to
 diagnose that.
 There are known issues with the current batch shuffle implementation,
 which is why 1.9 is getting a new bounded-blocking stream shuffle
 implementation.

 Best,
 Stephan






 On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler <
 kkrugler_li...@transpac.com> wrote:

> Hi all,
>
> I’ve been running a somewhat complex batch job (in EMR/YARN) with
> Flink 1.8.0, and it regularly fails, but for varying reasons.
>
> Has anyone else achieved stability with 1.8.0 in batch mode and
> non-trivial workflows?
>
> Thanks,
>
> — Ken
>
> *1. TimeoutException getting input splits*
>
> The batch job starts by processing a lot of files that live in S3.
> During this phase, I sometimes see:
>
> 2019-06-20 01:20:22,659 INFO
>  org.apache.flink.run
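Stephan suggests a thread dump (jstack) of the TaskManagers to diagnose the hang, and Biao suggests checking JM/TM garbage collection. Running `jstack <pid>` against the TaskManager process and enabling GC logging are the usual routes; as a complement, the sketch below shows how the same two kinds of information can be read from inside a running JVM with the standard `java.lang.management` API. The class name `JvmDiagnostics` is hypothetical, and this is not a Flink feature.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper: gathers information similar to `jstack <pid>` and GC logs,
// but from inside the running JVM.
class JvmDiagnostics {

    // Thread dump of all live threads, including held monitors and synchronizers.
    static String threadDump() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        StringBuilder sb = new StringBuilder();
        for (ThreadInfo info : threads.dumpAllThreads(true, true)) {
            // Note: ThreadInfo.toString() prints only the top few stack frames.
            sb.append(info.toString());
        }
        return sb.toString();
    }

    // Accumulated collection count and time (ms) per garbage collector.
    static String gcSummary() {
        StringBuilder sb = new StringBuilder();
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            sb.append(gc.getName())
              .append(": count=").append(gc.getCollectionCount())
              .append(", timeMs=").append(gc.getCollectionTime())
              .append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(gcSummary());
        System.out.print(threadDump());
    }
}
```

Because the stack traces are truncated, `jstack` remains the better tool for a full picture of a hang; rapidly growing GC time between two `gcSummary()` calls would hint that long pauses could explain RPC timeouts.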

Re: LookupableTableSource question

2019-07-01 Thread Flavio Pompermaier
I probably misunderstood the meaning of eval(). So it is called once for
every distinct key (which could be composed of a combination of fields)?
So, the other question is: how do I enable Blink planner support?
Since which version has LATERAL TABLE been available in Flink? Is it equivalent to using
temporal tables [1]?

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/temporal_tables.html

Best,
Flavio

On Sat, Jun 29, 2019 at 3:16 AM JingsongLee  wrote:

> The keys are the joint primary key, not a list of keys. In your case,
> maybe there is a single key?
>
> Best, Jingsong Lee
>
>
> 
>
> --Original Mail --
> *From:*Flavio Pompermaier 
> *Date:*2019-06-28 22:53:31
> *Recipient:*JingsongLee 
> *CC:*user 
> *Subject:*Re: LookupableTableSource question
> Sorry, I pasted the current eval method twice... I'd do this:
>
> public void eval(Object... keys) {
>     for (Object kkk : keys) {
>         Row keyRow = Row.of(kkk);
>         if (cache != null) {
>             List<Row> cachedRows = cache.getIfPresent(keyRow);
>             if (cachedRows != null) {
>                 for (Row cachedRow : cachedRows) {
>                     collect(cachedRow);
>                 }
>                 return;
>             }
>         }
>     }
>  ...
>
> On Fri, Jun 28, 2019 at 4:51 PM Flavio Pompermaier 
> wrote:
>
>> This could be a good fit; I'll try to dig into it and see if it can be
>> adapted to a REST service.
>> The only strange thing I see is that the key of the local cache is per
>> block of keys... am I wrong?
>> Shouldn't it iterate over the list of passed keys?
>>
>> Right now it's the following:
>>
>> Cache<Row, List<Row>> cache;
>>
>> public void eval(Object... keys) {
>>     Row keyRow = Row.of(keys);
>>     if (cache != null) {
>>         List<Row> cachedRows = cache.getIfPresent(keyRow);
>>         if (cachedRows != null) {
>>             for (Row cachedRow : cachedRows) {
>>                 collect(cachedRow);
>>             }
>>             return;
>>         }
>>     }
>>  ...
>>
>> while I'd use the following (also for JDBC):
>>
>> Cache<Row, List<Row>> cache;
>>
>> public void eval(Object... keys) {
>>     Row keyRow = Row.of(keys);
>>     if (cache != null) {
>>         List<Row> cachedRows = cache.getIfPresent(keyRow);
>>         if (cachedRows != null) {
>>             for (Row cachedRow : cachedRows) {
>>                 collect(cachedRow);
>>             }
>>             return;
>>         }
>>     }
>>  ...
>>
>> public void eval(Object... keys) {
>>     for (Object kkk : keys) {
>>         Row keyRow = Row.of(kkk);
>>         if (cache != null) {
>>             List<Row> cachedRows = cache.getIfPresent(keyRow);
>>             if (cachedRows != null) {
>>                 for (Row cachedRow : cachedRows) {
>>                     collect(cachedRow);
>>                 }
>>                 return;
>>             }
>>         }
>>     }
>>  ...
>>
>> Am I missing something?
>>
>>
>> On Fri, Jun 28, 2019 at 4:18 PM JingsongLee 
>> wrote:
>>
>>> Hi Flavio:
>>>
>>> I just implemented a JDBCLookupFunction [1]. You can use it as a table
>>> function [2], or use a
>>> Blink temporal table join [3] (needs Blink planner support).
>>> I added a Google Guava cache to JDBCLookupFunction with a configurable
>>> cacheMaxSize
>>> (to avoid OOM) and cacheExpireMs (to keep the lookup table fresh).
>>> Is that what you want?
>>>
>>> [1]
>>> https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunction.java
>>> [2]
>>> https://github.com/JingsongLi/flink/blob/cc80999279b38627b37fa7550fb6610eee450d86/flink-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCLookupFunctionITCase.java#L143
>>> [3]
>>> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala#L75
>>>
>>>  Best, JingsongLee
>>>
>>> --
>>> From:Flavio Pompermaier 
>>> Send Time: Friday, June 28, 2019 21:04
>>> To:user 
>>> Subject:LookupableTableSource question
>>>
>>> Hi to all,
>>> I have a use case where I'd like to enrich a stream using a rarely
>>> updated lookup table.
>>> Basically, I'd like to be able to set a refresh policy that is triggered
>>> either when a key is not found (a new key has probably been added in the
>>> meantime) or when a configurable refresh period has elapsed.
>>>
>>> Is there any suggested solution to this? The LookupableTableSource looks
>>> very similar to what I'd like to achieve, but I can't find a real-world
>>> example using it, and it lacks these two features (key-values are not
>>> refreshed after a configurable timeout, and a KeyNotFound callback cannot be
>>> handled).
>>>
>>> Any help is appreciated,
>>> Flavio
>>>
>>>
>>>
>>
>
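Jingsong's answer is that the varargs of `eval(Object... keys)` together form one joint key, so caching by `Row.of(keys)` is already correct, while looping over the arguments one by one, as in the proposed variant, would treat each field as a separate key. A minimal sketch of that idea, combined with the two refresh requirements from Flavio's original question (expire entries after a configurable period, and retry when a key is not found), is shown below. It uses a plain `LinkedHashMap` in place of the Guava cache and a `List<Object>` in place of Flink's `Row`; the class `CompositeKeyLookupCache` and its `loader` function are hypothetical and not part of Flink's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.LongSupplier;

// Hypothetical sketch (not Flink's JDBCLookupFunction): a size-bounded, expiring
// cache keyed by the whole argument list of eval(Object... keys).
class CompositeKeyLookupCache<V> {

    // Rows loaded for one composite key, plus the time they were loaded.
    private static final class CachedRows<T> {
        final List<T> rows;
        final long loadedAtMs;

        CachedRows(List<T> rows, long loadedAtMs) {
            this.rows = rows;
            this.loadedAtMs = loadedAtMs;
        }
    }

    private final int maxSize;
    private final long expireMs;
    private final LongSupplier clockMs;                    // e.g. System::currentTimeMillis
    private final Function<List<Object>, List<V>> loader;  // e.g. a REST or JDBC query
    private final Map<List<Object>, CachedRows<V>> entries;

    CompositeKeyLookupCache(int maxSize, long expireMs, LongSupplier clockMs,
                            Function<List<Object>, List<V>> loader) {
        this.maxSize = maxSize;
        this.expireMs = expireMs;
        this.clockMs = clockMs;
        this.loader = loader;
        // accessOrder = true orders entries least-recently-used first, so
        // removeEldestEntry evicts the LRU entry once maxSize is exceeded.
        this.entries = new LinkedHashMap<List<Object>, CachedRows<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<List<Object>, CachedRows<V>> eldest) {
                return size() > CompositeKeyLookupCache.this.maxSize;
            }
        };
    }

    // All arguments together form ONE key (the joint lookup key), like Row.of(keys).
    List<V> lookup(Object... keys) {
        List<Object> key = new ArrayList<>(Arrays.asList(keys));
        long now = clockMs.getAsLong();
        CachedRows<V> cached = entries.get(key);
        if (cached != null && now - cached.loadedAtMs < expireMs) {
            return cached.rows; // fresh cache hit
        }
        // Missing or expired: (re)load from the external system.
        List<V> rows = Collections.unmodifiableList(new ArrayList<>(loader.apply(key)));
        if (rows.isEmpty()) {
            // Don't cache "not found", so the next lookup retries (the key may be new).
            entries.remove(key);
        } else {
            entries.put(key, new CachedRows<>(rows, now));
        }
        return rows;
    }
}
```

Not caching empty results is one way to model the "refresh when a key is not found" requirement; it trades extra lookups for freshness when many keys are genuinely missing.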


Re: Apache Flink - Running application with different flink configurations (flink-conf.yml) on EMR

2019-07-01 Thread M Singh
 Thanks Jeff and Xintong for your pointers.
On Friday, June 28, 2019, 10:44:35 AM EDT, Jeff Zhang  
wrote:  
 
 
This is because Flink doesn't unify the execution in different environments. The
community has discussed before how to enhance the Flink client API. The
initial proposal is to introduce a FlinkConf that contains all the configuration,
so that we can unify the executions in all environments (IDE, CLI, SQL Client,
Scala Shell, downstream projects).
Here's the sample code:

val conf = new FlinkConf().setProperty("key_1", "value_1")     // create FlinkConf

val env = new ExecutionEnvironment(conf)   // create ExecutionEnvironment

val jobId = env.submit(...)           // non-blocking job submission (detached mode)

val jobStatus = env.getClusterClient().queryJobStatus(jobId)   // approach 1: query job status via ClusterClient

val jobStatus = env.queryJobStatus(jobId)   // approach 2: query job status via ExecutionEnvironment


You can refer to this document for more details:
https://docs.google.com/document/d/1VavBrYn8vJeZs-Mhu5VzKO6xrWCF40aY0nlQ_UVVTRg/edit?usp=sharing




On Fri, Jun 28, 2019 at 10:28 PM Xintong Song wrote:

Hi, Singh,
I don't think that should work. The -D or -yD parameters need to be passed to
the Flink start-up scripts or the "flink run" command. I don't think the
IntelliJ VM arguments are equivalent to that. In fact, I'm not aware of any
method to set "-D" parameters when running jobs in the IDE.

Thank you~

Xintong Song




On Fri, Jun 28, 2019 at 8:45 PM M Singh  wrote:

 Hi Xintong:
I passed -Dparallelism.default=2 in the run configuration VM arguments in
IntelliJ.
So what I am looking for is a way to override the config parameters that are
defined in the flink-conf.yaml file (parallelism.default is just an example)
and that would be picked up regardless of the environment (e.g. locally, on YARN, or in the IDE).
When I run the application in the IDE (locally) with the above-mentioned VM
parameter, the StreamExecutionEnvironment.config does not show this value, and
the Flink UI shows the configuration parameter parallelism as 8. Is there any
other place where I can see the parameter settings?
Thanks.
On Thursday, June 27, 2019, 11:58:28 PM EDT, Xintong Song 
 wrote:  
 
 Could you provide some more details on how you run your job with -D options in
the IDE?




Thank you~

Xintong Song




On Fri, Jun 28, 2019 at 5:03 AM M Singh  wrote:

 Hi Xintong: Thanks for your pointers.
I tried -Dparallelism.default=2 locally (in the IDE) and it did not work. Do you
know if there is a common way that would work for EMR, locally, and in the IDE?
Thanks again.
On Thursday, June 27, 2019, 12:03:06 AM EDT, Xintong Song 
 wrote:  
 
 Hi Singh,
You can use the environment variable "FLINK_CONF_DIR" to specify the path to the
directory of config files. You can also override config options with command-line
arguments prefixed with -D (for yarn-session.sh) or -yD (for the 'flink run'
command).

Thank you~

Xintong Song




On Wed, Jun 26, 2019 at 9:13 PM M Singh  wrote:

Hi:
I have a single EMR cluster with Flink and want to run multiple applications on
it with different Flink configurations. Is there a way to
1. pass the config file name for each application, or
2. override the config parameters via command-line arguments for the application?
This is similar to how we can override the default parameters in Spark.
I searched the documentation and have tried using ParameterTool with the config
parameter names, but it has not worked so far.
Thanks for your help.
Mans
  
  



-- 
Best Regards

Jeff Zhang  
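As Xintong explains, `-D`/`-yD` options are consumed by the Flink scripts and the `flink run` command, while IntelliJ "VM options" only become JVM system properties (readable via `System.getProperty`). One possible workaround for IDE runs, sketched below under that assumption, is to pass the overrides as program arguments and merge them over the base settings in your own code before building the environment. `DynamicProperties.merge` is a hypothetical helper, not a Flink API, and how the merged values are handed to a local environment depends on your Flink version.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: layers "-Dkey=value" program arguments
// over a base configuration (e.g. values read from flink-conf.yaml).
class DynamicProperties {

    static Map<String, String> merge(Map<String, String> base, String... args) {
        Map<String, String> result = new HashMap<>(base); // never mutate the base map
        for (String arg : args) {
            if (!arg.startsWith("-D")) {
                continue; // not a dynamic property; leave it to other argument parsers
            }
            String kv = arg.substring(2);
            int eq = kv.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected -Dkey=value but got: " + arg);
            }
            // Later occurrences win, so command-line values override file values.
            result.put(kv.substring(0, eq).trim(), kv.substring(eq + 1).trim());
        }
        return result;
    }
}
```

For example, with a base value of `parallelism.default=8`, calling `merge(base, "-Dparallelism.default=2")` yields `2` for that key and leaves all other keys untouched.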

Apache Flink - Multiple Kinesis stream consumers

2019-07-01 Thread M Singh
Hi:
I am trying to understand how Flink coordinates multiple Kinesis consumers
and am trying to find out:
1. Is it possible to read the same Kinesis stream independently multiple times
within a single application? How does Flink coordinate consuming the same Kinesis
stream multiple times in a single application? Are there any issues that can arise
from this pattern?
2. How does Flink coordinate consumers of the same Kinesis stream across multiple
applications?
Thanks
Mans

Re: Batch mode with Flink 1.8 unstable?

2019-07-01 Thread Till Rohrmann
Hi Ken,

in order to further debug your problems it would be helpful if you could
share the log files on DEBUG level with us.

For problem (2), I suspect that it was caused by Flink releasing TMs
too early. This should be fixed by FLINK-10941, which is part of Flink
1.8.1. The 1.8.1 release should be out very soon. It would be great
if you could try your program with this version, or even with the 1.8.1 RC, to see
whether the problem still occurs. However, it could also be caused by using
fine-grained recovery, so it might be worth disabling this feature if you
turned it on.

Thanks a lot!

Cheers,
Till

On Thu, Jun 27, 2019 at 8:30 AM Biao Liu  wrote:

> Hi Ken again,
>
> In regard to the TimeoutException, I just realized that there is no
> akka.remote.OversizedPayloadException in your log file, so something else
> might have caused it.
> 1. Have you tried increasing the configuration option "akka.ask.timeout"?
> 2. Have you checked the garbage collection of the JM/TM? You may need
> to enable GC logging first.
>
>
> On Thu, Jun 27, 2019 at 11:38 AM Biao Liu wrote:
>
>> Hi Ken,
>>
>> In regard to oversized input splits, this seems to be a rare case that I did not
>> expect. However, it should definitely be fixed, since input splits can be
>> user-defined; we should not assume they are always small.
>> I agree with Stephan that maybe something unexpected is involved
>> in the input splits.
>> Until it is fixed, there is also a workaround: increase the limit of the
>> RPC message size by explicitly configuring "akka.framesize" in flink-conf.yaml.
>>
>> To @Qi, also sorry to hear about your bad experience. I'll take this issue, but
>> I'm not sure I can finish it in time for the 1.9 release. Hope things go well.
>>
>>
>> On Wed, Jun 26, 2019 at 10:50 PM Stephan Ewen wrote:
>>
>>> Hi Ken!
>>>
>>> Sorry to hear you are going through this experience. The major focus on
>>> streaming so far means that the DataSet API has stability issues at scale.
>>> So, yes, batch mode in the current Flink version can be somewhat tricky.
>>>
>>> A big focus of Flink 1.9 is to finally fix the batch mode by
>>> addressing batch-specific scheduling, recovery, and shuffle issues.
>>>
>>> Let me go through the issues you found:
>>>
>>> *(1) Input splits and oversized RPC*
>>>
>>> Your explanation seems correct: the timeout is caused by an oversized RPC
>>> message being dropped.
>>>
>>> I don't quite understand how exactly that happens, because the size
>>> limit is 10 MB and input splits should be rather small in most cases.
>>> Are you running custom sources that put large data into splits? Maybe
>>> accidentally, by having a large serialized closure in the splits?
>>>
>>> The fix would be this issue:
>>> https://issues.apache.org/jira/browse/FLINK-4399
>>>
>>> *(2) TM early release*
>>>
>>> The 1.8 version had a fix that should work for regular cases without
>>> fine-grained failure recovery.
>>> 1.9 should have a more general fix that also works for fine-grained
>>> recovery.
>>>
>>> Are you trying to use the finer-grained failover with the batch job?
>>> The finer-grained failover does not work in batch mode in 1.8, which is why
>>> it is not an advertised feature (it only works for streaming so far).
>>>
>>> The goal is that this works in the 1.9 release (aka the batch fixup
>>> release).
>>>
>>> *(3) Hang in processing*
>>>
>>> I think a thread dump (jstack) from the TMs would be helpful to diagnose
>>> that.
>>> There are known issues with the current batch shuffle implementation,
>>> which is why 1.9 is getting a new bounded-blocking stream shuffle
>>> implementation.
>>>
>>> Best,
>>> Stephan
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jun 24, 2019 at 2:32 AM Ken Krugler 
>>> wrote:
>>>
 Hi all,

 I’ve been running a somewhat complex batch job (in EMR/YARN) with Flink
 1.8.0, and it regularly fails, but for varying reasons.

 Has anyone else achieved stability with 1.8.0 in batch mode and
 non-trivial workflows?

 Thanks,

 — Ken

 *1. TimeoutException getting input splits*

 The batch job starts by processing a lot of files that live in S3.
 During this phase, I sometimes see:

 2019-06-20 01:20:22,659 INFO
  org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN
 DataSource (at createInput(ExecutionEnvironment.java:549)
 (org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat)) -> Map (ad
 dailies) -> Filter (Filter at
 createWorkflow(AdvertiserSimilarityWorkflow.java:34)) -> Filter (Filter at
 createWorkflow(AdvertiserSimilarityWorkflow.java:36)) -> Filter (Filter at
 createWorkflow(AdvertiserSimilarityWorkflow.java:38)) -> Map (Key
 Extractor) -> Combine (Reduce at
 createWorkflow(AdvertiserSimilarityWorkflow.java:41)) (31/32)
 (8a8cbea47394c3d638910c36ac62d877) switched from RUNNING to FAILED.
 java.lang.RuntimeException: Could not retrieve next input split.
 at
 org.apache.flink.runtime.operators.DataSour
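To check Stephan's hypothesis that unexpectedly large data (for example a large serialized closure) ends up in the input splits, one option is to Java-serialize a representative split and compare its size with the frame limit. Flink's `InputSplit` extends `java.io.Serializable`, so a split can be passed to a helper like the hypothetical one below. The 10 MB constant mirrors the limit mentioned in the thread and is an assumption; the effective limit is whatever `akka.framesize` is set to, and Flink's RPC layer adds its own overhead, so treat the result as a rough estimate.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical diagnostic: measures an object's size under Java serialization
// and compares it with an assumed RPC frame limit.
class SerializedSizeCheck {

    // Assumption: 10 MB, matching the limit mentioned in the thread.
    static final long FRAME_LIMIT_BYTES = 10L * 1024 * 1024;

    static long serializedSize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj); // serializes the whole object graph, including captured fields
        }
        return bytes.size();
    }

    static boolean fitsInFrame(Serializable obj) throws IOException {
        return serializedSize(obj) < FRAME_LIMIT_BYTES;
    }
}
```

A split whose serialized size is close to or above the limit would support the oversized-RPC explanation; a small one would point toward the other causes Biao mentions (ask timeout, GC pauses).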

Fw:Re:Re: About "Flink 1.7.0 HA based on zookeepers "

2019-07-01 Thread Alex.Hu
Hi,All:

   I found some problems with Flink 1.6.0 on Kubernetes that Till mentioned in the
mailing list thread "HA for 1.6.0 job cluster with docker-compose". The JIRA issue
mentioned there, FLINK-10291, was resolved in 1.7.0, but I still see similar errors
with Flink 1.7.2 on Kubernetes. Could you please help me check whether something is
wrong in my settings? Here are my settings:

web.log.path: /var/log/flink/flinkweb.log 
taskmanager.log.pth: /var/log/flink/taskmanager/task.log 


jobmanager.rpc.address: tdh2
jobmanager.rpc.port: 16223
jobstore.cache-size: 5368709120
jobstore.expiration-time: 864000
jobmanager.heap.size: 4096m


taskmanager.heap.size:  6000m
taskmanager.numberOfTaskSlots: 6
parallelism.default: 2


high-availability: zookeeper
high-availability.storageDir: hdfs:///flink1/ha/
high-availability.zookeeper.quorum: tdh2:2181,tdh4:2181,tdh3:2181
high-availability.zookeeper.path.root: /flink
high-availability.zookeeper.client.acl: open
high-availability.jobmanager.port: 62236-62239


rest.port: 18801
io.tmp.dirs: /data/disk1:/data/disk2:/data/disk3:/data/disk4:/data/disk5


security.kerberos.login.use-ticket-cache: true
security.kerberos.login.contexts: Client
security.kerberos.login.keytab: /etc/flink/conf/hdfs.keytab
security.kerberos.login.principal: hdfs


blob.server.port: 16224
query.server.port: 16225




   Below is the new error report; the earliest error report is in the forwarded
email message:


apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146, LocalRpcInvocation(requestRestAddress(Time))) sent to akka.tcp://flink@tdh2:62236/user/dispatcher because the fencing token is null.
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:59)
... 14 common frames omitted
2019-07-01 17:10:40.159 [flink-rest-server-netty-worker-thread-39] ERROR o.a.f.r.rest.handler.legacy.files.StaticFileServerHandler  - Could not retrieve the redirect address.
java.util.concurrent.CompletionException: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b714146, LocalRpcInvocation(requestRestAddress(Time))) sent to akka.tcp://flink@tdh2:62236/user/dispatcher because the fencing token is null.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at akka.actor.ActorRef.tell(ActorRef.scala:130)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendErrorIfSender(AkkaRpcActor.java:371)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:57)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: 
Fencing token not set: Ignoring message 
LocalFencedMessage(98b7a69a48c04a9ca01b1eca2b71
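The error above says the dispatcher ignored `requestRestAddress` because its fencing token is null. In a leader-fenced design, an endpoint accepts fenced messages only after it has been granted leadership and holds a token, so a null token suggests that this dispatcher instance had not (yet) been confirmed as leader when the REST handler queried it. This is a reading of the error text, not a diagnosis. The conceptual sketch below, with hypothetical names unrelated to Flink's actual `FencedAkkaRpcActor` implementation, illustrates the rule.

```java
import java.util.Objects;
import java.util.UUID;

// Conceptual sketch of a fenced endpoint; names are hypothetical, not Flink's.
class FencedEndpoint {

    private UUID fencingToken; // null until this instance is granted leadership

    void grantLeadership(UUID token) {
        this.fencingToken = token;
    }

    void revokeLeadership() {
        this.fencingToken = null;
    }

    String handle(UUID messageToken, String request) {
        if (fencingToken == null) {
            // Not (yet) leader: reject everything, like "fencing token is null".
            throw new IllegalStateException("Fencing token not set: ignoring " + request);
        }
        if (!Objects.equals(fencingToken, messageToken)) {
            // Message addressed to a previous or different leader session.
            throw new IllegalStateException("Fencing token mismatch: ignoring " + request);
        }
        return "handled " + request;
    }
}
```

Under this model, repeated "token not set" errors would point at leader election (for example the ZooKeeper HA setup) rather than at the REST handler itself.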

Re:Re: Re:Flink batch job memory/disk leak when invoking set method on a static Configuration object.

2019-07-01 Thread Haibo Sun
Hi, Vadim 
 
I tried many times with the master branch code but could not reproduce this
issue. Which version of Flink did you use?


For the Configuration class in your code, I used `org.apache.hadoop.conf.Configuration`.


The configurations I enabled in flink-conf.yaml are as follows (apart from these,
no other changes were made):
high-availability: zookeeper
high-availability.storageDir: file:///data/tmp/flink/ha/
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink-test
blob.storage.directory: /data/tmp/flink/blob


Best,
Haibo


At 2019-06-28 15:49:50, "Vadim Vararu"  wrote:

Hi,


I've run it on a standalone Flink cluster. No YARN involved.
From: Haibo Sun 
Sent: Friday, June 28, 2019 6:13 AM
To: Vadim Vararu
Cc: user@flink.apache.org
Subject: Re:Flink batch job memory/disk leak when invoking set method on a 
static Configuration object.
 
Hi, Vadim


A similar issue occurred in earlier versions, see
https://issues.apache.org/jira/browse/FLINK-4485.
Are you running a Flink session cluster on YARN? I think it might be a bug.
I'll see if I can reproduce it with the master branch code, and if so, I will
try to investigate it.


If someone already knows the cause of the problem, that would be best, since it
won't need to be investigated again.


Best,
Haibo



At 2019-06-28 00:46:43, "Vadim Vararu"  wrote:

Hi guys,


I have a simple batch job with a custom output formatter that writes to a local 
file.


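// JobHadoop.java
import com.google.common.collect.Sets;
import org.apache.flink.api.java.ExecutionEnvironment;

public class JobHadoop {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Append "dd" to each line and write it with the custom output format.
        env.fromCollection(Sets.newHashSet("line1", "line2", "line3"))
                .map(line -> line + "dd")
                .write(new HadoopUsageOutputFormat(), "file:///tmp/out");

        env.execute(JobHadoop.class.getName());
    }

}

// HadoopUsageOutputFormat.java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;

import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.hadoop.conf.Configuration;

public class HadoopUsageOutputFormat extends FileOutputFormat<String>
        implements OutputFormat<String> {

    // Static Hadoop Configuration shared by all instances of this class.
    private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new Configuration();
    public static final String DEFAULT_LINE_DELIMITER = "\n";

    private Writer writer;

    static {
        // According to the report, the issue only reproduces when set() is called.
        DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        super.open(taskNumber, numTasks);
        // "stream" is the output stream opened by FileOutputFormat.open().
        writer = new OutputStreamWriter(new BufferedOutputStream(stream));
    }

    @Override
    public void writeRecord(String record) throws IOException {
        writer.write(record);
        writer.write(DEFAULT_LINE_DELIMITER);
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            this.writer.flush();
            this.writer.close();
        }

        super.close();
    }
}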


The problem is that after the job finishes, there seems to be a leak somewhere
that prevents the job's blob store files from being deleted. The number of such
"deleted" files increases after each job run: even though they are marked as
deleted, something in the JobManager process still holds a reference to them,
which prevents their actual deletion.

Also, the problem reproduces only if I actually invoke the set method of 
Configuration:
static {
    DEFAULT_HADOOP_CONFIGURATION.set("just.a.prop", "/var/test1");
}


From my observations, if I change the
private static final Configuration DEFAULT_HADOOP_CONFIGURATION = new Configuration();
to a non-static field, the problem no longer reproduces.




However, I'd like to know whether that's normal behaviour or a bug/leak somewhere
in Flink itself.


Thanks, 
Vadim.
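Vadim's symptom, files that are marked as deleted but still referenced by the JobManager process, matches POSIX file semantics: deleting a file removes only its directory entry, and the data stays on disk until the last open handle is closed (tools such as `lsof` list such files as "(deleted)"). The self-contained sketch below demonstrates this, assuming a Linux or other POSIX file system (on Windows, deleting an open file fails instead). Whether a user-code class loader that is never released is what keeps the blob open in this case is only a hypothesis, not something confirmed in the thread; the class name `DeletedButOpen` is hypothetical.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Demonstrates (on POSIX file systems) that a deleted file stays readable, and keeps
// occupying disk space, while an open handle still references it.
class DeletedButOpen {

    static String readAfterDelete() throws IOException {
        Path file = Files.createTempFile("blob-", ".jar");
        Files.write(file, "payload".getBytes(StandardCharsets.UTF_8));
        try (InputStream in = new FileInputStream(file.toFile())) {
            Files.delete(file); // removes the directory entry only; the open handle keeps the data
            byte[] buf = new byte[7];
            int n = in.read(buf);
            return new String(buf, 0, n, StandardCharsets.UTF_8);
        } // space is released only here, when the last handle is closed
    }
}
```

In a long-running process, any object that keeps such a stream (or a jar opened by a class loader) reachable would produce exactly the growing list of "(deleted)" files described above.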