Re: Task manager isn’t initiating with defined values in Flink 1.11 as part of EMR 6.1.0
(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Container released on a *lost* node
	at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
	... 22 more
2021-01-10 20:07:48,974 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job a7cffc31c4aeb01356c5132c908be314.
2021-01-10 20:07:48,974 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
2021-01-10 20:07:48,978 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job a7cffc31c4aeb01356c5132c908be314 reached globally terminal state FAILED.
2021-01-10 20:07:49,006 INFO org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Gas Job Runner V2 (a7cffc31c4aeb01356c5132c908be314).
2021-01-10 20:07:49,006 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending SlotPool.
2021-01-10 20:07:49,007 INFO org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 39a33f865ba12bac16dd21b834527750: JobManager is shutting down.
2021-01-10 20:07:49,007 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Stopping SlotPool.
2021-01-10 20:07:49,007 INFO org.apache.flink.yarn.YarnResourceManager - Disconnect job manager 0...@akka.tcp://flink@ip-10-6-0-231.ec2.internal:39039/user/rpc/jobmanager_2 for job a7cffc31c4aeb01356c5132c908be314 from the resource manager.

Could you please help us understand why it is failing now that we increased the parallelism?

Thanks & Regards,
-Deep

On Mon, Jan 4, 2021 at 8:12 PM DEEP NARAYAN Singh wrote:

> Thanks Till, for the detailed explanation. I tried it and it is working fine.
>
> Once again, thanks for your quick response.
>
> Regards,
> -Deep
>
> On Mon, 4 Jan, 2021, 2:20 PM Till Rohrmann wrote:
>
>> Hi Deep,
>>
>> Flink has dropped support for specifying the number of TMs via -n since
>> the introduction of FLIP-6. Since then, Flink will automatically start TMs
>> depending on the required resources. Hence, there is no need to specify
>> the -n parameter anymore. Instead, you should specify the parallelism with
>> which you would like to run your job via the -p option.
>>
>> Since Flink 1.11.0 there is the option slotmanager.number-of-slots.max to
>> set the upper limit of slots a cluster is allowed to allocate [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-16605
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 4, 2021 at 8:33 AM DEEP NARAYAN Singh wrote:
>>
>> > Hi Guys,
>> >
>> > I’m struggling to initiate the task manager with Flink 1.11.0 in AWS
>> > EMR, but with older versions I am not. Let me put the full context here.
>> >
>> > *When using Flink 1.9.1 and EMR 5.29.0*
>> >
>> > To create a long-running session, we used the below command:
>> >
>> > *sudo flink-yarn-session -n <no of task managers> -s <no of slots/core>
>> > -jm <JM memory> -tm <TM memory> -d*
>> >
>> > followed by the below command to run the final job:
>> >
>> > *flink run -m yarn-cluster -yid <application id> -yn <no of task managers>
>> > -ys <no of slots> -yjm <JM memory> -ytm <TM memory> -c <class name> <jar path>*
>> >
>> > If "n" is 6, then 6 task managers are created to start the job;
>> > whatever "n" is configured, that is the number of TMs the job is
>> > started with.
>> >
>> > But now, when we scaled up with the new configuration (*i.e. Flink 1.11.0 and
>> > EMR 6.1.0*), we are unable to achieve the desired number of TMs.
>> >
>> > Please find the session commands for the new configuration:
>> >
>> > *sudo flink-yarn-session -Djobmanager.memory.process.size=<JM memory>
>> > -Dtaskmanager.memory.process.size=<TM memory> -n <no of task managers>
>> > -s <no of slots/core> -d*
>> >
>> > And the final job command:
>> >
>> > *flink run -m yarn-cluster -yid <application id> -c <class name> <jar path>*
>> >
>> > I have tried a lot of combinations, but nothing has worked out so far. I
>> > request your help in this regard, as we plan to have this configuration
>> > in *PRODUCTION* soon.
>> >
>> > Thanks in advance.
>> >
>> > Regards,
>> >
>> > -Deep
Re: Task manager isn’t initiating with defined values in Flink 1.11 as part of EMR 6.1.0
Thanks Till, for the detailed explanation. I tried it and it is working fine.

Once again, thanks for your quick response.

Regards,
-Deep

On Mon, 4 Jan, 2021, 2:20 PM Till Rohrmann wrote:

> Hi Deep,
>
> Flink has dropped support for specifying the number of TMs via -n since the
> introduction of FLIP-6. Since then, Flink will automatically start TMs
> depending on the required resources. Hence, there is no need to specify the
> -n parameter anymore. Instead, you should specify the parallelism with
> which you would like to run your job via the -p option.
>
> Since Flink 1.11.0 there is the option slotmanager.number-of-slots.max to
> set the upper limit of slots a cluster is allowed to allocate [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-16605
>
> Cheers,
> Till
>
> On Mon, Jan 4, 2021 at 8:33 AM DEEP NARAYAN Singh wrote:
>
> > Hi Guys,
> >
> > I’m struggling to initiate the task manager with Flink 1.11.0 in AWS
> > EMR, but with older versions I am not. Let me put the full context here.
> >
> > *When using Flink 1.9.1 and EMR 5.29.0*
> >
> > To create a long-running session, we used the below command:
> >
> > *sudo flink-yarn-session -n <no of task managers> -s <no of slots/core>
> > -jm <JM memory> -tm <TM memory> -d*
> >
> > followed by the below command to run the final job:
> >
> > *flink run -m yarn-cluster -yid <application id> -yn <no of task managers>
> > -ys <no of slots> -yjm <JM memory> -ytm <TM memory> -c <class name> <jar path>*
> >
> > If "n" is 6, then 6 task managers are created to start the job;
> > whatever "n" is configured, that is the number of TMs the job is
> > started with.
> >
> > But now, when we scaled up with the new configuration (*i.e. Flink 1.11.0 and
> > EMR 6.1.0*), we are unable to achieve the desired number of TMs.
> >
> > Please find the session commands for the new configuration:
> >
> > *sudo flink-yarn-session -Djobmanager.memory.process.size=<JM memory>
> > -Dtaskmanager.memory.process.size=<TM memory> -n <no of task managers>
> > -s <no of slots/core> -d*
> >
> > And the final job command:
> >
> > *flink run -m yarn-cluster -yid <application id> -c <class name> <jar path>*
> >
> > I have tried a lot of combinations, but nothing has worked out so far.
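Till's point above is that since FLIP-6 the number of TaskManagers is derived from the requested parallelism and the slots each TM offers, rather than fixed with -n. A small arithmetic sketch of that sizing intuition (our own illustration, not Flink's actual scheduler code):

```java
public class TmSizing {

    // How many TaskManagers a session would need for a given job
    // parallelism, assuming each TM offers `slotsPerTm` slots.
    static int taskManagersNeeded(int parallelism, int slotsPerTm) {
        if (parallelism <= 0 || slotsPerTm <= 0) {
            throw new IllegalArgumentException("values must be positive");
        }
        // Ceiling division: a partially used TM still counts as one TM.
        return (parallelism + slotsPerTm - 1) / slotsPerTm;
    }

    public static void main(String[] args) {
        // e.g. submitting with -p 42 against TMs with 7 slots each
        // leads Flink to request 6 containers from YARN.
        System.out.println(taskManagersNeeded(42, 7)); // prints 6
    }
}
```

So instead of forcing a TM count, one would pick -p (and optionally cap total slots via slotmanager.number-of-slots.max) and let Flink allocate containers accordingly.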
> > I request your help in this regard, as we plan to have this configuration
> > in *PRODUCTION* soon.
> >
> > Thanks in advance.
> >
> > Regards,
> >
> > -Deep
Task manager isn’t initiating with defined values in Flink 1.11 as part of EMR 6.1.0
Hi Guys,

I’m struggling to initiate the task manager with Flink 1.11.0 in AWS EMR, but with older versions I am not. Let me put the full context here.

*When using Flink 1.9.1 and EMR 5.29.0*

To create a long-running session, we used the below command:

*sudo flink-yarn-session -n <no of task managers> -s <no of slots/core> -jm <JM memory> -tm <TM memory> -d*

followed by the below command to run the final job:

*flink run -m yarn-cluster -yid <application id> -yn <no of task managers> -ys <no of slots> -yjm <JM memory> -ytm <TM memory> -c <class name> <jar path>*

If "n" is 6, then 6 task managers are created to start the job; whatever "n" is configured, that is the number of TMs the job is started with.

But now, when we scaled up with the new configuration (*i.e. Flink 1.11.0 and EMR 6.1.0*), we are unable to achieve the desired number of TMs.

Please find the session commands for the new configuration:

*sudo flink-yarn-session -Djobmanager.memory.process.size=<JM memory> -Dtaskmanager.memory.process.size=<TM memory> -n <no of task managers> -s <no of slots/core> -d*

And the final job command:

*flink run -m yarn-cluster -yid <application id> -c <class name> <jar path>*

I have tried a lot of combinations, but nothing has worked out so far. I request your help in this regard, as we plan to have this configuration in *PRODUCTION* soon.

Thanks in advance.

Regards,

-Deep
Re: Urgent help on S3 CSV file reader DataStream Job
Hi Wei and Till,

Thanks for the quick reply.

*@Wei,* I tried the code which you suggested and it is working fine, but I have one use case where it is failing. Below is the CSV input data format:

Csv file data format:
---
field_id,data,
A,1
B,3
C,4
D,9
E,0,0,0,0

Because the last row contains more than two values, it is throwing *org.apache.flink.api.common.io.ParseException: Row too short: field_id,data,*

How do we handle the above corner case? Could you please suggest some way to handle this?

*@Till,* Could you please elaborate more on what you are suggesting? As per my use case I am dealing with multiple CSV files under the given folder, reading them line by line using TextInputFormat, and the transform will not work using a map operator. Correct me if I'm wrong.

Thanks & Regards,
-Deep

On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann wrote:

> Hi Deep,
>
> Could you use the TextInputFormat which reads a file line by line? That way
> you can do the JSON parsing as part of a mapper which consumes the file
> lines.
>
> Cheers,
> Till
>
> On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong wrote:
>
> > Hi Deep,
> >
> > (redirecting this to user mailing list as this is not a dev question)
> >
> > You can try to set the line delimiter and field delimiter of the
> > RowCsvInputFormat to a non-printing character (assuming there are no
> > non-printing characters in the csv files). It will read all the content
> > of a csv file into one Row. e.g.
> >
> > final StreamExecutionEnvironment env =
> >     StreamExecutionEnvironment.getExecutionEnvironment();
> > String path = "test";
> > TypeInformation[] fieldTypes = new TypeInformation[]{
> >     BasicTypeInfo.STRING_TYPE_INFO};
> > RowCsvInputFormat csvFormat =
> >     new RowCsvInputFormat(new Path(path), fieldTypes);
> > csvFormat.setNestedFileEnumeration(true);
> > csvFormat.setDelimiter((char) 0);
> > csvFormat.setFieldDelimiter(String.valueOf((char) 0));
> > DataStream<Row> lines = env.readFile(csvFormat, path,
> >     FileProcessingMode.PROCESS_ONCE, -1);
> > lines.map(value -> value).print();
> > env.execute();
> >
> > Then you can convert the content of the csv files to json manually.
> >
> > Best,
> > Wei
> >
> > On Dec 7, 2020, at 19:10, DEEP NARAYAN Singh wrote:
> >
> > Hi Guys,
> >
> > Below is my code snippet, which reads all csv files under the given folder
> > row by row, but my requirement is to read one csv file at a time and
> > convert it to json, which will look like:
> >
> > {"A":"1","B":"3","C":"4","D":9}
> >
> > Csv file data format:
> > ---
> > field_id,data,
> > A,1
> > B,3
> > C,4
> > D,9
> >
> > Code snippet:
> > --
> >
> > final StreamExecutionEnvironment env =
> >     StreamExecutionEnvironment.getExecutionEnvironment();
> > String path = "s3://messages/data/test/dev/2020-12-07/67241306/";
> > TypeInformation[] fieldTypes = new TypeInformation[]{
> >     BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
> > RowCsvInputFormat csvFormat = new RowCsvInputFormat(new Path(path), fieldTypes);
> > csvFormat.setSkipFirstLineAsHeader(true);
> > csvFormat.setNestedFileEnumeration(true);
> > DataStream<Row> lines = env.readFile(csvFormat, path,
> >     FileProcessingMode.PROCESS_ONCE, -1);
> > lines.map(value -> value).print();
> >
> > Any help is highly appreciated.
> >
> > Thanks,
> > -Deep
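Once a whole file arrives as one string (Wei's approach above), the "Row too short"/extra-column corner case can be handled by parsing the content leniently in the mapper instead of relying on RowCsvInputFormat's strict field count. A minimal plain-Java sketch — the two-column layout, the choice to keep only the first value of an over-long row, and emitting every value as a string are our assumptions:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LenientCsvToJson {

    // Convert "field_id,data" CSV content (header + key/value rows)
    // into one JSON object string. Rows with extra columns keep only
    // their first value instead of failing like the strict parser.
    static String toJson(String csvContent) {
        Map<String, String> fields = new LinkedHashMap<>();
        String[] rows = csvContent.split("\\R");
        for (int i = 1; i < rows.length; i++) {          // skip header row
            String[] cols = rows[i].split(",", -1);
            if (cols.length >= 2 && !cols[0].isEmpty()) {
                fields.put(cols[0], cols[1]);            // ignore extras
            }
        }
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (sb.length() > 1) sb.append(",");
            sb.append("\"").append(e.getKey()).append("\":\"")
              .append(e.getValue()).append("\"");
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) {
        String csv = "field_id,data,\nA,1\nB,3\nC,4\nD,9\nE,0,0,0,0";
        System.out.println(toJson(csv));
        // prints {"A":"1","B":"3","C":"4","D":"9","E":"0"}
    }
}
```

In the streaming job this would sit in a `lines.map(...)` over the one-Row-per-file stream; a real implementation would likely use a proper JSON library and typed values rather than this string-building sketch.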
Urgent help on S3 CSV file reader DataStream Job
Hi Guys,

Below is my code snippet, which reads all csv files under the given folder row by row, but my requirement is to read one csv file at a time and convert it to json, which will look like:

{"A":"1","B":"3","C":"4","D":9}

Csv file data format:
---
field_id,data,
A,1
B,3
C,4
D,9

Code snippet:
--

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
String path = "s3://messages/data/test/dev/2020-12-07/67241306/";
TypeInformation[] fieldTypes = new TypeInformation[]{
    BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
RowCsvInputFormat csvFormat = new RowCsvInputFormat(new Path(path), fieldTypes);
csvFormat.setSkipFirstLineAsHeader(true);
csvFormat.setNestedFileEnumeration(true);
DataStream<Row> lines = env.readFile(csvFormat, path,
    FileProcessingMode.PROCESS_ONCE, -1);
lines.map(value -> value).print();

Any help is highly appreciated.

Thanks,
-Deep
Need input on creating s3 source DataStream to filter JSON files before passing to the actual reader
Hi Guys,

As part of the Flink S3 source reader, can we filter on JSON file names containing date and time as mentioned below?

I am able to read all the json files under a specific date, but my use case is to read selected hour and minute json files based on a date range, as shown in the below sample directory structure for multiple device ids:

s3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:00:00_25700.json
s3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:05:00_25701.json
s3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:10:00_25702.json
s3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:15:00_25703.json

How do we filter specific files for specific dates under specific hours and minutes before passing them to the source reader?

Below is my current code snippet for creating the input stream:

public static DataStream<String> createInputStream(StreamExecutionEnvironment environment,
        String directory, Integer readerParallelism) {
    TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(
        "s3-success-messages/dev/location/device1/data/2020-08-09"));
    format.setNestedFileEnumeration(true);
    format.setMinSplitSize(10);
    return environment
        .readFile(format, directory, FileProcessingMode.PROCESS_ONCE, -1,
            FilePathFilter.createDefaultFilter())
        .name(directory).setParallelism(readerParallelism);
}

Do we have any APIs available as part of the Flink S3 source to filter the json files before pushing to the next operator, based on a selected date range?

Thanks,
-Deep
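One way to approach the question above is to note that `readFile` accepts a FilePathFilter, and the filtering decision itself is just string matching on the file name. A plain-Java sketch of that matching logic — the filename pattern `yyyy-MM-dd_HH:mm:ss_<seq>.json` is taken from the paths above, while the accepted hour/minute window is an illustrative assumption:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HourMinuteFilter {

    // Matches names like "2020-08-09_16:05:00_25701.json" and captures
    // date, hour and minute so a source can accept or skip the file.
    private static final Pattern NAME = Pattern.compile(
        "(\\d{4}-\\d{2}-\\d{2})_(\\d{2}):(\\d{2}):\\d{2}_\\d+\\.json$");

    // Accept only files for `date` whose hour equals `hour` and whose
    // minute falls in [minFrom, minTo] -- an illustrative window.
    static boolean accept(String fileName, String date, int hour,
                          int minFrom, int minTo) {
        Matcher m = NAME.matcher(fileName);
        if (!m.find()) {
            return false;                         // not a data file name
        }
        int h = Integer.parseInt(m.group(2));
        int min = Integer.parseInt(m.group(3));
        return m.group(1).equals(date) && h == hour
            && min >= minFrom && min <= minTo;
    }

    public static void main(String[] args) {
        System.out.println(accept("2020-08-09_16:05:00_25701.json",
                                  "2020-08-09", 16, 0, 10)); // prints true
    }
}
```

In the job itself, this predicate would live inside a custom FilePathFilter subclass (its `filterPath(Path)` returns true for paths to *skip*) passed to `readFile(...)` in place of `FilePathFilter.createDefaultFilter()`, so non-matching files are never handed to the reader.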
Re: Resource Optimization for Flink Job in AWS EMR Cluster
Thanks Prasanna & Till for the quick response. Looks like my use case is very similar to yours; I will try to run multiple containers on the same machine and will update you accordingly.

Thanks,
-Deep

On Thu, Nov 5, 2020 at 2:33 PM Till Rohrmann wrote:

> Hi Deep,
>
> you can increase the average CPU load by reducing the number of overall
> resources. Having fewer slots over which you can distribute the work should
> increase the resource usage.
>
> Cheers,
> Till
>
> On Thu, Nov 5, 2020 at 9:03 AM Prasanna kumar <prasannakumarram...@gmail.com> wrote:
>
> > Deep,
> >
> > 1) Is it a cpu/memory/io intensive job?
> >
> > Based on that you could allocate resources.
> >
> > From the question, if the CPU is not utilised, you could run multiple
> > containers on the same machine (TM)...
> >
> > The following may not be exactly your case, but it should give you an idea.
> >
> > A few months back I ran jobs in EMR processing 4-8k records per second from
> > Kafka with parallelism of 8, doing a lightweight transformation where
> > end-to-end latency was less than a second (10-50 ms).
> >
> > I used slots with 4 GB memory allocated and 1 GB JM memory. Here
> > multiple containers ran on the same machine and I got CPU usage up to 50%.
> > Earlier it was in single digits when just a single container ran on a
> > machine.
> >
> > Prasanna.
> >
> > On Thu 5 Nov, 2020, 12:40 Satyaa Dixit wrote:
> >
> > > Hi Deep,
> > >
> > > Thanks for bringing this up; I'm also facing a similar kind of issue
> > > while deploying my Flink job w.r.t. resource optimization.
> > >
> > > Hi Team,
> > >
> > > It would be much appreciated if someone helps us here.
> > >
> > > Regards,
> > > Satya
> > >
> > > On Wed, Nov 4, 2020 at 6:33 PM DEEP NARAYAN Singh <about.d...@gmail.com> wrote:
> > >
> > > > Hi All,
> > > >
> > > > I am running a Flink streaming job in an EMR cluster with parallelism 21,
> > > > handling 500 records per second, but I am still seeing CPU utilization of
> > > > approximately 5-8 percent.
> > > >
> > > > Below is the long-running session command in the EMR cluster, which has 3
> > > > instances of type C5.2xlarge (8 vCPU, 16 GB memory, AWS resource):
> > > >
> > > > *sudo flink-yarn-session -n 3 -s 7 -jm 4168 -tm 8000 -d*
> > > >
> > > > Can anyone suggest some configuration to maximize the CPU utilization?
> > > > Also, what would be the standard CPU utilization for a Flink job in
> > > > order to achieve minimum latency?
> > > >
> > > > Any leads would be appreciated.
> > > >
> > > > Thanks,
> > > > -Deep
> > >
> > > --
> > > Best Regards
> > > Satya Prakash
> > > (M) +91-9845111913
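Till's "fewer slots" advice and Prasanna's "multiple containers per machine" observation both amount to packing the same work into fewer or smaller containers so each core sees more load: 500 records/s spread over 3 TMs x 7 slots = 21 slots is only about 24 records/s per slot, so low CPU is expected. Two directions one could try are sketched below; the memory and slot numbers are illustrative assumptions, not values from the thread:

```shell
# Option A: keep the 3 TMs but give each fewer slots, shrinking the
# total slot budget so each slot carries more of the load.
sudo flink-yarn-session -n 3 -s 4 -jm 2048 -tm 6000 -d

# Option B: smaller TMs so YARN can pack several containers per node;
# fewer slots per TM keeps each container's cores busier.
sudo flink-yarn-session -n 6 -s 2 -jm 1024 -tm 3000 -d
```

There is no single "standard" CPU target; the usual approach is to reduce resources until latency starts to suffer, then back off.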
Re: Resource Optimization for Flink Job in AWS EMR Cluster
Hi Guys,

Sorry to bother you again. Could someone help clarify my doubt? Any help will be highly appreciated.

Thanks,
-Deep

On Wed, Nov 4, 2020 at 6:26 PM DEEP NARAYAN Singh wrote:

> Hi All,
>
> I am running a Flink streaming job in an EMR cluster with parallelism 21,
> handling 500 records per second, but I am still seeing CPU utilization of
> approximately 5-8 percent.
>
> Below is the long-running session command in the EMR cluster, which has 3
> instances of type C5.2xlarge (8 vCPU, 16 GB memory, AWS resource):
>
> *sudo flink-yarn-session -n 3 -s 7 -jm 4168 -tm 8000 -d*
>
> Can anyone suggest some configuration to maximize the CPU utilization?
> Also, what would be the standard CPU utilization for a Flink job in order
> to achieve minimum latency?
>
> Any leads would be appreciated.
>
> Thanks,
> -Deep
Resource Optimization for Flink Job in AWS EMR Cluster
Hi All,

I am running a Flink streaming job in an EMR cluster with parallelism 21, handling 500 records per second, but I am still seeing CPU utilization of approximately 5-8 percent.

Below is the long-running session command in the EMR cluster, which has 3 instances of type C5.2xlarge (8 vCPU, 16 GB memory, AWS resource):

*sudo flink-yarn-session -n 3 -s 7 -jm 4168 -tm 8000 -d*

Can anyone suggest some configuration to maximize the CPU utilization? Also, what would be the standard CPU utilization for a Flink job in order to achieve minimum latency?

Any leads would be appreciated.

Thanks,
-Deep
Re: Is there any way to change operator level parallelism dynamically?
Hi Till,

Got it, thanks for the prompt reply.

-Deep

On Thu, 29 Oct, 2020, 1:22 PM Till Rohrmann wrote:

> Hi Deep,
>
> at the moment the only way to change the parallelism is to take a
> savepoint, stop the job and then resume the job with a changed parallelism
> from the savepoint. The community is working on a more automated way to
> solve this problem. However, the first versions will use the same approach
> of taking a savepoint, which means that there will be a slight downtime.
>
> Cheers,
> Till
>
> On Wed, Oct 28, 2020 at 7:05 PM DEEP NARAYAN Singh wrote:
>
> > Hi Team,
> >
> > I just want quick help here: how can I change operator-level parallelism
> > dynamically at runtime for a Flink job running in an AWS EMR cluster, to
> > avoid downtime and backpressure based on the incoming load? I am very new
> > to Flink and currently using Flink 1.9. Is there any way to solve this
> > problem at runtime?
> >
> > Any lead will be highly appreciated.
> >
> > Thanks & Regards,
> > -Deep
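The manual rescaling Till describes can be sketched as a short CLI sequence (the savepoint directory, job id, and parallelism below are illustrative placeholders):

```shell
# 1. Take a savepoint and stop the job (stop-with-savepoint, Flink 1.9+).
flink stop -p s3://my-bucket/savepoints <job id>

# Older/alternative form: cancel the job with a savepoint.
# flink cancel -s s3://my-bucket/savepoints <job id>

# 2. Resume from the savepoint with the new parallelism.
flink run -s s3://my-bucket/savepoints/savepoint-<id> -p <new parallelism> \
    -c <class name> <jar path>
```

The job is down between the two steps, which is the "slight downtime" mentioned above.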
Is there any way to change operator level parallelism dynamically?
Hi Team,

I just want quick help here: how can I change operator-level parallelism dynamically at runtime for a Flink job running in an AWS EMR cluster, to avoid downtime and backpressure based on the incoming load? I am very new to Flink and currently using Flink 1.9. Is there any way to solve this problem at runtime?

Any lead will be highly appreciated.

Thanks & Regards,
-Deep