isapego commented on a change in pull request #191: URL: https://github.com/apache/ignite-3/pull/191#discussion_r674331522
########## File path: modules/client-common/src/main/java/org/apache/ignite/client/ClientMessageDecoder.java ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.CharsetUtil; +import org.apache.ignite.lang.IgniteException; +import org.msgpack.core.MessageFormat; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; + +/** + * Decodes full client messages: + * 1. MAGIC for first message. + * 2. Payload length (varint). + * 3. Payload (bytes). + */ +public class ClientMessageDecoder extends ByteToMessageDecoder { + /** Magic bytes before handshake. */ + public static final byte[] MAGIC_BYTES = new byte[]{0x49, 0x47, 0x4E, 0x49}; // IGNI + + /** Data buffer. */ + private byte[] data = new byte[4]; // TODO: Pooled buffers IGNITE-15162. + + /** Remaining byte count. */ + private int cnt = -4; + + /** Message size. */ + private int msgSize = -1; + + /** Magic decoded flag. */ + private boolean magicDecoded; + + /** Magic decoding failed flag. 
*/ + private boolean magicFailed; + + /** Message size varint format. */ + private MessageFormat sizeFormat = null; + + /** First byte of the message. */ + private byte firstByte; + + /** {@inheritDoc} */ + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) { + if (!readMagic(byteBuf)) + return; + + while (read(byteBuf)) + list.add(ByteBuffer.wrap(data)); + } + + /** + * Checks the magic header for the first message. + * + * @param byteBuf Buffer. + * @return True when magic header has been received and is valid, false otherwise. + * @throws IgniteException When magic is invalid. + */ + private boolean readMagic(ByteBuf byteBuf) { + if (magicFailed) + return false; + + if (magicDecoded) + return true; + + if (byteBuf.readableBytes() < MAGIC_BYTES.length) + return false; + + byteBuf.readBytes(data, 0, MAGIC_BYTES.length); + + magicDecoded = true; + cnt = -1; + msgSize = 0; + + if (Arrays.equals(data, MAGIC_BYTES)) + return true; + + magicFailed = true; + + throw new IgniteException("Invalid magic header in thin client connection. " + + "Expected 'IGNI', but was '" + new String(data, CharsetUtil.US_ASCII) + "'."); + } + + /** + * Reads the buffer. + * + * @param buf Buffer. + * @return True when a complete message has been received; false otherwise. + * @throws IgniteException when message is invalid. + */ + private boolean read(ByteBuf buf) { + if (buf.readableBytes() == 0) + return false; + + if (cnt < 0) { + // Read varint message size. + // TODO: Use fixed int message size? This will simplify encoders/decoders a lot, + // likely increasing performance as well. + if (sizeFormat == null) { + firstByte = buf.readByte(); + sizeFormat = MessageFormat.valueOf(firstByte); + } + + switch (sizeFormat) { + case POSFIXINT: + msgSize = firstByte; + break; + + case INT8: Review comment: Why can the size format be signed at all? 
########## File path: modules/client-common/src/main/java/org/apache/ignite/client/ClientMessageDecoder.java ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.CharsetUtil; +import org.apache.ignite.lang.IgniteException; +import org.msgpack.core.MessageFormat; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; + +/** + * Decodes full client messages: + * 1. MAGIC for first message. + * 2. Payload length (varint). + * 3. Payload (bytes). + */ +public class ClientMessageDecoder extends ByteToMessageDecoder { + /** Magic bytes before handshake. */ + public static final byte[] MAGIC_BYTES = new byte[]{0x49, 0x47, 0x4E, 0x49}; // IGNI + + /** Data buffer. */ + private byte[] data = new byte[4]; // TODO: Pooled buffers IGNITE-15162. + + /** Remaining byte count. */ + private int cnt = -4; + + /** Message size. */ + private int msgSize = -1; + + /** Magic decoded flag. */ + private boolean magicDecoded; + + /** Magic decoding failed flag. 
*/ + private boolean magicFailed; + + /** Message size varint format. */ + private MessageFormat sizeFormat = null; + + /** First byte of the message. */ + private byte firstByte; + + /** {@inheritDoc} */ + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) { + if (!readMagic(byteBuf)) + return; + + while (read(byteBuf)) + list.add(ByteBuffer.wrap(data)); + } + + /** + * Checks the magic header for the first message. + * + * @param byteBuf Buffer. + * @return True when magic header has been received and is valid, false otherwise. + * @throws IgniteException When magic is invalid. + */ + private boolean readMagic(ByteBuf byteBuf) { + if (magicFailed) + return false; + + if (magicDecoded) + return true; + + if (byteBuf.readableBytes() < MAGIC_BYTES.length) + return false; + + byteBuf.readBytes(data, 0, MAGIC_BYTES.length); + + magicDecoded = true; + cnt = -1; + msgSize = 0; + + if (Arrays.equals(data, MAGIC_BYTES)) + return true; + + magicFailed = true; + + throw new IgniteException("Invalid magic header in thin client connection. " + + "Expected 'IGNI', but was '" + new String(data, CharsetUtil.US_ASCII) + "'."); + } + + /** + * Reads the buffer. + * + * @param buf Buffer. + * @return True when a complete message has been received; false otherwise. + * @throws IgniteException when message is invalid. + */ + private boolean read(ByteBuf buf) { + if (buf.readableBytes() == 0) + return false; + + if (cnt < 0) { + // Read varint message size. + // TODO: Use fixed int message size? This will simplify encoders/decoders a lot, + // likely increasing performance as well. Review comment: I agree. Can not see how variable message size can improve anything. ########## File path: modules/client-common/src/main/java/org/apache/ignite/client/ClientMessageEncoder.java ########## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +import java.nio.ByteBuffer; + +/** + * Encodes client messages: + * 1. MAGIC for first message. + * 2. Payload length (varint). + * 3. Payload (bytes). + */ +public class ClientMessageEncoder extends MessageToByteEncoder<ByteBuffer> { + /** Magic encoded flag. */ + private boolean magicEncoded; + + /** {@inheritDoc} */ + @Override protected void encode(ChannelHandlerContext ctx, ByteBuffer message, ByteBuf out) { + if (!magicEncoded) { + out.writeBytes(ClientMessageDecoder.MAGIC_BYTES); + + magicEncoded = true; + } + + // Encode size without using MessagePacker to reduce indirection and allocations. + int size = message.remaining(); + + if (size <= 0x7f) + out.writeByte(size); + else if (size < 0xff) { + out.writeByte(0xcc); Review comment: Are those numbers declared as constants somewhere? ########## File path: modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java ########## @@ -0,0 +1,557 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.ignite.app.Ignite; +import org.apache.ignite.client.ClientDataType; +import org.apache.ignite.client.ClientErrorCode; +import org.apache.ignite.client.ClientMessagePacker; +import org.apache.ignite.client.ClientMessageType; +import org.apache.ignite.client.ClientMessageUnpacker; +import org.apache.ignite.client.ClientOp; +import org.apache.ignite.client.ProtocolVersion; +import org.apache.ignite.configuration.schemas.table.TableChange; +import org.apache.ignite.internal.schema.Column; +import org.apache.ignite.internal.schema.NativeTypeSpec; +import org.apache.ignite.internal.schema.SchemaAware; +import org.apache.ignite.internal.schema.SchemaDescriptor; +import org.apache.ignite.internal.table.IgniteTablesInternal; +import org.apache.ignite.internal.table.TableImpl; +import org.apache.ignite.internal.table.TupleBuilderImpl; +import org.apache.ignite.lang.IgniteException; +import org.apache.ignite.table.Table; +import org.apache.ignite.table.Tuple; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.buffer.ByteBufferInput; +import org.slf4j.Logger; + +import java.io.IOException; +import 
java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +/** + * Handles messages from thin clients. + */ +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ClientInboundMessageHandler extends ChannelInboundHandlerAdapter { + /** Logger. */ + private final Logger log; + + /** API entry point. */ + private final Ignite ignite; + + /** Context. */ + private ClientContext clientContext; + + /** + * Constructor. + * + * @param ignite Ignite API entry point. + * @param log Logger. + */ + public ClientInboundMessageHandler(Ignite ignite, Logger log) { + assert ignite != null; + assert log != null; + + this.ignite = ignite; + this.log = log; + } + + /** {@inheritDoc} */ + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { + var buf = (ByteBuffer) msg; + + var unpacker = getUnpacker(buf); + var packer = getPacker(); + + if (clientContext == null) + handshake(ctx, unpacker, packer); + else + processOperation(ctx, unpacker, packer); + } + + private void handshake(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer) + throws IOException { + try { + var clientVer = ProtocolVersion.unpack(unpacker); + + if (!clientVer.equals(ProtocolVersion.LATEST_VER)) + throw new IgniteException("Unsupported version: " + + clientVer.major() + "." + clientVer.minor() + "." + clientVer.patch()); + + var clientCode = unpacker.unpackInt(); + var featuresLen = unpacker.unpackBinaryHeader(); + var features = BitSet.valueOf(unpacker.readPayload(featuresLen)); + + clientContext = new ClientContext(clientVer, clientCode, features); + + log.debug("Handshake: " + clientContext); + + var extensionsLen = unpacker.unpackMapHeader(); + unpacker.skipValue(extensionsLen); + + // Response. 
+ ProtocolVersion.LATEST_VER.pack(packer); + + packer.packInt(ClientErrorCode.SUCCESS) + .packBinaryHeader(0) // Features. + .packMapHeader(0); // Extensions. + + write(packer, ctx); + } catch (Throwable t) { + packer = getPacker(); + + ProtocolVersion.LATEST_VER.pack(packer); + packer.packInt(ClientErrorCode.FAILED).packString(t.getMessage()); + + write(packer, ctx); + } + } + + private void write(ClientMessagePacker packer, ChannelHandlerContext ctx) { + var buf = packer.toMessageBuffer().sliceAsByteBuffer(); + + ctx.writeAndFlush(buf); + } + + private void writeError(int requestId, Throwable err, ChannelHandlerContext ctx) { + try { + assert err != null; + + ClientMessagePacker packer = getPacker(); + packer.packInt(ClientMessageType.RESPONSE); + packer.packInt(requestId); + packer.packInt(ClientErrorCode.FAILED); + + String msg = err.getMessage(); + + if (msg == null) + msg = err.getClass().getName(); + + packer.packString(msg); + + write(packer, ctx); + } catch (Throwable t) { + exceptionCaught(ctx, t); + } + } + + private ClientMessagePacker getPacker() { + return new ClientMessagePacker(); + } + + private ClientMessageUnpacker getUnpacker(ByteBuffer buf) { + return new ClientMessageUnpacker(new ByteBufferInput(buf)); + } + + private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker unpacker, ClientMessagePacker packer) throws IOException { + var opCode = unpacker.unpackInt(); + var requestId = unpacker.unpackInt(); + + packer.packInt(ClientMessageType.RESPONSE) + .packInt(requestId) + .packInt(ClientErrorCode.SUCCESS); + + try { + var fut = processOperation(unpacker, packer, opCode); + + if (fut == null) { + // Operation completed synchronously. 
+ write(packer, ctx); + } else { + fut.whenComplete((Object res, Object err) -> { + if (err != null) + writeError(requestId, (Throwable) err, ctx); + else + write(packer, ctx); + }); + } + + } catch (Throwable t) { + writeError(requestId, t, ctx); + } + } + + private CompletableFuture processOperation(ClientMessageUnpacker unpacker, ClientMessagePacker packer, int opCode) Review comment: I think this file may need a re-factoring in future. ########## File path: modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client; + +import org.apache.ignite.app.Ignite; +import org.apache.ignite.internal.client.TcpIgniteClient; + +import java.util.concurrent.CompletableFuture; + +/** + * Ignite client entry point. + */ +public class IgniteClient { + /** + * Gets a new client builder. + * + * @return New client builder. + */ + public static Builder builder() { + return new Builder(); + } + + /** Client builder. */ + public static class Builder { + /** Addresses. */ + private String[] addresses; + + /** + * Builds the client. + * + * @return Ignite client. 
+ */ + public Ignite build() { Review comment: I don't really like the fact that we return `Ignite` instance. I understand, this is an interface, but I think that maybe it's something we should change to reflect the fact that this is only a link to a cluster. ########## File path: modules/client-common/src/main/java/org/apache/ignite/client/ClientMessageUnpacker.java ########## @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.client; + +import org.apache.ignite.lang.IgniteException; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageSizeException; +import org.msgpack.core.MessageTypeException; +import org.msgpack.core.MessageUnpacker; +import org.msgpack.core.buffer.MessageBufferInput; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.BitSet; +import java.util.UUID; + +import static org.apache.ignite.client.ClientDataType.BITMASK; +import static org.apache.ignite.client.ClientDataType.BYTES; +import static org.apache.ignite.client.ClientDataType.DECIMAL; +import static org.apache.ignite.client.ClientDataType.DOUBLE; +import static org.apache.ignite.client.ClientDataType.FLOAT; +import static org.apache.ignite.client.ClientDataType.INT16; +import static org.apache.ignite.client.ClientDataType.INT32; +import static org.apache.ignite.client.ClientDataType.INT64; +import static org.apache.ignite.client.ClientDataType.INT8; +import static org.apache.ignite.client.ClientDataType.STRING; + +/** + * Ignite-specific MsgPack extension. + */ +public class ClientMessageUnpacker extends MessageUnpacker { + /** + * Constructor. + * + * @param in Input. + */ + public ClientMessageUnpacker(MessageBufferInput in) { + super(in, MessagePack.DEFAULT_UNPACKER_CONFIG); + } + + /** + * Reads an UUID. + * + * @return UUID value. + * @throws IOException when underlying input throws IOException. + * @throws MessageTypeException when type is not UUID. + * @throws MessageSizeException when size is not correct. + */ + public UUID unpackUuid() throws IOException { + var hdr = unpackExtensionTypeHeader(); + var type = hdr.getType(); + var len = hdr.getLength(); Review comment: Length looks redundant here. Can we somehow avoid writing it? 
########## File path: modules/client/src/main/java/org/apache/ignite/client/IgniteClient.java ########## @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client; + +import org.apache.ignite.app.Ignite; +import org.apache.ignite.internal.client.TcpIgniteClient; + +import java.util.concurrent.CompletableFuture; + +/** + * Ignite client entry point. + */ +public class IgniteClient { Review comment: Shouldn't it be `static`? ########## File path: modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java ########## @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client.table; + +import org.apache.ignite.client.ClientMessageUnpacker; +import org.apache.ignite.client.ClientOp; +import org.apache.ignite.client.IgniteClientException; +import org.apache.ignite.internal.client.PayloadInputChannel; +import org.apache.ignite.internal.client.PayloadOutputChannel; +import org.apache.ignite.internal.client.ReliableChannel; +import org.apache.ignite.internal.tostring.IgniteToStringBuilder; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.table.InvokeProcessor; +import org.apache.ignite.table.KeyValueBinaryView; +import org.apache.ignite.table.KeyValueView; +import org.apache.ignite.table.RecordView; +import org.apache.ignite.table.Table; +import org.apache.ignite.table.Tuple; +import org.apache.ignite.table.TupleBuilder; +import org.apache.ignite.table.mapper.KeyMapper; +import org.apache.ignite.table.mapper.RecordMapper; +import org.apache.ignite.table.mapper.ValueMapper; +import org.apache.ignite.tx.Transaction; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.msgpack.core.MessageFormat; + +import java.io.IOException; +import java.io.Serializable; +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Client table API implementation. 
+ */ +public class ClientTable implements Table { + /** */ + private final UUID id; + + /** */ + private final String name; + + /** */ + private final ReliableChannel ch; + + /** */ + private final ConcurrentHashMap<Integer, ClientSchema> schemas = new ConcurrentHashMap<>(); + + /** */ + private volatile int latestSchemaVer = -1; + + /** */ + private final Object latestSchemaLock = new Object(); + + /** + * Constructor. + * + * @param ch Channel. + * @param id Table id. + * @param name Table name. + */ + public ClientTable(ReliableChannel ch, UUID id, String name) { + assert ch != null; + assert id != null; + assert name != null && !name.isEmpty(); + + this.ch = ch; + this.id = id; + this.name = name; + } + + /** + * Gets the table id. + * + * @return Table id. + */ + public UUID tableId() { + return id; + } + + /** {@inheritDoc} */ + @Override public @NotNull String tableName() { + return name; + } + + /** {@inheritDoc} */ + @Override public <R> RecordView<R> recordView(RecordMapper<R> recMapper) { + return null; + } + + /** {@inheritDoc} */ + @Override public <K, V> KeyValueView<K, V> kvView(KeyMapper<K> keyMapper, ValueMapper<V> valMapper) { + return null; + } + + /** {@inheritDoc} */ + @Override public KeyValueBinaryView kvView() { + return null; + } + + /** {@inheritDoc} */ + @Override public Table withTransaction(Transaction tx) { + return null; + } + + /** {@inheritDoc} */ + @Override public TupleBuilder tupleBuilder() { + return new ClientTupleBuilder(); + } + + /** {@inheritDoc} */ + @Override public Tuple get(@NotNull Tuple keyRec) { + return getAsync(keyRec).join(); + } + + /** {@inheritDoc} */ + @Override public @NotNull CompletableFuture<Tuple> getAsync(@NotNull Tuple keyRec) { + return getLatestSchema().thenCompose(schema -> + ch.serviceAsync(ClientOp.TUPLE_GET, w -> writeTuple(keyRec, schema, w, true), r -> { + if (r.in().getNextFormat() == MessageFormat.NIL) + return null; + + var schemaVer = r.in().unpackInt(); + + return new IgniteBiTuple<>(r, 
schemaVer); + })).thenCompose(biTuple -> { + if (biTuple == null) + return CompletableFuture.completedFuture(null); + + assert biTuple.getKey() != null; + assert biTuple.getValue() != null; + + return getSchema(biTuple.getValue()).thenApply(schema -> readTuple(schema, biTuple.getKey())); + }); + } + + /** {@inheritDoc} */ + @Override public Collection<Tuple> getAll(@NotNull Collection<Tuple> keyRecs) { + return null; Review comment: Should we throw some kind of unimplemented exception in such places instead of just returning null? ########## File path: modules/client/src/main/java/org/apache/ignite/internal/client/HostAndPortRange.java ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.client; + +import java.io.Serializable; +import java.net.Inet6Address; +import java.net.UnknownHostException; +import org.apache.ignite.lang.IgniteException; + +/** + * Represents address along with port range. + */ +public class HostAndPortRange implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Host. */ + private final String host; + + /** Port from. */ + private final int portFrom; + + /** Port to. 
*/ + private final int portTo; + + /** + * Parse string into host and port pair. + * + * @param addrStr String. + * @param dfltPortFrom Default port from. + * @param dfltPortTo Default port to. + * @param errMsgPrefix Error message prefix. + * @return Result. + * @throws IgniteException If failed. + */ + public static HostAndPortRange parse(String addrStr, int dfltPortFrom, int dfltPortTo, String errMsgPrefix) Review comment: I know this is code re-use from 2.0, but still this one needs heavy testing. ########## File path: modules/client-common/src/main/java/org/apache/ignite/client/ClientMessageDecoder.java ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; +import io.netty.util.CharsetUtil; +import org.apache.ignite.lang.IgniteException; +import org.msgpack.core.MessageFormat; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; + +/** + * Decodes full client messages: + * 1. MAGIC for first message. + * 2. Payload length (varint). + * 3. Payload (bytes). 
+ */ +public class ClientMessageDecoder extends ByteToMessageDecoder { + /** Magic bytes before handshake. */ + public static final byte[] MAGIC_BYTES = new byte[]{0x49, 0x47, 0x4E, 0x49}; // IGNI + + /** Data buffer. */ + private byte[] data = new byte[4]; // TODO: Pooled buffers IGNITE-15162. + + /** Remaining byte count. */ + private int cnt = -4; + + /** Message size. */ + private int msgSize = -1; + + /** Magic decoded flag. */ + private boolean magicDecoded; + + /** Magic decoding failed flag. */ + private boolean magicFailed; + + /** Message size varint format. */ + private MessageFormat sizeFormat = null; + + /** First byte of the message. */ + private byte firstByte; + + /** {@inheritDoc} */ + @Override protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) { + if (!readMagic(byteBuf)) + return; + + while (read(byteBuf)) + list.add(ByteBuffer.wrap(data)); + } + + /** + * Checks the magic header for the first message. + * + * @param byteBuf Buffer. + * @return True when magic header has been received and is valid, false otherwise. + * @throws IgniteException When magic is invalid. + */ + private boolean readMagic(ByteBuf byteBuf) { + if (magicFailed) + return false; + + if (magicDecoded) + return true; + + if (byteBuf.readableBytes() < MAGIC_BYTES.length) + return false; + + byteBuf.readBytes(data, 0, MAGIC_BYTES.length); Review comment: Maybe we should assert prior that `data.length` == `MAGIC_BYTES.length` ########## File path: modules/client-common/src/main/java/org/apache/ignite/client/ClientMessageType.java ########## @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.client; + +/** + * Server to client message types. + */ +public class ClientMessageType { Review comment: Maybe `ServerMessageType` then? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
