valepakh commented on code in PR #2894:
URL: https://github.com/apache/ignite-3/pull/2894#discussion_r1409339779
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/BoundedPriorityBlockingQueue.java:
##########
@@ -273,4 +273,5 @@ private void checkInsert(int size) {
+ "Max queue size is " + maxSize + ".");
}
}
+
Review Comment:
```suggestion
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import java.util.UUID;
+import org.apache.ignite.compute.JobState;
+
+/**
+ * State machine of Compute Jobs.
+ */
+public interface ComputeStateMachine {
+ /**
+ * Initialize Compute job in state machine. This job should have status
{@link JobState.SUBMITTED}.
Review Comment:
```suggestion
* Initialize Compute job in state machine. This job should have status
{@link JobState.QUEUED}.
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/state/IllegalJobStateTransition.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import static
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_STATE_TRANSITION_ERR;
+
+import java.util.UUID;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+
+/**
+ * Throws from Compute Jobs state machine {@link ComputeStateMachine} when job
state transfer is illegal.
+ */
+public class IllegalJobStateTransition extends IgniteInternalException {
+ public IllegalJobStateTransition(UUID jobId) {
+ super(COMPUTE_JOB_STATE_TRANSITION_ERR, "Failed to transfer job state
for not-existed job.");
Review Comment:
```suggestion
super(COMPUTE_JOB_STATE_TRANSITION_ERR, "Failed to transfer job
state for nonexistent job " + jobId + ".");
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachine.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.CANCELING;
+import static org.apache.ignite.compute.JobState.COMPLETED;
+import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.FAILED;
+import static org.apache.ignite.compute.JobState.QUEUED;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+
+/**
+ * In memory implementation of {@link ComputeStateMachine}.
+ */
+public class InMemoryComputeStateMachine implements ComputeStateMachine {
+ private static final IgniteLogger LOG =
Loggers.forClass(InMemoryComputeStateMachine.class);
+
+ private final Map<UUID, JobState> states = new ConcurrentHashMap<>();
Review Comment:
We should think about cleaning up this map after the job is completed.
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import java.util.UUID;
+import org.apache.ignite.compute.JobState;
+
+/**
+ * State machine of Compute Jobs.
+ */
+public interface ComputeStateMachine {
+ /**
+ * Initialize Compute job in state machine. This job should have status
{@link JobState.SUBMITTED}.
+ *
+ * @return Compute job identifier.
+ */
+ UUID initJob();
+
+ /**
+ * Try to transfer Compute Job to complete state.
+ *
+ * @param jobId Compute job identifier.
+ * @throws IllegalJobStateTransition in case when job can't be transferred
to complete state.
+ */
+ void completeJob(UUID jobId);
+
+ /**
+ * Try to transfer Compute Job to execute state.
+ *
+ * @param jobId Compute job identifier.
+ * @throws IllegalJobStateTransition in case when job can't be transferred
to complete state.
Review Comment:
Here and below there's a copy-paste error `complete state`
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/state/IllegalJobStateTransition.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import static
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_STATE_TRANSITION_ERR;
+
+import java.util.UUID;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+
+/**
+ * Throws from Compute Jobs state machine {@link ComputeStateMachine} when job
state transfer is illegal.
+ */
+public class IllegalJobStateTransition extends IgniteInternalException {
+ public IllegalJobStateTransition(UUID jobId) {
+ super(COMPUTE_JOB_STATE_TRANSITION_ERR, "Failed to transfer job state
for not-existed job.");
+ }
+
+ public IllegalJobStateTransition(UUID jobId, JobState prevState, JobState
newState) {
+ super(COMPUTE_JOB_STATE_TRANSITION_ERR, message(jobId, prevState,
newState));
+ }
+
+ static String message(UUID jobId, JobState prevState, JobState newState) {
+ return "Failed to transfer job " + jobId
+ + " from state " + (prevState == null ? "NOT_EXIST" :
prevState)
Review Comment:
Is it even possible that `prevState` will be null?
```suggestion
+ (prevState == null ? " from nonexistent state" : (" from
state " + prevState))
```
##########
modules/compute/src/test/java/org/apache/ignite/internal/compute/state/InMemoryComputeStateMachineTest.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.CANCELING;
+import static org.apache.ignite.compute.JobState.COMPLETED;
+import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.FAILED;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.UUID;
+import org.hamcrest.Matchers;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test suite for {@link InMemoryComputeStateMachine}.
+ */
+public class InMemoryComputeStateMachineTest {
+ private ComputeStateMachine stateMachine;
+
+ private UUID jobId;
+
+ @BeforeEach
+ public void setup() {
+ stateMachine = new InMemoryComputeStateMachine();
+ jobId = stateMachine.initJob();
+ }
+
+ @Test
+ public void testSubmit() {
+ assertThat(jobId, is(notNullValue()));
+ }
+
+ @Test
+ public void testCompleteWay() {
+ executeJob(false);
+ completeJob(false);
+ }
+
+ @Test
+ public void testCompleteWayWithoutQueue() {
+ executeJob(false);
+ completeJob(false);
+ }
Review Comment:
These two tests are identical.
##########
modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java:
##########
@@ -143,9 +147,13 @@ void setUp() {
willCompleteSuccessfully()
);
+ computeExecutor = new ComputeExecutorImpl(ignite, new
InMemoryComputeStateMachine(), computeConfiguration);
- computeExecutor = new ComputeExecutorImpl(ignite,
computeConfiguration);
- computeComponent = new ComputeComponentImpl(messagingService,
jobContextManager, computeExecutor);
+ computeComponent = new ComputeComponentImpl(
Review Comment:
This line was even shorter than the previous one but for some reason the
parameters are on the separate lines now and in the previous statement they are
on the same line?
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/state/IllegalJobStateTransition.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import static
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_STATE_TRANSITION_ERR;
+
+import java.util.UUID;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+
+/**
+ * Throws from Compute Jobs state machine {@link ComputeStateMachine} when job
state transfer is illegal.
Review Comment:
```suggestion
* Thrown from Compute Jobs state machine {@link ComputeStateMachine} when
job state transfer is illegal.
```
##########
modules/compute/src/main/java/org/apache/ignite/internal/compute/state/ComputeStateMachine.java:
##########
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute.state;
+
+import java.util.UUID;
+import org.apache.ignite.compute.JobState;
+
+/**
+ * State machine of Compute Jobs.
+ */
+public interface ComputeStateMachine {
+ /**
+ * Initialize Compute job in state machine. This job should have status
{@link JobState.SUBMITTED}.
+ *
+ * @return Compute job identifier.
+ */
+ UUID initJob();
+
+ /**
+ * Try to transfer Compute Job to complete state.
+ *
+ * @param jobId Compute job identifier.
+ * @throws IllegalJobStateTransition in case when job can't be transferred
to complete state.
+ */
+ void completeJob(UUID jobId);
+
+ /**
+ * Try to transfer Compute Job to execute state.
+ *
+ * @param jobId Compute job identifier.
+ * @throws IllegalJobStateTransition in case when job can't be transferred
to complete state.
+ */
+ void executeJob(UUID jobId);
+
+ /**
+ * Try to transfer Compute Job to cancel state.
+ *
+ * @param jobId Compute job identifier.
+ * @throws IllegalJobStateTransition in case when job can't be transferred
to complete state.
+ */
+ boolean cancelJob(UUID jobId);
Review Comment:
What is the meaning of the return value of this method?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]