Re: Resource changed on src filesystem after upgrade

2021-01-29 Thread Mark Davis
Hi Xintong Song,

> - Does this error happen for every one of your DataSet jobs? For a problematic 
> job, does it happen for every container?
> - What is the `jobs.jar`? Is it under `lib/` or `opt/` of your client-side 
> filesystem, or specified via `yarn.ship-files`, `yarn.ship-archives` or 
> `yarn.provided.lib.dirs`? This helps us locate the code path that this 
> file went through.

I finally found the cause of the problem: I had set both yarn.flink-dist-jar and 
pipeline.jars to the same archive (I submit jobs programmatically and repackage 
the Flink distribution, because flink-dist.jar is not available in Maven Central).
If I copy the file and reference the job and distribution jars under different 
names, the problem disappears.

My guess is that YARN (YarnApplicationFileUploader?) copies both files, and if 
the filenames are the same, the first file is overwritten by the second one, 
hence the timestamp difference.
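
For reference, the working setup now looks roughly like this (a minimal sketch of 
my programmatic submission config; the paths and file names are placeholders):

import org.apache.flink.configuration.Configuration;

Configuration config = new Configuration();
// the repackaged Flink distribution, shipped under its own name
config.setString("yarn.flink-dist-jar", "hdfs:///user/hadoop/flink/flink-dist-repackaged.jar");
// the user-code jar, shipped under a different file name so the two uploads
// cannot collide in the YARN staging directory
config.setString("pipeline.jars", "file:///opt/app/jobs-usercode.jar");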

I guess a lot has changed in the YARN deployment area since 1.8. Too bad there 
are no clear instructions on how to submit a job programmatically - every time I 
have to reverse engineer CliFrontend.

Sorry for the confusion and thanks!

Mark

Resource changed on src filesystem after upgrade

2021-01-17 Thread Mark Davis
Hi all,
I am upgrading my DataSet jobs from Flink 1.8 to 1.12.
After the upgrade I started to receive errors like this one:

14:12:57,441 INFO 
org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager - Worker 
container_e120_1608377880203_0751_01_000112 is terminated. Diagnostics: 
Resource 
hdfs://bigdata/user/hadoop/.flink/application_1608377880203_0751/jobs.jar 
changed on src filesystem (expected 1610892446439, was 1610892446971)
java.io.IOException: Resource 
hdfs://bigdata/user/hadoop/.flink/application_1608377880203_0751/jobs.jar 
changed on src filesystem (expected 1610892446439, was 1610892446971)
at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:257)
at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:228)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:221)
at 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:209)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

I understand it is somehow related to FLINK-12195, but this time it comes from 
the Hadoop code. I am running a very old version of the HDP platform (2.6.5), so 
it might be the one to blame.
But the code was working perfectly fine before the upgrade, so I am confused.
Could you please advise?

Thank you!
Mark

Re: Resource leak in DataSourceNode?

2020-08-30 Thread Mark Davis
Hi Robert,

Thank you for confirming that there is an issue.
I do not have a solution for it and would like to hear the committers' insight 
into what is wrong there.

I think there are actually two issues: the first is that the HBase InputFormat 
does not close its connection in close(); the second is that DataSourceNode 
never calls the close() method at all.
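
To make the first issue more concrete, the change I have in mind would look 
roughly like this (just a sketch, not a patch - the field names are from memory, 
and the `connection` field is an assumption about how the format keeps the 
handle it opens in configure()):

@Override
public void close() throws IOException {
  try {
    if (resultScanner != null) {
      resultScanner.close();
    }
  } finally {
    if (table != null) {
      table.close();
      table = null;
    }
    // assumption: the Connection created in configure() is kept in a field
    if (connection != null) {
      connection.close();
      connection = null;
    }
  }
}

Even with such a close(), DataSourceNode would still need to actually call it 
after computing the estimates, which is the second issue.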

Cheers,
Mark

‐‐‐ Original Message ‐‐‐
On Thursday, August 27, 2020 3:30 PM, Robert Metzger  
wrote:

> Hi Mark,
>
> Thanks a lot for your message and the good investigation! I believe you've 
> found a bug in Flink. I filed an issue for the problem: 
> https://issues.apache.org/jira/browse/FLINK-19064.
>
> Would you be interested in opening a pull request to fix this?
> Otherwise, I'm sure a committer will pick up the issue soon.
>
> I'm not aware of a simple workaround for the problem.
>
> Best,
> Robert
>
> On Wed, Aug 26, 2020 at 4:05 PM Mark Davis  wrote:
>
>> Hi,
>>
>> I am trying to investigate a problem with non-released resources in my 
>> application.
>>
>> I have a stateful application which submits Flink DataSet jobs using code 
>> very similar to the code in CliFrontend.
>> I noticed that I am getting a lot of non-closed connections to my data store 
>> (HBase in my case). The connections are held by the application, not the jobs 
>> themselves.
>>
>> I am using HBaseRowDataInputFormat and it seems that HBase connections 
>> opened in the configure() method during job graph creation (before the 
>> job is executed) are not closed. My search led me to the method 
>> DataSourceNode.computeOperatorSpecificDefaultEstimates(DataStatistics), where 
>> I see that the format is not closed after being configured.
>>
>> Is that correct? How can I overcome this issue?
>>
>> My application is long-running, which is probably why I observe the resource 
>> leak. If I spawned a new JVM to run the jobs, this problem would not be 
>> noticeable.
>>
>> Thank you!
>>
>> Cheers,
>> Marc

Resource leak in DataSourceNode?

2020-08-26 Thread Mark Davis
Hi,

I am trying to investigate a problem with non-released resources in my 
application.

I have a stateful application which submits Flink DataSet jobs using code very 
similar to the code in CliFrontend.
I noticed that I am getting a lot of non-closed connections to my data store 
(HBase in my case). The connections are held by the application, not the jobs 
themselves.

I am using HBaseRowDataInputFormat and it seems that HBase connections opened 
in the configure() method during job graph creation (before the job is 
executed) are not closed. My search led me to the method 
DataSourceNode.computeOperatorSpecificDefaultEstimates(DataStatistics), where I 
see that the format is not closed after being configured.

Is that correct? How can I overcome this issue?

My application is long-running, which is probably why I observe the resource 
leak. If I spawned a new JVM to run the jobs, this problem would not be 
noticeable.
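
The only mitigation I have come up with so far is to track these connections 
myself and close them from the application once the plan has been created - a 
rough sketch (the helper below is my own, nothing from Flink):

import java.io.Closeable;
import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper: collects resources opened while the job graph is built.
final class GraphCreationResources {
  static final Queue<Closeable> OPENED = new ConcurrentLinkedQueue<>();

  static void closeAll() {
    Closeable c;
    while ((c = OPENED.poll()) != null) {
      try {
        c.close();
      } catch (IOException e) {
        // best-effort cleanup, nothing else to do here
      }
    }
  }
}

// In a custom input format, register the connection right after opening it in
// configure():   GraphCreationResources.OPENED.add(connection);
// In the client application, call GraphCreationResources.closeAll() once the
// job has been submitted (or once plan creation has failed).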

Thank you!

Cheers,
Marc

Re: Run command after Batch is finished

2020-06-09 Thread Mark Davis
Hi Chesnay,

That is an interesting proposal, thank you!
I was doing something similar with the OutputFormat#close() method, taking the 
format's parallelism into account. Using FinalizeOnMaster will make things easier.
But the problem is that several OutputFormats still have to be synchronized 
externally - every output must check whether the other outputs have already 
finished... Quite cumbersome.
There is also a problem with exceptions: an OutputFormat may never be opened, 
and then never closed either.
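
For reference, the per-output part I am experimenting with looks roughly like 
this (just a sketch - the wrapper class is my own, and it does not solve the 
cross-output coordination problem):

import java.io.IOException;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

// Wraps an existing OutputFormat and runs a cleanup once on the master
// after all subtasks of this particular sink have finished.
public class CleanupOnMasterOutputFormat<T> implements OutputFormat<T>, FinalizeOnMaster {

  private final OutputFormat<T> delegate;

  public CleanupOnMasterOutputFormat(OutputFormat<T> delegate) {
    this.delegate = delegate;
  }

  @Override
  public void configure(Configuration parameters) {
    delegate.configure(parameters);
  }

  @Override
  public void open(int taskNumber, int numTasks) throws IOException {
    delegate.open(taskNumber, numTasks);
  }

  @Override
  public void writeRecord(T record) throws IOException {
    delegate.writeRecord(record);
  }

  @Override
  public void close() throws IOException {
    delegate.close();
  }

  @Override
  public void finalizeGlobal(int parallelism) throws IOException {
    // placeholder for the actual cleanup (e.g. releasing a distributed lock);
    // still needs external coordination when several outputs are involved
  }
}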

  Mark

‐‐‐ Original Message ‐‐‐
On Monday, June 8, 2020 5:50 PM, Chesnay Schepler  wrote:

> This goes in the right direction; have a look at 
> org.apache.flink.api.common.io.FinalizeOnMaster, which an OutputFormat can 
> implement to run something on the Master after all subtasks have been closed.
>
> On 08/06/2020 17:25, Andrey Zagrebin wrote:
>
>> Hi Mark,
>>
>> I do not know how you output the results in your pipeline.
>> If you use DataSet#output(OutputFormat outputFormat), you could try to 
>> extend the format with a custom close method which should be called once the 
>> task of the sink batch operator is done in the task manager.
>> I also cc Aljoscha, maybe, he has more ideas.
>>
>> Best,
>> Andrey
>>
>> On Sun, Jun 7, 2020 at 1:35 PM Mark Davis  wrote:
>>
>>> Hi Jeff,
>>>
>>> Unfortunately this is not good enough for me.
>>> My clients are very volatile, they start a batch and can go away any moment 
>>> without waiting for it to finish. Think of an elastic web application or an 
>>> AWS Lambda.
>>>
>>> I hoped to find something that could be deployed to the cluster together 
>>> with the batch code - maybe a hook in the job manager or similar. I do not 
>>> plan to run anything heavy there, just some formal cleanups.
>>> Is there something like this?
>>>
>>> Thank you!
>>>
>>>   Mark
>>>
>>> ‐‐‐ Original Message ‐‐‐
>>> On Saturday, June 6, 2020 4:29 PM, Jeff Zhang  wrote:
>>>
>>>> It would run in the client side where ExecutionEnvironment is created.
>>>>
>>>> Mark Davis  于2020年6月6日周六 下午8:14写道:
>>>>
>>>>> Hi Jeff,
>>>>>
>>>>> Thank you very much! That is exactly what I need.
>>>>>
>>>>> Where will the listener code run in a cluster deployment (YARN, k8s)?
>>>>> Will it be sent over the network?
>>>>>
>>>>> Thank you!
>>>>>
>>>>>   Mark
>>>>>
>>>>> ‐‐‐ Original Message ‐‐‐
>>>>> On Friday, June 5, 2020 6:13 PM, Jeff Zhang  wrote:
>>>>>
>>>>>> You can try JobListener which you can register to ExecutionEnvironment.
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>>>>>>
>>>>>> Mark Davis  于2020年6月6日周六 上午12:00写道:
>>>>>>
>>>>>>> Hi there,
>>>>>>>
>>>>>>> I am running a Batch job with several outputs.
>>>>>>> Is there a way to run some code(e.g. release a distributed lock) after 
>>>>>>> all outputs are finished?
>>>>>>>
>>>>>>> Currently I do this in a try-finally block around 
>>>>>>> ExecutionEnvironment.execute() call, but I have to switch to the 
>>>>>>> detached execution mode - in this mode the finally block is never run.
>>>>>>>
>>>>>>> Thank you!
>>>>>>>
>>>>>>>   Mark
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang

Re: Run command after Batch is finished

2020-06-07 Thread Mark Davis
Hi Jeff,

Unfortunately this is not good enough for me.
My clients are very volatile: they start a batch and can go away at any moment 
without waiting for it to finish. Think of an elastic web application or an AWS 
Lambda.

I hoped to find something that could be deployed to the cluster together with 
the batch code - maybe a hook in the job manager or similar. I do not plan to run 
anything heavy there, just some formal cleanups.
Is there something like this?

Thank you!

  Mark

‐‐‐ Original Message ‐‐‐
On Saturday, June 6, 2020 4:29 PM, Jeff Zhang  wrote:

> It would run in the client side where ExecutionEnvironment is created.
>
> Mark Davis  于2020年6月6日周六 下午8:14写道:
>
>> Hi Jeff,
>>
>> Thank you very much! That is exactly what I need.
>>
>> Where will the listener code run in a cluster deployment (YARN, k8s)?
>> Will it be sent over the network?
>>
>> Thank you!
>>
>>   Mark
>>
>> ‐‐‐ Original Message ‐‐‐
>> On Friday, June 5, 2020 6:13 PM, Jeff Zhang  wrote:
>>
>>> You can try JobListener which you can register to ExecutionEnvironment.
>>>
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>>>
>>> Mark Davis  于2020年6月6日周六 上午12:00写道:
>>>
>>>> Hi there,
>>>>
>>>> I am running a Batch job with several outputs.
>>>> Is there a way to run some code(e.g. release a distributed lock) after all 
>>>> outputs are finished?
>>>>
>>>> Currently I do this in a try-finally block around 
>>>> ExecutionEnvironment.execute() call, but I have to switch to the detached 
>>>> execution mode - in this mode the finally block is never run.
>>>>
>>>> Thank you!
>>>>
>>>>   Mark
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>
> --
> Best Regards
>
> Jeff Zhang

Re: Run command after Batch is finished

2020-06-06 Thread Mark Davis
Hi Jeff,

Thank you very much! That is exactly what I need.

Where will the listener code run in a cluster deployment (YARN, k8s)?
Will it be sent over the network?
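
For reference, this is roughly how I plan to wire it up (a sketch; the cleanup 
in onJobExecuted() stands in for my actual logic):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerJobListener(new JobListener() {
  @Override
  public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
    // called once the job has been submitted, or submission has failed
  }

  @Override
  public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
    // called when the job result is known; release the distributed lock /
    // run the cleanup here
  }
});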

Thank you!

  Mark

‐‐‐ Original Message ‐‐‐
On Friday, June 5, 2020 6:13 PM, Jeff Zhang  wrote:

> You can try JobListener which you can register to ExecutionEnvironment.
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>
> Mark Davis  于2020年6月6日周六 上午12:00写道:
>
>> Hi there,
>>
>> I am running a Batch job with several outputs.
>> Is there a way to run some code(e.g. release a distributed lock) after all 
>> outputs are finished?
>>
>> Currently I do this in a try-finally block around 
>> ExecutionEnvironment.execute() call, but I have to switch to the detached 
>> execution mode - in this mode the finally block is never run.
>>
>> Thank you!
>>
>>   Mark
>
> --
> Best Regards
>
> Jeff Zhang

Run command after Batch is finished

2020-06-05 Thread Mark Davis
Hi there,

I am running a Batch job with several outputs.
Is there a way to run some code (e.g. release a distributed lock) after all 
outputs are finished?

Currently I do this in a try-finally block around the 
ExecutionEnvironment.execute() call, but I have to switch to detached 
execution mode - and in that mode the finally block is never run.
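
For clarity, the current non-detached version looks like this (a sketch; the 
lock handling is a placeholder for my own code):

import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// ... build the job with its several outputs ...
try {
  env.execute("batch with several outputs");
} finally {
  // runs in the client JVM only - with detached submission this is skipped,
  // which is exactly the problem described above
  // releaseDistributedLock();
}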

Thank you!

  Mark

Re: DataSet API: HBase ScannerTimeoutException and double Result processing

2019-11-25 Thread Mark Davis
Hi Flavio,

>> When the resultScanner dies because of a timeout (this happens a lot when 
>> you have backpressure and the time between 2 consecutive reads exceeds the 
>> scanner timeout), the code creates a new scanner and restarts from where it 
>> was (startRow = currentRow).
>> So there should not be any duplicates (in theory), but this could be the 
>> root of the problem...

Yes, you are right, the nextRecord() exception handling is responsible for the 
duplicate record processing:

org.apache.hadoop.hbase.client.ScannerTimeoutException: 1038878ms passed since 
the last invocation, timeout is currently set to 6
at 
org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:453)
at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:371)
at 
org.apache.flink.addons.hbase.AbstractTableInputFormat.nextRecord(AbstractTableInputFormat.java:130)
at 
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.UnknownScannerException: 
org.apache.hadoop.hbase.UnknownScannerException: Name: 135281, already closed?
at 
org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2389)
at 
org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)

But I am not sure that the handling of the HBase exception thrown from 
ClientScanner.next() is correct.
If the call to mapResultToOutType(Result) finished without an error, there is no 
need to restart from the same row.
The new scanner should start from the next row.
Is that so or am I missing something?
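
In other words, I would expect the retry in nextRecord() to look roughly like 
this (a sketch of the idea only, not the actual AbstractTableInputFormat code; 
it reuses the format's scan, table, resultScanner and currentRow fields):

Result res;
try {
  res = resultScanner.next();
} catch (ScannerTimeoutException e) {
  // currentRow has already been handed to mapResultToOutType(), so restart the
  // scan strictly after it: appending a single 0x00 byte gives the smallest
  // row key that is greater than currentRow
  scan.setStartRow(Bytes.add(currentRow, new byte[] {0}));
  resultScanner = table.getScanner(scan);
  res = resultScanner.next();
}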

Best regards,
  Mark

DataSet API: HBase ScannerTimeoutException and double Result processing

2019-11-23 Thread Mark Davis
Hello,

I am reading Results from an HBase table and processing them with the Batch API. 
Everything works fine until I receive a ScannerTimeoutException from HBase.
Maybe my transformations get stuck or a GC pause happens - hard to tell. The 
HBase client restarts the scan and the processing continues.
Except for one problem: every time I receive this exception I observe duplicate 
Result processing - the Result which was processed just before the 
ScannerTimeoutException was thrown is processed twice.

Is this expected behavior? Should I be prepared to handle it?
And how should I handle it? Keeping track of all processed Results is not 
feasible in my case.

Here is a simple job demonstrating the issue (HBase scan and RPC timeouts are 
set to 60 sec):

Thank you!

Best regards,
Mark

  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env.createInput(new Src())
        .map(new Mapper())
        .print();
  }

  private static class Mapper implements MapFunction<Tuple1<String>, String> {

    private int cnt = 0;

    @Override
    public String map(Tuple1<String> value) throws Exception {
      // slow down every other record to provoke the scanner timeout
      if (cnt++ % 2 == 0) {
        Thread.sleep(12);
      }
      return value.f0;
    }
  }

  private static class Src extends AbstractTableInputFormat<Tuple1<String>> {

    @Override
    protected Scan getScanner() {
      // getStartRow()/getEndRow() are helpers of the surrounding application
      Scan scan = new Scan();
      scan.setStartRow(getStartRow());
      scan.setStopRow(getEndRow());
      scan.setCaching(1);
      scan.setCacheBlocks(false);
      return scan;
    }

    @Override
    protected String getTableName() {
      // getTable() is a helper of the surrounding application
      return getTable();
    }

    @Override
    protected Tuple1<String> mapResultToOutType(Result r) {
      return new Tuple1<>(Bytes.toString(r.getRow()));
    }

    @Override
    public void configure(org.apache.flink.configuration.Configuration parameters) {
      scan = getScanner();
      try {
        // getHadoopConf() is a helper of the surrounding application
        table = new HTable(getHadoopConf(), getTableName());
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }