Re: Resource changed on src filesystem after upgrade
Hi Xintong Song,

> - Does this error happen for every of your dataset jobs? For a problematic
> job, does it happen for every container?
> - What is the `jobs.jar`? Is it under `lib/`, `opt` of your client side
> filesystem, or specified as `yarn.ship-files`, `yarn.ship-archives` or
> `yarn.provided.lib.dirs`? This helps us to locate the code path that this
> file went through.

I finally found the cause of the problem - I set both yarn.flink-dist-jar and pipeline.jars to the same archive (I submit jobs programmatically and repackage the Flink distribution because flink-dist.jar is not in the Central). If I copy the file and reference the job and distribution jars under different names, the problem disappears.

My guess is that YARN (YarnApplicationFileUploader?) copies both files, and if the filenames are the same the first file is overwritten by the second one - hence the timestamp difference. I guess a lot has changed since 1.8 in the YARN deployment area. Too bad there is no clear instruction on how to submit a job programmatically - every time I have to reverse engineer CliFrontend.

Sorry for the confusion and thanks!

Mark
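The suspected overwrite can be illustrated with a minimal sketch. Note this is a hypothetical stand-in for what the YARN staging step does, not the actual YarnApplicationFileUploader code: resources staged under the same target file name clobber each other, so only one timestamp can match.

```java
import java.util.HashMap;
import java.util.Map;

public class ShipNameCollision {

    // Simulates staging local resources keyed by their bare file name,
    // the way files end up under .flink/<appId>/<name> on HDFS.
    public static Map<String, String> upload(String... localPaths) {
        Map<String, String> staged = new HashMap<>();
        for (String path : localPaths) {
            String name = path.substring(path.lastIndexOf('/') + 1);
            // Same target name: the second upload overwrites the first,
            // so the first resource's registered timestamp no longer matches.
            staged.put(name, path);
        }
        return staged;
    }

    public static void main(String[] args) {
        // Distribution jar and job jar share the name "jobs.jar" -> one entry survives.
        System.out.println(upload("/dist/jobs.jar", "/client/jobs.jar").size());       // 1
        // Distinct names -> both survive, no timestamp mismatch.
        System.out.println(upload("/dist/flink-dist.jar", "/client/jobs.jar").size()); // 2
    }
}
```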
Resource changed on src filesystem after upgrade
Hi all,

I am upgrading my DataSet jobs from Flink 1.8 to 1.12. After the upgrade I started to receive errors like this one:

14:12:57,441 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager - Worker container_e120_1608377880203_0751_01_000112 is terminated. Diagnostics: Resource hdfs://bigdata/user/hadoop/.flink/application_1608377880203_0751/jobs.jar changed on src filesystem (expected 1610892446439, was 1610892446971)
java.io.IOException: Resource hdfs://bigdata/user/hadoop/.flink/application_1608377880203_0751/jobs.jar changed on src filesystem (expected 1610892446439, was 1610892446971)
    at org.apache.hadoop.yarn.util.FSDownload.copy(FSDownload.java:257)
    at org.apache.hadoop.yarn.util.FSDownload.access$000(FSDownload.java:63)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:361)
    at org.apache.hadoop.yarn.util.FSDownload$2.run(FSDownload.java:359)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
    at org.apache.hadoop.yarn.util.FSDownload.call(FSDownload.java:359)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.doDownloadCall(ContainerLocalizer.java:228)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:221)
    at org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer$FSDownloadWrapper.call(ContainerLocalizer.java:209)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I understand it is somehow related to FLINK-12195, but this time it comes from the Hadoop code. I am running a very old version of the HDP platform, v2.6.5, so it might be the one to blame. But the code was working perfectly fine before the upgrade, so I am confused. Could you please advise?

Thank you!

Mark
Re: Resource leak in DataSourceNode?
Hi Robert,

Thank you for confirming that there is an issue. I do not have a solution for it and would like to hear a committer's insight into what is wrong there.

I think there are actually two issues - the first one is that the HBase InputFormat does not close its connection in close(). The other is that DataSourceNode does not call the close() method.

Cheers,
Mark

‐‐‐ Original Message ‐‐‐
On Thursday, August 27, 2020 3:30 PM, Robert Metzger wrote:

> Hi Mark,
>
> Thanks a lot for your message and the good investigation! I believe you've
> found a bug in Flink. I filed an issue for the problem:
> https://issues.apache.org/jira/browse/FLINK-19064.
>
> Would you be interested in opening a pull request to fix this?
> Otherwise, I'm sure a committer will pick up the issue soon.
>
> I'm not aware of a simple workaround for the problem.
>
> Best,
> Robert
>
> On Wed, Aug 26, 2020 at 4:05 PM Mark Davis wrote:
>
>> Hi,
>>
>> I am trying to investigate a problem with non-released resources in my
>> application.
>>
>> I have a stateful application which submits Flink DataSet jobs using code
>> very similar to the code in CliFrontend.
>> I noticed that I am getting a lot of non-closed connections to my data store
>> (HBase in my case). The connections are held by the application, not the
>> jobs themselves.
>>
>> I am using HBaseRowDataInputFormat and it seems that HBase connections
>> opened in the configure() method during job graph creation (before the
>> job is executed) are not closed. My search led me to the method
>> DataSourceNode.computeOperatorSpecificDefaultEstimates(DataStatistics),
>> where I see that a format is not closed after being configured.
>>
>> Is that correct? How can I overcome this issue?
>>
>> My application is long-running, which is probably why I observe the
>> resource leak. If I spawned a new JVM to run the jobs, this problem would
>> not be noticeable.
>>
>> Thank you!
>>
>> Cheers,
>> Marc
Resource leak in DataSourceNode?
Hi,

I am trying to investigate a problem with non-released resources in my application.

I have a stateful application which submits Flink DataSet jobs using code very similar to the code in CliFrontend. I noticed that I am getting a lot of non-closed connections to my data store (HBase in my case). The connections are held by the application, not the jobs themselves.

I am using HBaseRowDataInputFormat and it seems that HBase connections opened in the configure() method during job graph creation (before the job is executed) are not closed. My search led me to the method DataSourceNode.computeOperatorSpecificDefaultEstimates(DataStatistics), where I see that a format is not closed after being configured.

Is that correct? How can I overcome this issue?

My application is long-running, which is probably why I observe the resource leak. If I spawned a new JVM to run the jobs, this problem would not be noticeable.

Thank you!

Cheers,
Marc
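The fix being discussed amounts to a configure-then-close discipline. Below is a minimal sketch of that pattern, assuming a hypothetical Format interface - it is not the Flink API, only an illustration of closing a format in a finally block after it has been used for estimation:

```java
public class EstimateAndClose {

    // Hypothetical stand-in for an input format with lifecycle methods.
    public interface Format {
        void configure();      // may open external connections (as HBase does)
        long estimateSize();   // uses the configured format
        void close();          // must release what configure() opened
    }

    // Configure the format, use it, and always close it - even on failure -
    // so connections opened during job graph creation are not leaked.
    public static long estimate(Format format) {
        format.configure();
        try {
            return format.estimateSize();
        } finally {
            format.close();
        }
    }

    public static void main(String[] args) {
        final boolean[] closed = {false};
        long size = estimate(new Format() {
            public void configure() {}
            public long estimateSize() { return 42L; }
            public void close() { closed[0] = true; }
        });
        System.out.println(size + " closed=" + closed[0]); // 42 closed=true
    }
}
```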
Re: Run command after Batch is finished
Hi Chesnay,

That is an interesting proposal, thank you! I was doing something similar with the OutputFormat#close() method, respecting the Format's parallelism. Using FinalizeOnMaster will make things easier.

But the problem is that several OutputFormats must be synchronized externally - every output must check whether the other outputs have finished already... Quite cumbersome. Also there is a problem with exceptions - the OutputFormats may never be opened and thus never closed.

Mark

‐‐‐ Original Message ‐‐‐
On Monday, June 8, 2020 5:50 PM, Chesnay Schepler wrote:

> This goes in the right direction; have a look at
> org.apache.flink.api.common.io.FinalizeOnMaster, which an OutputFormat can
> implement to run something on the Master after all subtasks have been closed.
>
> On 08/06/2020 17:25, Andrey Zagrebin wrote:
>
>> Hi Mark,
>>
>> I do not know how you output the results in your pipeline.
>> If you use DataSet#output(OutputFormat outputFormat), you could try to
>> extend the format with a custom close method which should be called once
>> the task of the sink batch operator is done in the task manager.
>> I also cc Aljoscha, maybe, he has more ideas.
>>
>> Best,
>> Andrey
>>
>> On Sun, Jun 7, 2020 at 1:35 PM Mark Davis wrote:
>>
>>> Hi Jeff,
>>>
>>> Unfortunately this is not good enough for me.
>>> My clients are very volatile - they start a batch and can go away at any
>>> moment without waiting for it to finish. Think of an elastic web
>>> application or an AWS Lambda.
>>>
>>> I hoped to find something that could be deployed to the cluster together
>>> with the batch code. Maybe a hook on the job manager or similar. I do not
>>> plan to run anything heavy there, just some formal cleanups.
>>> Is there something like this?
>>>
>>> Thank you!
>>>
>>> Mark
>>>
>>> ‐‐‐ Original Message ‐‐‐
>>> On Saturday, June 6, 2020 4:29 PM, Jeff Zhang wrote:
>>>
>>>> It would run on the client side where the ExecutionEnvironment is created.
>>>>
>>>> Mark Davis wrote on Saturday, June 6, 2020 at 8:14 PM:
>>>>
>>>>> Hi Jeff,
>>>>>
>>>>> Thank you very much! That is exactly what I need.
>>>>>
>>>>> Where will the listener code run in a cluster deployment (YARN, k8s)?
>>>>> Will it be sent over the network?
>>>>>
>>>>> Thank you!
>>>>>
>>>>> Mark
>>>>>
>>>>> ‐‐‐ Original Message ‐‐‐
>>>>> On Friday, June 5, 2020 6:13 PM, Jeff Zhang wrote:
>>>>>
>>>>>> You can try JobListener, which you can register with the
>>>>>> ExecutionEnvironment.
>>>>>>
>>>>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>>>>>>
>>>>>> Mark Davis wrote on Saturday, June 6, 2020 at 12:00 AM:
>>>>>>
>>>>>>> Hi there,
>>>>>>>
>>>>>>> I am running a Batch job with several outputs.
>>>>>>> Is there a way to run some code (e.g. release a distributed lock) after
>>>>>>> all outputs are finished?
>>>>>>>
>>>>>>> Currently I do this in a try-finally block around the
>>>>>>> ExecutionEnvironment.execute() call, but I have to switch to the
>>>>>>> detached execution mode - in this mode the finally block is never run.
>>>>>>>
>>>>>>> Thank you!
>>>>>>>
>>>>>>> Mark
>>>>>>
>>>>>> --
>>>>>> Best Regards
>>>>>>
>>>>>> Jeff Zhang
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
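The hook Chesnay describes can be sketched as follows. These are simplified stand-in types that only mirror the shape of Flink's org.apache.flink.api.common.io.FinalizeOnMaster contract, not the real API, so the example stays self-contained:

```java
public class FinalizeOnMasterSketch {

    // Stand-in for Flink's FinalizeOnMaster (simplified, hypothetical type).
    public interface FinalizeOnMaster {
        // Invoked once, on the master, after all subtasks have been closed.
        void finalizeGlobal(int parallelism);
    }

    public static class LockReleasingFormat implements FinalizeOnMaster {
        private boolean lockHeld = true;

        @Override
        public void finalizeGlobal(int parallelism) {
            // One-shot cleanup such as releasing a distributed lock,
            // without each subtask having to check on its siblings.
            lockHeld = false;
        }

        public boolean isLockHeld() {
            return lockHeld;
        }
    }

    public static void main(String[] args) {
        LockReleasingFormat format = new LockReleasingFormat();
        // ... all subtasks write and close ...
        format.finalizeGlobal(4); // the runtime would invoke this exactly once
        System.out.println(format.isLockHeld()); // false
    }
}
```

As Mark notes, this covers the success path; if the job fails before the outputs are opened, the hook is not a substitute for client-side error handling.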
Re: Run command after Batch is finished
Hi Jeff,

Unfortunately this is not good enough for me. My clients are very volatile - they start a batch and can go away at any moment without waiting for it to finish. Think of an elastic web application or an AWS Lambda.

I hoped to find something that could be deployed to the cluster together with the batch code. Maybe a hook on the job manager or similar. I do not plan to run anything heavy there, just some formal cleanups. Is there something like this?

Thank you!

Mark

‐‐‐ Original Message ‐‐‐
On Saturday, June 6, 2020 4:29 PM, Jeff Zhang wrote:

> It would run on the client side where the ExecutionEnvironment is created.
>
> Mark Davis wrote on Saturday, June 6, 2020 at 8:14 PM:
>
>> Hi Jeff,
>>
>> Thank you very much! That is exactly what I need.
>>
>> Where will the listener code run in a cluster deployment (YARN, k8s)?
>> Will it be sent over the network?
>>
>> Thank you!
>>
>> Mark
>>
>> ‐‐‐ Original Message ‐‐‐
>> On Friday, June 5, 2020 6:13 PM, Jeff Zhang wrote:
>>
>>> You can try JobListener, which you can register with the ExecutionEnvironment.
>>>
>>> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>>>
>>> Mark Davis wrote on Saturday, June 6, 2020 at 12:00 AM:
>>>
>>>> Hi there,
>>>>
>>>> I am running a Batch job with several outputs.
>>>> Is there a way to run some code (e.g. release a distributed lock) after
>>>> all outputs are finished?
>>>>
>>>> Currently I do this in a try-finally block around the
>>>> ExecutionEnvironment.execute() call, but I have to switch to the detached
>>>> execution mode - in this mode the finally block is never run.
>>>>
>>>> Thank you!
>>>>
>>>> Mark
>>>
>>> --
>>> Best Regards
>>>
>>> Jeff Zhang
>
> --
> Best Regards
>
> Jeff Zhang
Re: Run command after Batch is finished
Hi Jeff,

Thank you very much! That is exactly what I need.

Where will the listener code run in a cluster deployment (YARN, k8s)? Will it be sent over the network?

Thank you!

Mark

‐‐‐ Original Message ‐‐‐
On Friday, June 5, 2020 6:13 PM, Jeff Zhang wrote:

> You can try JobListener, which you can register with the ExecutionEnvironment.
>
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/core/execution/JobListener.java
>
> Mark Davis wrote on Saturday, June 6, 2020 at 12:00 AM:
>
>> Hi there,
>>
>> I am running a Batch job with several outputs.
>> Is there a way to run some code (e.g. release a distributed lock) after all
>> outputs are finished?
>>
>> Currently I do this in a try-finally block around the
>> ExecutionEnvironment.execute() call, but I have to switch to the detached
>> execution mode - in this mode the finally block is never run.
>>
>> Thank you!
>>
>> Mark
>
> --
> Best Regards
>
> Jeff Zhang
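The registration Jeff points to looks roughly like the sketch below. The listener interface here is a stand-in modeled on the onJobSubmitted/onJobExecuted callbacks of Flink's org.apache.flink.core.execution.JobListener, and MiniEnv is a hypothetical environment that only simulates where the callbacks fire - in the client JVM that created the environment, which answers Mark's question:

```java
import java.util.ArrayList;
import java.util.List;

public class JobListenerSketch {

    // Stand-in modeled on Flink's JobListener callbacks (hypothetical type).
    public interface JobListener {
        void onJobSubmitted(String jobId);
        void onJobExecuted(String jobId);
    }

    // Hypothetical mini environment simulating client-side callback dispatch.
    public static class MiniEnv {
        private final List<JobListener> listeners = new ArrayList<>();

        public void registerJobListener(JobListener listener) {
            listeners.add(listener);
        }

        public void execute(String jobId) {
            listeners.forEach(l -> l.onJobSubmitted(jobId));
            // ... the job itself runs on the cluster ...
            // Callbacks fire here, in the client JVM - they are not shipped
            // over the network to the job manager or task managers.
            listeners.forEach(l -> l.onJobExecuted(jobId));
        }
    }

    public static void main(String[] args) {
        MiniEnv env = new MiniEnv();
        StringBuilder log = new StringBuilder();
        env.registerJobListener(new JobListener() {
            public void onJobSubmitted(String id) { log.append("submitted:").append(id).append(" "); }
            public void onJobExecuted(String id) { log.append("executed:").append(id); }
        });
        env.execute("job-1");
        System.out.println(log); // submitted:job-1 executed:job-1
    }
}
```

The consequence of the client-side placement is exactly the limitation raised later in the thread: if the client goes away before the job finishes, the listener never fires.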
Run command after Batch is finished
Hi there,

I am running a Batch job with several outputs. Is there a way to run some code (e.g. release a distributed lock) after all outputs are finished?

Currently I do this in a try-finally block around the ExecutionEnvironment.execute() call, but I have to switch to the detached execution mode - in this mode the finally block is never run.

Thank you!

Mark
Re: DataSet API: HBase ScannerTimeoutException and double Result processing
Hi Flavio,

>> When the resultScanner dies because of a timeout (this happens a lot when
>> you have backpressure and the time between 2 consecutive reads exceeds the
>> scanner timeout), the code creates a new scanner and restarts from where it
>> was (startRow = currentRow).
>> So there should not be any duplicates (in theory), but this could be the
>> root of the problem..

Yes, you are right, the nextRecord() exception handling is responsible for the duplicate record processing:

org.apache.hadoop.hbase.client.ScannerTimeoutException: 1038878ms passed since the last invocation, timeout is currently set to 6
    at org.apache.hadoop.hbase.client.ClientScanner.loadCache(ClientScanner.java:453)
    at org.apache.hadoop.hbase.client.ClientScanner.next(ClientScanner.java:371)
    at org.apache.flink.addons.hbase.AbstractTableInputFormat.nextRecord(AbstractTableInputFormat.java:130)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:192)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.hadoop.hbase.UnknownScannerException: org.apache.hadoop.hbase.UnknownScannerException: Name: 135281, already closed?
    at org.apache.hadoop.hbase.regionserver.RSRpcServices.scan(RSRpcServices.java:2389)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:32385)
    at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2150)
    at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
    at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:187)
    at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:167)

But I am not sure that the handling of the HBase exception thrown from ClientScanner.next() is correct. If the call to mapResultToOutType(Result) finished without an error, there is no need to restart from the same row. The new scanner should start from the next row.

Is that so, or am I missing something?

Best regards,
Mark
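The duplicate caused by the inclusive restart can be demonstrated with a small simulation. This is plain Java modeling the behavior being described, not the actual AbstractTableInputFormat code; the inclusiveRestart flag is only illustrative:

```java
import java.util.ArrayList;
import java.util.List;

public class ScannerResume {

    // Processes rows, injecting one scanner "timeout" right after the
    // timeoutAt-th row has been mapped. On restart, an inclusive resume
    // (restart at currentRow) replays the already-mapped row; an exclusive
    // resume (restart at the row after it) does not.
    public static List<String> runScan(List<String> rows, int timeoutAt, boolean inclusiveRestart) {
        List<String> out = new ArrayList<>();
        boolean failed = false;
        int i = 0;
        while (i < rows.size()) {
            out.add(rows.get(i)); // mapResultToOutType succeeded for this row
            if (!failed && out.size() == timeoutAt) {
                failed = true;
                // Simulated ScannerTimeoutException: open a new scanner.
                i = inclusiveRestart ? i : i + 1;
                continue;
            }
            i++;
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("r1", "r2", "r3");
        System.out.println(runScan(rows, 2, true));  // [r1, r2, r2, r3] - r2 processed twice
        System.out.println(runScan(rows, 2, false)); // [r1, r2, r3]     - no duplicate
    }
}
```

This matches Mark's observation: once mapResultToOutType has returned successfully for a row, restarting the scanner at that same row can only produce a duplicate.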
DataSet API: HBase ScannerTimeoutException and double Result processing
Hello,

I am reading Results from an HBase table and processing them with the Batch API. Everything works fine until I receive a ScannerTimeoutException from HBase. Maybe my transformations get stuck or a GC pause happens - hard to tell. The HBase client restarts the scan and the processing continues.

Except for one problem - every time I receive this exception I observe duplicate Result processing: the Result which was processed just before the ScannerTimeoutException was thrown is processed twice.

Is this expected behavior? Should I be prepared to handle it? And how should I handle it? Keeping track of all processed Results is not feasible in my case.

Here is a simple job demonstrating the issue (HBase scan and RPC timeouts are set to 60 sec):

Thank you!

Best regards,
Mark

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.createInput(new Src())
            .map(new Mapper())
            .print();
    }

    private static class Mapper implements MapFunction<Tuple1<String>, String> {
        private int cnt = 0;

        @Override
        public String map(Tuple1<String> value) throws Exception {
            if (cnt++ % 2 == 0) {
                Thread.sleep(12);
            }
            return value.f0;
        }
    }

    private static class Src extends AbstractTableInputFormat<Tuple1<String>> {
        @Override
        protected Scan getScanner() {
            Scan scan = new Scan();
            scan.setStartRow(getStartRow());
            scan.setStopRow(getEndRow());
            scan.setCaching(1);
            scan.setCacheBlocks(false);
            return scan;
        }

        @Override
        protected String getTableName() {
            return getTable();
        }

        @Override
        protected Tuple1<String> mapResultToOutType(Result r) {
            return new Tuple1<>(Bytes.toString(r.getRow()));
        }

        @Override
        public void configure(org.apache.flink.configuration.Configuration parameters) {
            scan = getScanner();
            try {
                table = new HTable(getHadoopConf(), getTableName());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }