Can you try setting the config option taskmanager.numberOfTaskSlots to 2? By default the TMs only offer one slot [1], independent of the number of CPU cores.
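For example, you could add it to the "flink-conf" classification you already pass when creating the cluster. This is just an untested sketch based on your own snippet:

    // Same Configuration as in your cluster creation code, with the
    // extra property added so each TM offers 2 slots.
    Configuration flinkConfiguration = new Configuration()
            .withClassification("flink-conf")
            .addPropertiesEntry("jobmanager.heap.size", "2048m")
            .addPropertiesEntry("taskmanager.heap.size", "2048m")
            .addPropertiesEntry("taskmanager.numberOfTaskSlots", "2");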
Best,
Gary

[1] https://github.com/apache/flink/blob/da3082764117841d885f41c645961f8993a331a0/flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java#L197-L199

On Mon, Mar 30, 2020 at 1:22 PM Antonio Martínez Carratalá <amarti...@alto-analytics.com> wrote:

> Hello,
>
> I'm running Flink on Amazon EMR and I'm trying to send several different
> batch jobs to the cluster after creating it.
>
> This is my cluster creation code:
> ----------------------------------------------------------------
> StepConfig copyJarStep = new StepConfig()
>         .withName("copy-jar-step")
>         .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>         .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>                 .withArgs("bash", "-c", "aws s3 cp s3://" + bucketName
>                         + "/lib/flink-jobs.jar /home/hadoop/flink-jobs.jar"));
>
> List<StepConfig> stepConfigs = new ArrayList<>();
> stepConfigs.add(copyJarStep);
>
> Application flink = new Application().withName("Flink");
>
> Configuration flinkConfiguration = new Configuration()
>         .withClassification("flink-conf")
>         .addPropertiesEntry("jobmanager.heap.size", "2048m")
>         .addPropertiesEntry("taskmanager.heap.size", "2048m");
>
> RunJobFlowRequest request = new RunJobFlowRequest()
>         .withName("cluster-" + executionKey)
>         .withReleaseLabel("emr-5.26.0")
>         .withApplications(flink)
>         .withConfigurations(flinkConfiguration)
>         .withServiceRole("EMR_DefaultRole")
>         .withJobFlowRole("EMR_EC2_DefaultRole")
>         .withLogUri(getWorkPath() + "logs")
>         .withInstances(new JobFlowInstancesConfig()
>                 .withEc2SubnetId("subnetid")
>                 .withInstanceCount(2) // 1 for task manager + 1 for job manager
>                 .withKeepJobFlowAliveWhenNoSteps(true)
>                 .withMasterInstanceType("m4.large")
>                 .withSlaveInstanceType("m4.large"))
>         .withSteps(stepConfigs);
>
> RunJobFlowResult result = amazonClient.getEmrClient().runJobFlow(request);
> ----------------------------------------------------------------
>
> And this is how I add the jobs:
>
> ---------------------------------------------------------------------------------
> StepConfig runJobStep = new StepConfig()
>         .withName("run-job-step")
>         .withActionOnFailure(ActionOnFailure.TERMINATE_CLUSTER)
>         .withHadoopJarStep(new HadoopJarStepConfig("command-runner.jar")
>                 .withArgs("bash", "-c", "flink run -m yarn-cluster"
>                         + " --parallelism " + parallelism
>                         + " --class " + jobClass.getCanonicalName()
>                         + " /home/hadoop/flink-jobs.jar "
>                         + jobArguments));
>
> AddJobFlowStepsRequest request = new AddJobFlowStepsRequest()
>         .withJobFlowId(clusterId)
>         .withSteps(runJobStep);
>
> AddJobFlowStepsResult result = amazonClient.getEmrClient().addJobFlowSteps(request);
> ---------------------------------------------------------------------------------
>
> And these are my jobs:
>
> - Job1 - parallelism 1
> - Job2 - parallelism 1
> - Job3 - parallelism 2
>
> I'm using m4.large machines as slaves, so I have 2 cores in each, and I was
> expecting Job1 and Job2 to run in parallel and then Job3 once Job1 and Job2
> finish. But what I see is that Job2 waits (Pending status) for Job1 to
> finish before starting. Only one task manager is created for Job1; when it
> finishes another one is created for Job2, and then 2 are created for Job3.
>
> Since I have 2 cores available, why is Job2 not running on the other one
> instead of waiting? Is there any way to configure this?
>
> Thanks