Re: Task manager isn’t initiating with defined values in Flink 1.11 version as part of EMR 6.1.0

2021-01-11 Thread DEEP NARAYAN Singh
(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Container released on a *lost* node
at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
... 22 more
2021-01-10 20:07:48,974 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job a7cffc31c4aeb01356c5132c908be314.
2021-01-10 20:07:48,974 INFO  org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down
2021-01-10 20:07:48,978 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Job a7cffc31c4aeb01356c5132c908be314 reached globally terminal state FAILED.
2021-01-10 20:07:49,006 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Gas Job Runner V2(a7cffc31c4aeb01356c5132c908be314).
2021-01-10 20:07:49,006 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Suspending SlotPool.
2021-01-10 20:07:49,007 INFO  org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 39a33f865ba12bac16dd21b834527750: JobManager is shutting down..
2021-01-10 20:07:49,007 INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Stopping SlotPool.
2021-01-10 20:07:49,007 INFO  org.apache.flink.yarn.YarnResourceManager - Disconnect job manager 0...@akka.tcp://flink@ip-10-6-0-231.ec2.internal:39039/user/rpc/jobmanager_2 for job a7cffc31c4aeb01356c5132c908be314 from the resource manager.


Could you please help us understand why the job is failing now that we have
increased the “parallelism”?


Thanks & Regards,

-Deep




On Mon, Jan 4, 2021 at 8:12 PM DEEP NARAYAN Singh 
wrote:

> Thanks, Till, for the detailed explanation. I tried it and it is working fine.
>
> Once again thanks for your quick response.
>
> Regards,
> -Deep
>
> On Mon, 4 Jan, 2021, 2:20 PM Till Rohrmann,  wrote:
>
>> Hi Deep,
>>
>> Flink has dropped support for specifying the number of TMs via -n since
>> the introduction of FLIP-6. Since then, Flink will automatically start TMs
>> depending on the required resources. Hence, there is no need to specify the
>> -n parameter anymore. Instead, you should specify the parallelism with
>> which you would like to run your job via the -p option.
>>
>> Since Flink 1.11.0 there is the option slotmanager.number-of-slots.max,
>> which sets the upper limit on the number of slots a cluster is allowed to
>> allocate [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-16605
>>
>> Cheers,
>> Till
>>
>> On Mon, Jan 4, 2021 at 8:33 AM DEEP NARAYAN Singh 
>> wrote:
>>
>> > Hi Guys,
>> >
>> > I’m struggling to initiate the task managers with Flink 1.11.0 in AWS
>> > EMR, although with older versions it worked. Let me put the full context
>> > here.
>> >
>> > *When using Flink 1.9.1 and EMR 5.29.0*
>> >
>> > To create a long running session, we used the below command.
>> >
>> > sudo flink-yarn-session -n <number of TM> -s <number of slot/core>
>> > -jm <JM memory> -tm <TM memory> -d
>> >
>> > and followed it with the below command to run the final job.
>> >
>> > flink run -m yarn-cluster -yid <application id> -yn <number of TM> -ys
>> > <number of slot/core> -yjm <JM memory> -ytm <TM memory> -c <class name> <jar path>
>> >
>> > If “n” is 6, then 6 task managers are created to start the job; whatever
>> > “n” is configured to, that is the number of TMs the job starts with.
>> >
>> > But now that we have scaled up to the new configuration (*i.e. Flink 1.11.0
>> > and EMR 6.1.0*), we are unable to get the desired number of TMs.
>> >
>> > Please find the session command for the new configuration:
>> >
>> > sudo flink-yarn-session -Djobmanager.memory.process.size=<JM memory>
>> > -Dtaskmanager.memory.process.size=<TM memory> -n <number of TM> -s <number of
>> > slot/core> -d
>> >
>> > And the final job command:
>> >
>> > flink run -m yarn-cluster -yid <application id> -c <class name> <Jar Path>
>> >
>> > I have tried a lot of combinations, but nothing has worked out so far. I
>> > request your help in this regard, as we plan to have this configuration in
>> > *PRODUCTION* soon.
>> >
>> > Thanks in advance.
>> >
>> >
>> > Regards,
>> >
>> > -Deep
>> >
>>
>


Re: Task manager isn’t initiating with defined values in Flink 1.11 version as part of EMR 6.1.0

2021-01-04 Thread DEEP NARAYAN Singh
Thanks, Till, for the detailed explanation. I tried it and it is working fine.

Once again thanks for your quick response.

Regards,
-Deep

On Mon, 4 Jan, 2021, 2:20 PM Till Rohrmann,  wrote:

> Hi Deep,
>
> Flink has dropped support for specifying the number of TMs via -n since the
> introduction of FLIP-6. Since then, Flink will automatically start TMs
> depending on the required resources. Hence, there is no need to specify the
> -n parameter anymore. Instead, you should specify the parallelism with
> which you would like to run your job via the -p option.
>
> Since Flink 1.11.0 there is the option slotmanager.number-of-slots.max,
> which sets the upper limit on the number of slots a cluster is allowed to
> allocate [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-16605
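>
> For example, a session start plus job submission could look like the
> following (a minimal sketch; the memory sizes, slot count, and parallelism
> are illustrative values, not ones taken from this thread):
>
>   sudo flink-yarn-session \
>     -Djobmanager.memory.process.size=2048m \
>     -Dtaskmanager.memory.process.size=4096m \
>     -Dslotmanager.number-of-slots.max=12 \
>     -s 2 -d
>
>   flink run -m yarn-cluster -yid <application id> -p 12 -c <class name> <jar path>
>
> With a parallelism of 12 and 2 slots per TM, Flink would request 6 TMs from
> YARN, which matches what -n 6 used to give.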
>
> Cheers,
> Till
>
> On Mon, Jan 4, 2021 at 8:33 AM DEEP NARAYAN Singh 
> wrote:
>
> > Hi Guys,
> >
> > I’m struggling to initiate the task managers with Flink 1.11.0 in AWS
> > EMR, although with older versions it worked. Let me put the full context here.
> >
> > *When using Flink 1.9.1 and EMR 5.29.0*
> >
> > To create a long running session, we used the below command.
> >
> > sudo flink-yarn-session -n <number of TM> -s <number of slot/core>
> > -jm <JM memory> -tm <TM memory> -d
> >
> > and followed it with the below command to run the final job.
> >
> > flink run -m yarn-cluster -yid <application id> -yn <number of TM> -ys
> > <number of slot/core> -yjm <JM memory> -ytm <TM memory> -c <class name> <jar path>
> >
> > If “n” is 6, then 6 task managers are created to start the job; whatever
> > “n” is configured to, that is the number of TMs the job starts with.
> >
> > But now that we have scaled up to the new configuration (*i.e. Flink 1.11.0
> > and EMR 6.1.0*), we are unable to get the desired number of TMs.
> >
> > Please find the session command for the new configuration:
> >
> > sudo flink-yarn-session -Djobmanager.memory.process.size=<JM memory>
> > -Dtaskmanager.memory.process.size=<TM memory> -n <number of TM> -s <number of
> > slot/core> -d
> >
> > And the final job command:
> >
> > flink run -m yarn-cluster -yid <application id> -c <class name> <Jar Path>
> >
> > I have tried a lot of combinations, but nothing has worked out so far. I
> > request your help in this regard, as we plan to have this configuration in
> > *PRODUCTION* soon.
> >
> > Thanks in advance.
> >
> >
> > Regards,
> >
> > -Deep
> >
>


Task manager isn’t initiating with defined values in Flink 1.11 version as part of EMR 6.1.0

2021-01-03 Thread DEEP NARAYAN Singh
Hi Guys,

I’m struggling to initiate the task managers with Flink 1.11.0 in AWS
EMR, although with older versions it worked. Let me put the full context here.

*When using Flink 1.9.1 and EMR 5.29.0*

To create a long running session, we used the below command.

sudo flink-yarn-session -n <number of TM> -s <number of slot/core> -jm <JM memory>
-tm <TM memory> -d

and followed it with the below command to run the final job.

flink run -m yarn-cluster -yid <application id> -yn <number of TM> -ys
<number of slot/core> -yjm <JM memory> -ytm <TM memory> -c <class name> <jar path>

If “n” is 6, then 6 task managers are created to start the job; whatever
“n” is configured to, that is the number of TMs the job starts with.

But now that we have scaled up to the new configuration (*i.e. Flink 1.11.0 and
EMR 6.1.0*), we are unable to get the desired number of TMs.

Please find the session command for the new configuration:

sudo flink-yarn-session -Djobmanager.memory.process.size=<JM memory>
-Dtaskmanager.memory.process.size=<TM memory> -n <number of TM> -s <number of slot/core> -d

And the final job command:

flink run -m yarn-cluster -yid <application id> -c <class name> <Jar Path>

I have tried a lot of combinations, but nothing has worked out so far. I
request your help in this regard, as we plan to have this configuration in
*PRODUCTION* soon.

Thanks in advance.


Regards,

-Deep


Re: Urgent help on S3 CSV file reader DataStream Job

2020-12-07 Thread DEEP NARAYAN Singh
Hi Wei and Till,
Thanks for the quick reply.

*@Wei,* I tried the code which you suggested and it is working fine, but I
have one use case where it is failing. Below is the CSV input data format:
CSV file data format:
---
field_id,data,
A,1
B,3
C,4
D,9
E,0,0,0,0

Because of the last row, which contains more than two values, it is
throwing org.apache.flink.api.common.io.ParseException: Row too short:
field_id,data,

How do I handle the above corner case? Could you please suggest some way to
handle this?
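
In case it helps, below is a minimal sketch of a lenient conversion (an
illustration only: it assumes "lines" is the DataStream<Row> produced by
Wei's snippet further down, with one Row holding a whole file, and the rule
"keep only the first two values per line" is my assumption about the data):

DataStream<String> json = lines.map(row -> {
    // each Row carries the full file content in field 0
    String[] csvLines = row.getField(0).toString().split("\n");
    StringBuilder sb = new StringBuilder("{");
    for (int i = 1; i < csvLines.length; i++) {      // i = 1 skips the header line
        String[] fields = csvLines[i].split(",");
        if (fields.length < 2) {
            continue;                                // ignore empty or short lines
        }
        if (sb.length() > 1) {
            sb.append(",");
        }
        sb.append("\"").append(fields[0].trim()).append("\":\"")
          .append(fields[1].trim()).append("\"");    // values beyond the 2nd are dropped
    }
    return sb.append("}").toString();
});
json.print();

Because this never asks RowCsvInputFormat to validate the field count, a row
like E,0,0,0,0 simply contributes "E":"0" instead of failing the whole file.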

*@Till,* Could you please elaborate more on what you are suggesting? In my
use case I am dealing with multiple CSV files under the given folder and
reading them line by line using TextInputFormat, so the transformation will
not work with a map operator. Correct me if I'm wrong.

Thanks & Regards,
-Deep


On Mon, Dec 7, 2020 at 6:38 PM Till Rohrmann  wrote:

> Hi Deep,
>
> Could you use the TextInputFormat which reads a file line by line? That way
> you can do the JSON parsing as part of a mapper which consumes the file
> lines.
>
> Cheers,
> Till
>
> On Mon, Dec 7, 2020 at 1:05 PM Wei Zhong  wrote:
>
> > Hi Deep,
> >
> > (redirecting this to user mailing list as this is not a dev question)
> >
> > You can try to set the line delimiter and field delimiter of the
> > RowCsvInputFormat to a non-printing character (assuming there are no
> > non-printing characters in the CSV files). It will read all the content
> > of a CSV file into one Row, e.g.:
> >
> > final StreamExecutionEnvironment env =
> >     StreamExecutionEnvironment.getExecutionEnvironment();
> > String path = "test";
> > TypeInformation[] fieldTypes = new TypeInformation[]{
> >     BasicTypeInfo.STRING_TYPE_INFO};
> > RowCsvInputFormat csvFormat =
> >     new RowCsvInputFormat(new Path(path), fieldTypes);
> > csvFormat.setNestedFileEnumeration(true);
> > csvFormat.setDelimiter((char) 0);
> > csvFormat.setFieldDelimiter(String.valueOf((char) 0));
> > DataStream<Row> lines = env.readFile(
> >     csvFormat, path, FileProcessingMode.PROCESS_ONCE, -1);
> > lines.map(value -> value).print();
> > env.execute();
> >
> >
> > Then you can convert the content of the csv files to json manually.
> >
> > Best,
> > Wei
> >
> >
> > On 7 Dec 2020, at 19:10, DEEP NARAYAN Singh wrote:
> >
> > Hi  Guys,
> >
> > Below is my code snippet, which reads all CSV files under the given
> > folder row by row, but my requirement is to read one CSV file at a time
> > and convert it to JSON, which would look like:
> > {"A":"1","B":"3","C":"4","D":9}
> >
> > CSV file data format:
> > ---
> > field_id,data,
> > A,1
> > B,3
> > C,4
> > D,9
> >
> > Code snippet:
> > --
> >
> > final StreamExecutionEnvironment env =
> >     StreamExecutionEnvironment.getExecutionEnvironment();
> > String path = "s3://messages/data/test/dev/2020-12-07/67241306/";
> > TypeInformation[] fieldTypes = new TypeInformation[]{
> >     BasicTypeInfo.STRING_TYPE_INFO,
> >     BasicTypeInfo.STRING_TYPE_INFO};
> > RowCsvInputFormat csvFormat =
> >     new RowCsvInputFormat(new Path(path), fieldTypes);
> > csvFormat.setSkipFirstLineAsHeader(true);
> > csvFormat.setNestedFileEnumeration(true);
> > DataStream<Row> lines = env.readFile(
> >     csvFormat, path, FileProcessingMode.PROCESS_ONCE, -1);
> > lines.map(value -> value).print();
> >
> >
> > Any help is highly appreciated.
> >
> > Thanks,
> > -Deep
> >
> >
> >
>


Urgent help on S3 CSV file reader DataStream Job

2020-12-07 Thread DEEP NARAYAN Singh
Hi  Guys,

Below is my code snippet, which reads all CSV files under the given folder
row by row, but my requirement is to read one CSV file at a time and convert
it to JSON, which would look like:
{"A":"1","B":"3","C":"4","D":9}

CSV file data format:
---
field_id,data,
A,1
B,3
C,4
D,9

Code snippet:
--

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
String path = "s3://messages/data/test/dev/2020-12-07/67241306/";
TypeInformation[] fieldTypes = new TypeInformation[]{
    BasicTypeInfo.STRING_TYPE_INFO,
    BasicTypeInfo.STRING_TYPE_INFO};
RowCsvInputFormat csvFormat =
    new RowCsvInputFormat(new Path(path), fieldTypes);
csvFormat.setSkipFirstLineAsHeader(true);
csvFormat.setNestedFileEnumeration(true);
DataStream<Row> lines = env.readFile(
    csvFormat, path, FileProcessingMode.PROCESS_ONCE, -1);
lines.map(value -> value).print();


Any help is highly appreciated.

Thanks,
-Deep


Need input on creating S3 source DataStream to filter JSON files before passing to the actual reader

2020-11-23 Thread DEEP NARAYAN Singh
Hi Guys,

As part of the Flink S3 source reader, can we filter on JSON file names
containing date and time, as mentioned below?
I am able to read all the JSON files under a specific date, but my use case
is to read only the JSON files of selected hours and minutes, based on a date
range, as shown in the sample directory structure below for multiple device IDs.




s3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:00:00_25700.json
s3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:05:00_25701.json
s3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:10:00_25702.json
s3-success-messages/dev/location/device1/data/2020-08-09/16/2020-08-09_16:15:00_25703.json

How do we filter files for specific dates, hours, and minutes before passing
them to the source reader?

Below is my current code snippet for creating the input stream:

public static DataStream<String> createInputStream(
        StreamExecutionEnvironment environment, String directory,
        Integer readerParallelism) {
    TextInputFormat format = new TextInputFormat(new org.apache.flink.core.fs.Path(
        "s3-success-messages/dev/location/device1/data/2020-08-09"));
    format.setNestedFileEnumeration(true);
    format.setMinSplitSize(10);
    return environment
        .readFile(format, directory, FileProcessingMode.PROCESS_ONCE, -1,
            FilePathFilter.createDefaultFilter())
        .name(directory).setParallelism(readerParallelism);
}

Are there any APIs available as part of the Flink S3 source to filter the
JSON files by the selected date range before pushing them to the next
operator for processing?
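
For what it's worth, one option might be a custom FilePathFilter (a rough
sketch only; the class name HourRangeFilter and the fixed file-name layout
2020-08-09_16:05:00_25700.json are assumptions based on the paths above):

import org.apache.flink.api.common.io.FilePathFilter;
import org.apache.flink.core.fs.Path;

public class HourRangeFilter extends FilePathFilter {

    private final int startHour;  // inclusive, 0-23
    private final int endHour;    // inclusive, 0-23

    public HourRangeFilter(int startHour, int endHour) {
        this.startHour = startHour;
        this.endHour = endHour;
    }

    @Override
    public boolean filterPath(Path filePath) {
        String name = filePath.getName();
        if (!name.endsWith(".json")) {
            return false;  // keep directories so nested enumeration can recurse
        }
        try {
            // "2020-08-09_16:05:00_25700.json" -> characters 11-12 hold the hour
            int hour = Integer.parseInt(name.substring(11, 13));
            return hour < startHour || hour > endHour;  // true means: skip this file
        } catch (NumberFormatException | IndexOutOfBoundsException e) {
            return true;  // skip anything not matching the expected layout
        }
    }
}

It could then be passed either via format.setFilesFilter(new
HourRangeFilter(16, 17)) or as the last argument of readFile(...), in place
of FilePathFilter.createDefaultFilter() in the snippet above.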

Thanks,
-Deep


Re: Resource Optimization for Flink Job in AWS EMR Cluster

2020-11-05 Thread DEEP NARAYAN Singh
Thanks, Prasanna & Till, for the quick response.

Looks like my use case is very similar to yours. I will try to run multiple
containers on the same machine and will update you accordingly.

Thanks ,
-Deep


On Thu, Nov 5, 2020 at 2:33 PM Till Rohrmann  wrote:

> Hi Deep,
>
> you can increase the average CPU load by reducing the number of overall
> resources. Having fewer slots over which you can distribute the work should
> increase the resource usage.
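>
> As a made-up illustration (the numbers below are not from this thread): if
> a job with parallelism 21 currently spreads over 21 slots, a smaller
> session along the lines of
>
>   sudo flink-yarn-session -n 2 -s 4 -jm 4168 -tm 8000 -d
>
> with the job resubmitted at a correspondingly lower parallelism would
> concentrate the same load on 8 slots, so per-core utilization should rise.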
>
> Cheers,
> Till
>
> On Thu, Nov 5, 2020 at 9:03 AM Prasanna kumar <
> prasannakumarram...@gmail.com>
> wrote:
>
> > Deep,
> >
> > 1) Is it a CPU-, memory-, or IO-intensive job?
> >
> > Based on that you could allocate resources.
> >
> > From the question, since the CPU is not fully utilised, you could run
> > multiple containers on the same machine (TM).
> >
> > The following may not be exactly the same case as yours, but it should give you an idea.
> >
> > A few months back I ran jobs in EMR processing 4-8k records per second
> > from Kafka with a parallelism of 8, doing lightweight transformations,
> > where end-to-end latency was less than a second (10-50 ms).
> >
> > I used slots with 4 GB of memory allocated and 1 GB of JM memory. Here
> > multiple containers ran on the same machine and I got CPU usage up to 50%.
> > Earlier it was in single digits, when just a single container ran on a
> > machine.
> >
> > Prasanna.
> >
> >
> > On Thu 5 Nov, 2020, 12:40 Satyaa Dixit,  wrote:
> >
> > > Hi Deep,
> > >
> > > Thanks for bringing this up. I'm also facing a similar kind of issue
> > > with resource optimization while deploying my Flink job.
> > >
> > > Hi Team,
> > >
> > > It would be much appreciated if someone helps us here.
> > >
> > >
> > > Regards,
> > > Satya
> > >
> > > On Wed, Nov 4, 2020 at 6:33 PM DEEP NARAYAN Singh <
> about.d...@gmail.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I am running a Flink streaming job in an EMR cluster with parallelism
> > > > 21, processing 500 records per second, but I am still seeing CPU
> > > > utilization of approximately 5-8 percent.
> > > >
> > > > Below is the long-running session command in the EMR cluster, which has
> > > > 3 instances of type c5.2xlarge (8 vCores, 16 GB memory, AWS resource):
> > > >
> > > > *sudo flink-yarn-session -n 3 -s 7 -jm 4168 -tm 8000 -d*
> > > >
> > > > Can anyone suggest some configuration to maximize the CPU utilization?
> > > > Also, what would be the standard CPU utilization for a Flink job in
> > > > order to achieve minimum latency?
> > > >
> > > > Any leads would be appreciated.
> > > >
> > > > Thanks,
> > > > -Deep
> > > >
> > >
> > >
> > > --
> > > --
> > > Best Regards
> > > Satya Prakash
> > > (M)+91-9845111913
> > >
> >
>


Re: Resource Optimization for Flink Job in AWS EMR Cluster

2020-11-04 Thread DEEP NARAYAN Singh
Hi Guys,
Sorry to bother you again. Could someone help me clarify my doubt here?
Any help will be highly appreciated.

Thanks,
-Deep

On Wed, Nov 4, 2020 at 6:26 PM DEEP NARAYAN Singh 
wrote:

> Hi All,
>
> I am running a Flink streaming job in an EMR cluster with parallelism 21,
> processing 500 records per second, but I am still seeing CPU utilization of
> approximately 5-8 percent.
>
> Below is the long-running session command in the EMR cluster, which has 3
> instances of type c5.2xlarge (8 vCores, 16 GB memory, AWS resource):
>
> *sudo flink-yarn-session -n 3 -s 7 -jm 4168 -tm 8000 -d*
>
> Can anyone suggest some configuration to maximize the CPU utilization?
> Also, what would be the standard CPU utilization for a Flink job in order
> to achieve minimum latency?
>
> Any leads would be appreciated.
>
> Thanks,
> -Deep
>


Resource Optimization for Flink Job in AWS EMR Cluster

2020-11-04 Thread DEEP NARAYAN Singh
Hi All,

I am running a Flink streaming job in an EMR cluster with parallelism 21,
processing 500 records per second, but I am still seeing CPU utilization of
approximately 5-8 percent.

Below is the long-running session command in the EMR cluster, which has 3
instances of type c5.2xlarge (8 vCores, 16 GB memory, AWS resource):

*sudo flink-yarn-session -n 3 -s 7 -jm 4168 -tm 8000 -d*

Can anyone suggest some configuration to maximize the CPU utilization?
Also, what would be the standard CPU utilization for a Flink job in order to
achieve minimum latency?

Any leads would be appreciated.

Thanks,
-Deep


Re: Is there any way to change operator level parallelism dynamically?

2020-10-29 Thread DEEP NARAYAN Singh
Hi Till,

Got it. Thanks for the prompt reply.

-Deep

On Thu, 29 Oct, 2020, 1:22 PM Till Rohrmann,  wrote:

> Hi Deep,
>
> at the moment the only way to change the parallelism is to take a
> savepoint, stop the job, and then resume the job from the savepoint with a
> changed parallelism. The community is working on a more automated way to
> solve this problem. However, the first versions will use the same approach
> of taking a savepoint, which means that there will be a slight downtime.
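>
> Concretely, the manual procedure looks roughly like this (an illustrative
> sketch with placeholder paths, assuming the Flink 1.9 CLI):
>
>   # take a savepoint and cancel the job
>   flink cancel -s s3://<savepoint-dir> <jobId>
>
>   # resume from the savepoint with a new parallelism
>   flink run -s s3://<savepoint-dir>/<savepoint-id> -p <new parallelism> \
>     -c <class name> <jar path>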
>
> Cheers,
> Till
>
> On Wed, Oct 28, 2020 at 7:05 PM DEEP NARAYAN Singh 
> wrote:
>
> > Hi Team,
> >
> > I just need some quick help here: how can I change operator-level
> > parallelism at runtime for a Flink job running in an AWS EMR cluster, to
> > avoid downtime and backpressure under the incoming load? I am very new to
> > Flink and am currently using Flink 1.9. Is there any way to solve this
> > problem at runtime?
> >
> > Any lead will be highly appreciated.
> >
> > Thanks & Regards,
> > -Deep
> >
>


Is there any way to change operator level parallelism dynamically?

2020-10-28 Thread DEEP NARAYAN Singh
Hi Team,

I just need some quick help here: how can I change operator-level
parallelism at runtime for a Flink job running in an AWS EMR cluster, to
avoid downtime and backpressure under the incoming load? I am very new to
Flink and am currently using Flink 1.9. Is there any way to solve this
problem at runtime?

Any lead will be highly appreciated.

Thanks & Regards,
-Deep