sk0x50 commented on a change in pull request #118: URL: https://github.com/apache/ignite-3/pull/118#discussion_r642939296
########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command.response; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashSet; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.table.distributed.command.CommandUtils; +import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand; +import org.apache.ignite.internal.table.distributed.command.GetAllCommand; +import org.apache.ignite.internal.table.distributed.command.InsertAllCommand; + +/** + * It is a response object to return a collection from the batch operation. + * @see GetAllCommand + * @see DeleteAllCommand + * @see InsertAllCommand + * @see DeleteExactAllCommand + */ +public class MultiRowsResponse implements Serializable { + /** Row. */ Review comment: Collection of binary rows. 
########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command.response; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashSet; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.table.distributed.command.CommandUtils; +import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand; +import org.apache.ignite.internal.table.distributed.command.GetAllCommand; +import org.apache.ignite.internal.table.distributed.command.InsertAllCommand; + +/** + * It is a response object to return a collection from the batch operation. Review comment: This class represents a response object that contains a collection of {@link BinaryRow} instances returned from a batch operation. 
########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command.response; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashSet; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.table.distributed.command.CommandUtils; +import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand; +import org.apache.ignite.internal.table.distributed.command.GetAllCommand; +import org.apache.ignite.internal.table.distributed.command.InsertAllCommand; + +/** + * It is a response object to return a collection from the batch operation. + * @see GetAllCommand + * @see DeleteAllCommand + * @see InsertAllCommand + * @see DeleteExactAllCommand + */ +public class MultiRowsResponse implements Serializable { + /** Row. */ + private transient Collection<BinaryRow> rows; + + /* + * Row bytes. 
+ * It is a temporary solution, before network have not implement correct serialization BinaryRow. + * TODO: Remove the field after (IGNITE-14793). + */ + private byte[] rowsBytes; + + /** + * @param rows Rows. + */ + public MultiRowsResponse(Collection<BinaryRow> rows) { + this.rows = rows; + + CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes); + } + + /** + * @return Data row. Review comment: Collection of binary rows || Binary rows. ########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java ########## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.function.Consumer; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.lang.IgniteLogger; + +/** + * This is an utility class for serialization cache tuples. 
It will be removed after another way for serialization is + * implemented into the network layer. + * TODO: Remove it after (IGNITE-14793) + */ +public class CommandUtils { + /** The logger. */ + private static final IgniteLogger LOG = IgniteLogger.forClass(CommandUtils.class); + + /** + * Writes a list of rows to byte array. + * + * @param rows List of rows. Review comment: Collection of rows. ########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java ########## @@ -17,63 +17,40 @@ package org.apache.ignite.internal.table.distributed.command.response; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.io.Serializable; -import java.util.function.Consumer; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.table.distributed.command.CommandUtils; +import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand; import org.apache.ignite.internal.table.distributed.command.GetCommand; -import org.apache.ignite.lang.IgniteLogger; /** - * It is a response object for handling a table get command. + * It is a response object to return a row from the single operation. Review comment: This class represents a response object message that contains a single binary row. ########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command.response; + +import java.io.Serializable; +import java.util.Collection; +import java.util.HashSet; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.table.distributed.command.CommandUtils; +import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand; +import org.apache.ignite.internal.table.distributed.command.GetAllCommand; +import org.apache.ignite.internal.table.distributed.command.InsertAllCommand; + +/** + * It is a response object to return a collection from the batch operation. + * @see GetAllCommand + * @see DeleteAllCommand + * @see InsertAllCommand + * @see DeleteExactAllCommand + */ +public class MultiRowsResponse implements Serializable { + /** Row. */ + private transient Collection<BinaryRow> rows; + + /* + * Row bytes. + * It is a temporary solution, before network have not implement correct serialization BinaryRow. + * TODO: Remove the field after (IGNITE-14793). + */ + private byte[] rowsBytes; + + /** + * @param rows Rows. Review comment: ``` Creates a new instance of MultiRowsResponse with the given collection of binary rows. @param rows Collection of binary rows to be returned. 
(or Binary rows - it is up to you) ``` ########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java ########## @@ -17,63 +17,40 @@ package org.apache.ignite.internal.table.distributed.command.response; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.io.Serializable; -import java.util.function.Consumer; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.table.distributed.command.CommandUtils; +import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand; import org.apache.ignite.internal.table.distributed.command.GetCommand; -import org.apache.ignite.lang.IgniteLogger; /** - * It is a response object for handling a table get command. + * It is a response object to return a row from the single operation. + * @see GetCommand + * @see GetAndDeleteCommand + * @see GetAndUpsertCommand + * @see GetAndReplaceCommand */ -public class KVGetResponse implements Serializable { - /** The logger. */ - private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class); - +public class SingleRowResponse implements Serializable { /** Row. */ private transient BinaryRow row; /* * Row bytes. * It is a temporary solution, before network have not implement correct serialization BinaryRow. - * TODO: Remove the field after. + * TODO: Remove the field after (IGNITE-14793). */ private byte[] rowBytes; - public KVGetResponse(BinaryRow row) { - this.row = row; - - rowToBytes(row, bytes -> rowBytes = bytes); - } - /** - * Writes a row to byte array. - * * @param row Row. - * @param consumer Byte array consumer. 
*/ - private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) { - if (row == null) { - consumer.accept(null); - - return; - } - - try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { - row.writeTo(baos); - - baos.flush(); - - consumer.accept(baos.toByteArray()); - } - catch (IOException e) { - LOG.error("Could not write row to stream [row=" + row + ']', e); + public SingleRowResponse(BinaryRow row) { Review comment: ``` Creates a new instance of SingleRowResponse with the given binary row. @param row Binary row to be returned. || Binary row. ``` ########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command; + +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.raft.client.WriteCommand; + +/** + * The command deletes a batch rows. + */ +public class DeleteAllCommand implements WriteCommand { + /** Rows. */ + private transient Set<BinaryRow> rows; + + /* + * Row bytes. 
+ * It is a temporary solution, before network have not implement correct serialization BinaryRow. + * TODO: Remove the field after (IGNITE-14793). + */ + private byte[] rowsBytes; + + /** + * @param rows Rows. + */ + public DeleteAllCommand(Set<BinaryRow> rows) { Review comment: @NotNull ########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command; + +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.raft.client.WriteCommand; + +/** + * The command deletes a batch rows. + */ +public class DeleteExactAllCommand implements WriteCommand { + /** Rows. */ + private transient Set<BinaryRow> rows; + + /* + * Row bytes. + * It is a temporary solution, before network have not implement correct serialization BinaryRow. + * TODO: Remove the field after (IGNITE-14793). + */ + private byte[] rowsBytes; + + /** + * @param rows Rows. 
+ */ + public DeleteExactAllCommand(Set<BinaryRow> rows) { + assert rows != null && !rows.isEmpty(); + + this.rows = rows; + + CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes); + } + + /** + * Gets a list of keys which will used in the command. Review comment: Returns a set of binary rows (keys) to be deleted. ########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command; + +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.raft.client.WriteCommand; + +/** + * The command deletes a batch rows. 
Review comment: It would be nice to highlight the difference between this command and DeleteAllCommand (if I'm not mistaken, the latter uses only the key to determine which rows are deleted). ########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command; + +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.raft.client.WriteCommand; + +/** + * The command deletes a batch rows. + */ +public class DeleteExactAllCommand implements WriteCommand { + /** Rows. */ + private transient Set<BinaryRow> rows; + + /* + * Row bytes. + * It is a temporary solution, before network have not implement correct serialization BinaryRow. + * TODO: Remove the field after (IGNITE-14793). + */ + private byte[] rowsBytes; + + /** + * @param rows Rows. Review comment: Creates a new instance of DeleteExactAllCommand with the given set of rows to be deleted. {@code rows} should not be {@code null} or empty. @param rows Set of binary rows to be deleted. 
########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command; + +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.raft.client.WriteCommand; + +/** + * The command deletes a batch rows. + */ +public class DeleteAllCommand implements WriteCommand { + /** Rows. */ + private transient Set<BinaryRow> rows; + + /* + * Row bytes. + * It is a temporary solution, before network have not implement correct serialization BinaryRow. + * TODO: Remove the field after (IGNITE-14793). + */ + private byte[] rowsBytes; + + /** + * @param rows Rows. + */ + public DeleteAllCommand(Set<BinaryRow> rows) { + assert rows != null && !rows.isEmpty(); + + this.rows = rows; + + CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes); + } + + /** + * Gets a list of keys which will used in the command. Review comment: Returns a set of binary rows (keys) to be deleted. 
########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactCommand.java ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command; + +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.raft.client.WriteCommand; +import org.jetbrains.annotations.NotNull; + +/** + * The command deletes a entry by passed key. Review comment: It would be nice to mention the difference between this command and DeleteCommand. ########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command; + +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.raft.client.WriteCommand; + +/** + * The command deletes a batch rows. + */ +public class DeleteAllCommand implements WriteCommand { + /** Rows. */ + private transient Set<BinaryRow> rows; + + /* + * Row bytes. + * It is a temporary solution, before network have not implement correct serialization BinaryRow. + * TODO: Remove the field after (IGNITE-14793). + */ + private byte[] rowsBytes; + + /** + * @param rows Rows. + */ + public DeleteAllCommand(Set<BinaryRow> rows) { Review comment: ``` Creates a new instance of DeleteAllCommand with the given set of rows to be deleted. {@code rows} should not be {@code null} or empty. @param rows Set of binary rows to be deleted. ``` ########## File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java ########## @@ -0,0 +1,699 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.UUID; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.NativeTypes; +import org.apache.ignite.internal.schema.Row; +import org.apache.ignite.internal.schema.RowAssembler; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand; +import org.apache.ignite.internal.table.distributed.command.GetAllCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand; +import org.apache.ignite.internal.table.distributed.command.GetCommand; +import org.apache.ignite.internal.table.distributed.command.InsertAllCommand; +import org.apache.ignite.internal.table.distributed.command.InsertCommand; +import 
org.apache.ignite.internal.table.distributed.command.ReplaceCommand; +import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand; +import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand; +import org.apache.ignite.internal.table.distributed.command.UpsertCommand; +import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse; +import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse; +import org.apache.ignite.raft.client.Command; +import org.apache.ignite.raft.client.service.CommandClosure; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * There are a tests for a table command listener. + */ +public class PartitionCommandListenerTest { + /** Key count. */ + public static final int KEY_COUNT = 100; + + /** Schema. */ + public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(), + 1, + new Column[] {new Column("key", NativeTypes.INTEGER, false)}, + new Column[] {new Column("value", NativeTypes.INTEGER, false)} + ); + + /** Table command listener. */ + private static PartitionListener commandListener; + + /** + * Inisializes a table listener before tests. Review comment: initializes ########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command; + +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.raft.client.WriteCommand; + +/** + * The command deletes a batch rows. + */ +public class DeleteExactAllCommand implements WriteCommand { + /** Rows. */ + private transient Set<BinaryRow> rows; + + /* + * Row bytes. + * It is a temporary solution, before network have not implement correct serialization BinaryRow. + * TODO: Remove the field after (IGNITE-14793). + */ + private byte[] rowsBytes; + + /** + * @param rows Rows. + */ + public DeleteExactAllCommand(Set<BinaryRow> rows) { Review comment: @NotNull ########## File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java ########## @@ -0,0 +1,699 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.UUID; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.NativeTypes; +import org.apache.ignite.internal.schema.Row; +import org.apache.ignite.internal.schema.RowAssembler; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand; +import org.apache.ignite.internal.table.distributed.command.GetAllCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand; +import org.apache.ignite.internal.table.distributed.command.GetCommand; +import org.apache.ignite.internal.table.distributed.command.InsertAllCommand; +import org.apache.ignite.internal.table.distributed.command.InsertCommand; +import 
org.apache.ignite.internal.table.distributed.command.ReplaceCommand; +import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand; +import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand; +import org.apache.ignite.internal.table.distributed.command.UpsertCommand; +import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse; +import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse; +import org.apache.ignite.raft.client.Command; +import org.apache.ignite.raft.client.service.CommandClosure; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * There are a tests for a table command listener. + */ +public class PartitionCommandListenerTest { + /** Key count. */ + public static final int KEY_COUNT = 100; + + /** Schema. */ + public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(), + 1, + new Column[] {new Column("key", NativeTypes.INTEGER, false)}, + new Column[] {new Column("value", NativeTypes.INTEGER, false)} + ); + + /** Table command listener. */ + private static PartitionListener commandListener; + + /** + * Inisializes a table listener before tests. + */ + @BeforeAll + public static void before() { + commandListener = new PartitionListener(); + } + + /** + * Insrets rows and checks them. + * All rows remove before return. Review comment: All rows are removed before returning. 
(here and below) ########## File path: modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java ########## @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.command; + +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.raft.client.WriteCommand; + +/** + * The command deletes a batch rows. + */ +public class DeleteAllCommand implements WriteCommand { + /** Rows. */ + private transient Set<BinaryRow> rows; + + /* + * Row bytes. + * It is a temporary solution, before network have not implement correct serialization BinaryRow. + * TODO: Remove the field after (IGNITE-14793). + */ + private byte[] rowsBytes; + + /** + * @param rows Rows. + */ + public DeleteAllCommand(Set<BinaryRow> rows) { Review comment: Please update all commands in the same way (where it is appropriate). 
########## File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java ########## @@ -0,0 +1,699 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.table.distributed.raft; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.UUID; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.NativeTypes; +import org.apache.ignite.internal.schema.Row; +import org.apache.ignite.internal.schema.RowAssembler; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand; +import 
org.apache.ignite.internal.table.distributed.command.GetAllCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand; +import org.apache.ignite.internal.table.distributed.command.GetCommand; +import org.apache.ignite.internal.table.distributed.command.InsertAllCommand; +import org.apache.ignite.internal.table.distributed.command.InsertCommand; +import org.apache.ignite.internal.table.distributed.command.ReplaceCommand; +import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand; +import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand; +import org.apache.ignite.internal.table.distributed.command.UpsertCommand; +import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse; +import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse; +import org.apache.ignite.raft.client.Command; +import org.apache.ignite.raft.client.service.CommandClosure; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * There are a tests for a table command listener. + */ +public class PartitionCommandListenerTest { + /** Key count. */ + public static final int KEY_COUNT = 100; + + /** Schema. 
*/ + public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(), + 1, + new Column[] {new Column("key", NativeTypes.INTEGER, false)}, + new Column[] {new Column("value", NativeTypes.INTEGER, false)} + ); + + /** Table command listener. */ + private static PartitionListener commandListener; + + /** + * Inisializes a table listener before tests. + */ + @BeforeAll + public static void before() { + commandListener = new PartitionListener(); + } + + /** + * Insrets rows and checks them. Review comment: Inserts ########## File path: modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java ########## @@ -0,0 +1,699 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.table.distributed.raft; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.UUID; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.schema.ByteBufferRow; +import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.NativeTypes; +import org.apache.ignite.internal.schema.Row; +import org.apache.ignite.internal.schema.RowAssembler; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand; +import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand; +import org.apache.ignite.internal.table.distributed.command.GetAllCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand; +import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand; +import org.apache.ignite.internal.table.distributed.command.GetCommand; +import org.apache.ignite.internal.table.distributed.command.InsertAllCommand; +import org.apache.ignite.internal.table.distributed.command.InsertCommand; +import org.apache.ignite.internal.table.distributed.command.ReplaceCommand; +import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand; +import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand; +import org.apache.ignite.internal.table.distributed.command.UpsertCommand; +import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse; +import 
org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse; +import org.apache.ignite.raft.client.Command; +import org.apache.ignite.raft.client.service.CommandClosure; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * There are a tests for a table command listener. + */ +public class PartitionCommandListenerTest { + /** Key count. */ + public static final int KEY_COUNT = 100; + + /** Schema. */ + public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(), + 1, + new Column[] {new Column("key", NativeTypes.INTEGER, false)}, + new Column[] {new Column("value", NativeTypes.INTEGER, false)} + ); + + /** Table command listener. */ + private static PartitionListener commandListener; + + /** + * Inisializes a table listener before tests. + */ + @BeforeAll + public static void before() { + commandListener = new PartitionListener(); + } + + /** + * Insrets rows and checks them. + * All rows remove before return. + */ + @Test + public void testInsertCommands() { + readAndCheck(false); + + delete(false); + + insert(false); + + insert(true); + + readAndCheck(true); + + delete(true); + } + + /** + * Upserts rows and checks them. + * All rows remove before return. + */ + @Test + public void testUpsertValues() { + readAndCheck(false); + + upsert(); + + readAndCheck(true); + + delete(true); + + readAndCheck(false); + } + + /** + * Adds rows, replaces and checks them. + * All rows remove before return. 
+ */ + @Test + public void testReplaceCommand() { + upsert(); + + deleteExactValues(false); + + replaceValues(true); + + readAndCheck(true, i -> i + 1); + + replaceValues(false); + + readAndCheck(true, i -> i + 1); + + deleteExactValues(true); + + readAndCheck(false); + } + + /** + * The test checks PutIfExist command. + * All rows remove before return. + */ + @Test + public void testPutIfExistCommand() { + putIfExistValues(false); + + readAndCheck(false); + + upsert(); + + putIfExistValues(true); + + readAndCheck(true, i -> i + 1); + + getAndDeleteValues(true); + + readAndCheck(false); + + getAndDeleteValues(false); + } + + /** + * The test checks GetAndReplace command. + * All rows remove before return. + */ + @Test + public void testGetAndReplaceCommand() { + readAndCheck(false); + + getAndUpsertValues(false); + + readAndCheck(true); + + getAndReplaceValues(true); + + readAndCheck(true, i -> i + 1); + + getAndUpsertValues(true); + + readAndCheck(true); + + deleteExactAllValues(true); + + readAndCheck(false); + + getAndReplaceValues(false); + + deleteExactAllValues(false); + } + + /** + * The test checks a batch upsert command. + * All rows remove before return. + */ + @Test + public void testUpsertRowsBatchedAndCheck() { + readAll(false); + + deleteAll(false); + + upsertAll(); + + readAll(true); + + deleteAll(true); + + readAll(false); + } + + /** + * The test checks a batch insert command. + * All rows remove before return. + */ + @Test + public void testInsertRowsBatchedAndCheck() { + readAll(false); + + deleteAll(false); + + insertAll(false); + + readAll(true); + + insertAll(true); + + deleteAll(true); + + readAll(false); + } + + /** + * Prepares a closure iterator for a specific batch operation. + * + * @param func The function prepare a closure for the operation. + * @param <T> Type of the operation. + * @return Closure iterator. 
+ */ + private <T extends Command> Iterator<CommandClosure<T>> batchIterator(Consumer<CommandClosure<T>> func) { + return new Iterator<CommandClosure<T>>() { + boolean moved; + + @Override public boolean hasNext() { + return !moved; + } + + @Override public CommandClosure<T> next() { + CommandClosure<T> clo = mock(CommandClosure.class); + + func.accept(clo); + + moved = true; + + return clo; + } + }; + } + + /** + * Prepares a closure iterator for a specific operation. + * + * @param func The function prepare a closure for the operation. + * @param <T> Type of the operation. + * @return Closure iterator. + */ + private <T extends Command> Iterator<CommandClosure<T>> iterator(BiConsumer<Integer, CommandClosure<T>> func) { + return new Iterator<CommandClosure<T>>() { + /** Iteration. */ + private int i = 0; + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return i < KEY_COUNT; + } + + /** {@inheritDoc} */ + @Override public CommandClosure<T> next() { + CommandClosure<T> clo = mock(CommandClosure.class); + + func.accept(i, clo); + + i++; + + return clo; + } + }; + } + + /** + * @param existed True if rows are existed, false otherwise. 
+ */ + private void insertAll(boolean existed) { + commandListener.onWrite(batchIterator(clo -> { + doAnswer(invocation -> { + MultiRowsResponse resp = invocation.getArgument(0); + + if (existed) { + assertEquals(KEY_COUNT, resp.getValues().size()); + + for (BinaryRow binaryRow : resp.getValues()) { + Row row = new Row(SCHEMA, binaryRow); + + int keyVal = row.intValue(0); + + assertTrue(keyVal < KEY_COUNT); + assertEquals(keyVal, row.intValue(1)); + } + } + else + assertTrue(resp.getValues().isEmpty()); + + return null; + }).when(clo).result(any(MultiRowsResponse.class)); + + Set<BinaryRow> rows = new HashSet<>(KEY_COUNT); + + for (int i = 0; i < KEY_COUNT; i++) + rows.add(getTestRow(i, i)); + + when(clo.command()).thenReturn(new InsertAllCommand(rows)); + })); + } + + /** + * Upserts values from the listener in the batch operation. + */ + private void upsertAll() { + commandListener.onWrite(batchIterator(clo -> { + doAnswer(invocation -> { + assertNull(invocation.getArgument(0)); + + return null; + }).when(clo).result(any()); + + Set<BinaryRow> rows = new HashSet<>(KEY_COUNT); + + for (int i = 0; i < KEY_COUNT; i++) + rows.add(getTestRow(i, i)); + + when(clo.command()).thenReturn(new UpsertAllCommand(rows)); + })); + } + + /** + * @param existed True if rows are existed, false otherwise. 
+ */ + private void deleteAll(boolean existed) { + commandListener.onWrite(batchIterator(clo -> { + doAnswer(invocation -> { + MultiRowsResponse resp = invocation.getArgument(0); + + if (existed) { + assertEquals(KEY_COUNT, resp.getValues().size()); + + for (BinaryRow binaryRow : resp.getValues()) { + Row row = new Row(SCHEMA, binaryRow); + + int keyVal = row.intValue(0); + + assertTrue(keyVal < KEY_COUNT); + assertEquals(keyVal, row.intValue(1)); + } + } + else + assertTrue(resp.getValues().isEmpty()); + + return null; + }).when(clo).result(any(MultiRowsResponse.class)); + + Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT); + + for (int i = 0; i < KEY_COUNT; i++) + keyRows.add(getTestKey(i)); + + when(clo.command()).thenReturn(new DeleteAllCommand(keyRows)); + })); + } + + /** + * @param existed True if rows are existed, false otherwise. + */ + private void readAll(boolean existed) { + commandListener.onRead(batchIterator(clo -> { + doAnswer(invocation -> { + MultiRowsResponse resp = invocation.getArgument(0); + + if (existed) { + assertEquals(KEY_COUNT, resp.getValues().size()); + + for (BinaryRow binaryRow : resp.getValues()) { + Row row = new Row(SCHEMA, binaryRow); + + int keyVal = row.intValue(0); + + assertTrue(keyVal < KEY_COUNT); + assertEquals(keyVal, row.intValue(1)); + } + } + else + assertTrue(resp.getValues().isEmpty()); + + return null; + }).when(clo).result(any(MultiRowsResponse.class)); + + Set<BinaryRow> keyRows = new HashSet<>(KEY_COUNT); + + for (int i = 0; i < KEY_COUNT; i++) + keyRows.add(getTestKey(i)); + + when(clo.command()).thenReturn(new GetAllCommand(keyRows)); + })); + } + + /** + * Upserts rows. 
+ */ + private void upsert() { + commandListener.onWrite(iterator((i, clo) -> { + when(clo.command()).thenReturn(new UpsertCommand(getTestRow(i, i))); + + doAnswer(invocation -> { + assertNull(invocation.getArgument(0)); + + return null; + }).when(clo).result(any()); + })); + } + + /** + * @param existed True if rows are existed, false otherwise. + */ + private void delete(boolean existed) { + commandListener.onWrite(iterator((i, clo) -> { + when(clo.command()).thenReturn(new DeleteCommand(getTestKey(i))); + + doAnswer(invocation -> { + assertEquals(existed, invocation.getArgument(0)); + + return null; + }).when(clo).result(any()); + })); + } + + /** + * Reads rows from the listener and checks them. + * + * @param existed True if rows are existed, false otherwise. + */ + private void readAndCheck(boolean existed) { + readAndCheck(existed, i -> i); + } + + /** + * Reades rows from the listener and checks values as expected by a mapper. Review comment: Reads -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org
