Can you try setting the config option taskmanager.numberOfTaskSlots to 2? By
default the TMs only offer one slot [1], independently of the number of CPU
cores.
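
For example, extending the flink-conf classification from your cluster-creation
code below (just a sketch, untested, reusing your existing heap settings):

Configuration flinkConfiguration = new Configuration()
    .withClassification("flink-conf")
    .addPropertiesEntry("jobmanager.heap.size", "2048m")
    .addPropertiesEntry("taskmanager.heap.size", "2048m")
    // m4.large has 2 vCPUs, so offer 2 slots per TM
    .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");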

Best,
Gary

[1]
https://github.com/apache/flink/blob/da3082764117841d885f41c645961f8993a331a0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java#L197-L199

On Mon, Mar 30, 2020 at 1:22 PM Antonio Martínez Carratalá <
amarti...@alto-analytics.com> wrote:

> Hello
>
> I'm running Flink over Amazon EMR and I'm trying to send several different
> batch jobs to the cluster after creating it.
>
> This is my cluster creation code:
> ----------------------------------------------------------------
> StepConfig copyJarStep = new StepConfig()
>     .withName("copy-jar-step")
>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>         .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName +
> "/lib/flink-jobs.jar /home/hadoop/flink-jobs.jar"));
>
> List<StepConfig> stepConfigs = new ArrayList<>();
> stepConfigs.add(copyJarStep);
>
> Application flink = new Application().withName("Flink");
>
> Configuration flinkConfiguration = new Configuration()
>     .withClassification("flink-conf")
>     .addPropertiesEntry("jobmanager.heap.size", "2048m")
>     .addPropertiesEntry("taskmanager.heap.size", "2048m");
>
> RunJobFlowRequest request = new RunJobFlowRequest()
>     .withName("cluster-" + executionKey)
>     .withReleaseLabel("emr-5.26.0")
>     .withApplications(flink)
>     .withConfigurations(flinkConfiguration)
>     .withServiceRole("EMR_DefaultRole")
>     .withJobFlowRole("EMR_EC2_DefaultRole")
>     .withLogUri(getWorkPath() + "logs")
>     .withInstances(new JobFlowInstancesConfig()
>         .withEc2SubnetId("subnetid")
>         .withInstanceCount(2) // 1 for task manager + 1 for job manager
>         .withKeepJobFlowAliveWhenNoSteps(true)
>         .withMasterInstanceType("m4.large")
>         .withSlaveInstanceType("m4.large"))
>     .withSteps(stepConfigs);
>
> RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
>
> ----------------------------------------------------------------
>
> And this is how I add the jobs:
>
> ---------------------------------------------------------------------------------
> StepConfig runJobStep = new StepConfig()
>     .withName("run-job-step")
>     .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>     .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>     .withArgs("bash", "-c", "flink run -m yarn-cluster"
>         + " --parallelism " + parallelism
>         + " --class " + jobClass.getCanonicalName()
>         + " /home/hadoop/flink-jobs.jar "
>         + jobArguments));
>
> AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
>     .withJobFlowId(clusterId)
>     .withSteps(runJobStep);
>
> AddJobFlowStepsResult result =
> amazonClient.getEmrClient().addJobFlowSteps(request);
>
> ---------------------------------------------------------------------------------
>
> And these are my jobs:
>
> - Job1 - parallelism 1
> - Job2 - parallelism 1
> - Job3 - parallelism 2
>
> I'm using m4.large machines as slaves, so each one has 2 cores. I was
> expecting Job1 and Job2 to run in parallel, and then Job3 once Job1 and
> Job2 finish, but what I see is that Job2 waits (Pending status) for Job1
> to finish before starting. Only one task manager is created for Job1; when
> it finishes, another one is created for Job2, and then 2 are created for
> Job3.
>
> Since I have 2 cores available, why is Job2 not running on the other core
> instead of waiting? Is there any way to configure this?
>
> Thanks
>
