ptupitsyn commented on a change in pull request #342:
URL: https://github.com/apache/ignite-3/pull/342#discussion_r710397753



##########
File path: modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
##########
@@ -67,106 +67,377 @@ public Table(string name, Guid id, ClientFailoverSocket 
socket)
         public Guid Id { get; }
 
         /// <inheritdoc/>
-        public async Task<IIgniteTuple?> GetAsync(IIgniteTuple keyRec)
+        public async Task<IIgniteTuple?> GetAsync(IIgniteTuple key)
         {
-            IgniteArgumentCheck.NotNull(keyRec, nameof(keyRec));
+            IgniteArgumentCheck.NotNull(key, nameof(key));
 
             var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
 
             using var writer = new PooledArrayBufferWriter();
-            Write(writer.GetMessageWriter());
+            WriteTuple(writer, schema, key, keyOnly: true);
 
             using var resBuf = await _socket.DoOutInOpAsync(ClientOp.TupleGet, 
writer).ConfigureAwait(false);
-            return Read(resBuf.GetReader());
+            var resSchema = await ReadSchemaAsync(resBuf, 
schema).ConfigureAwait(false);
 
-            void Write(MessagePackWriter w)
+            return ReadValueTuple(resBuf, resSchema, key);
+        }
+
+        /// <inheritdoc/>
+        public async Task<IList<IIgniteTuple>> 
GetAllAsync(IEnumerable<IIgniteTuple> keys)
+        {
+            IgniteArgumentCheck.NotNull(keys, nameof(keys));
+
+            var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+            using var writer = new PooledArrayBufferWriter();
+            var count = WriteTuples(writer, schema, keys, keyOnly: true);
+
+            if (count == 0)
             {
-                w.Write(Id);
-                w.Write(schema.Version);
+                return Array.Empty<IIgniteTuple>();
+            }
 
-                for (var i = 0; i < schema.KeyColumnCount; i++)
-                {
-                    var col = schema.Columns[i];
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleGetAll, writer).ConfigureAwait(false);
+            var resSchema = await ReadSchemaAsync(resBuf, 
schema).ConfigureAwait(false);
 
-                    w.WriteObject(keyRec[col.Name]);
-                }
+            return ReadTuples(resBuf, resSchema);
+        }
 
-                w.Flush();
+        /// <inheritdoc/>
+        public async Task UpsertAsync(IIgniteTuple record)
+        {
+            IgniteArgumentCheck.NotNull(record, nameof(record));
+
+            var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+            using var writer = new PooledArrayBufferWriter();
+            WriteTuple(writer, schema, record);
+
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleUpsert, writer).ConfigureAwait(false);
+        }
+
+        /// <inheritdoc/>
+        public async Task UpsertAllAsync(IEnumerable<IIgniteTuple> records)
+        {
+            IgniteArgumentCheck.NotNull(records, nameof(records));
+
+            var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+            using var writer = new PooledArrayBufferWriter();
+            var count = WriteTuples(writer, schema, records);
+
+            if (count == 0)
+            {
+                return;
             }
 
-            IIgniteTuple? Read(MessagePackReader r)
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleUpsertAll, writer).ConfigureAwait(false);
+        }
+
+        /// <inheritdoc/>
+        public async Task<IIgniteTuple?> GetAndUpsertAsync(IIgniteTuple record)
+        {
+            IgniteArgumentCheck.NotNull(record, nameof(record));
+
+            var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+            using var writer = new PooledArrayBufferWriter();
+            WriteTuple(writer, schema, record);
+
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleGetAndUpsert, 
writer).ConfigureAwait(false);
+            var resSchema = await ReadSchemaAsync(resBuf, 
schema).ConfigureAwait(false);
+
+            return ReadValueTuple(resBuf, resSchema, record);
+        }
+
+        /// <inheritdoc/>
+        public async Task<bool> InsertAsync(IIgniteTuple record)
+        {
+            IgniteArgumentCheck.NotNull(record, nameof(record));
+
+            var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+            using var writer = new PooledArrayBufferWriter();
+            WriteTuple(writer, schema, record);
+
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleInsert, writer).ConfigureAwait(false);
+            return resBuf.GetReader().ReadBoolean();
+        }
+
+        /// <inheritdoc/>
+        public async Task<IList<IIgniteTuple>> 
InsertAllAsync(IEnumerable<IIgniteTuple> records)
+        {
+            IgniteArgumentCheck.NotNull(records, nameof(records));
+
+            var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+            using var writer = new PooledArrayBufferWriter();
+            var count = WriteTuples(writer, schema, records);
+
+            if (count == 0)
             {
-                if (r.NextMessagePackType == MessagePackType.Nil)
-                {
-                    return null;
-                }
+                return Array.Empty<IIgniteTuple>();
+            }
 
-                var schemaVersion = r.ReadInt32();
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleInsertAll, writer).ConfigureAwait(false);
+            var resSchema = await ReadSchemaAsync(resBuf, 
schema).ConfigureAwait(false);
 
-                if (schemaVersion != schema.Version)
-                {
-                    // TODO: Load schema (IGNITE-15430).
-                    throw new NotSupportedException();
-                }
+            return ReadTuples(resBuf, resSchema);
+        }
 
-                var columns = schema.Columns;
+        /// <inheritdoc/>
+        public async Task<bool> ReplaceAsync(IIgniteTuple record)
+        {
+            IgniteArgumentCheck.NotNull(record, nameof(record));
 
-                var tuple = new IgniteTuple(columns.Count);
+            var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
 
-                for (var i = 0; i < columns.Count; i++)
-                {
-                    var column = columns[i];
-
-                    if (i < schema.KeyColumnCount)
-                    {
-                        tuple[column.Name] = keyRec[column.Name];
-                    }
-                    else
-                    {
-                        tuple[column.Name] = r.ReadObject(column.Type);
-                    }
-                }
+            using var writer = new PooledArrayBufferWriter();
+            WriteTuple(writer, schema, record);
+
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleReplace, writer).ConfigureAwait(false);
+            return resBuf.GetReader().ReadBoolean();
+        }
+
+        /// <inheritdoc/>
+        public async Task<bool> ReplaceAsync(IIgniteTuple record, IIgniteTuple 
newRecord)
+        {
+            IgniteArgumentCheck.NotNull(record, nameof(record));
+
+            var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+            using var writer = new PooledArrayBufferWriter();
+            WriteTuples(writer, schema, record, newRecord);
+
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleReplaceExact, 
writer).ConfigureAwait(false);
+            return resBuf.GetReader().ReadBoolean();
+        }
+
+        /// <inheritdoc/>
+        public async Task<IIgniteTuple?> GetAndReplaceAsync(IIgniteTuple 
record)
+        {
+            IgniteArgumentCheck.NotNull(record, nameof(record));
+
+            var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+            using var writer = new PooledArrayBufferWriter();
+            WriteTuple(writer, schema, record);
+
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleGetAndReplace, 
writer).ConfigureAwait(false);
+            var resSchema = await ReadSchemaAsync(resBuf, 
schema).ConfigureAwait(false);
+
+            return ReadValueTuple(resBuf, resSchema, record);
+        }
+
+        /// <inheritdoc/>
+        public async Task<bool> DeleteAsync(IIgniteTuple key)
+        {
+            IgniteArgumentCheck.NotNull(key, nameof(key));
+
+            var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
 
-                return tuple;
+            using var writer = new PooledArrayBufferWriter();
+            WriteTuple(writer, schema, key, keyOnly: true);
+
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleDelete, writer).ConfigureAwait(false);
+            return resBuf.GetReader().ReadBoolean();
+        }
+
+        /// <inheritdoc/>
+        public async Task<bool> DeleteExactAsync(IIgniteTuple record)
+        {
+            IgniteArgumentCheck.NotNull(record, nameof(record));
+
+            var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+            using var writer = new PooledArrayBufferWriter();
+            WriteTuple(writer, schema, record);
+
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleDeleteExact, writer).ConfigureAwait(false);
+            return resBuf.GetReader().ReadBoolean();
+        }
+
+        /// <inheritdoc/>
+        public async Task<IIgniteTuple?> GetAndDeleteAsync(IIgniteTuple key)
+        {
+            IgniteArgumentCheck.NotNull(key, nameof(key));
+
+            var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+            using var writer = new PooledArrayBufferWriter();
+            WriteTuple(writer, schema, key, keyOnly: true);
+
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleGetAndDelete, 
writer).ConfigureAwait(false);
+            var resSchema = await ReadSchemaAsync(resBuf, 
schema).ConfigureAwait(false);
+
+            return ReadValueTuple(resBuf, resSchema, key);
+        }
+
+        /// <inheritdoc/>
+        public async Task<IList<IIgniteTuple>> 
DeleteAllAsync(IEnumerable<IIgniteTuple> keys)
+        {
+            IgniteArgumentCheck.NotNull(keys, nameof(keys));
+
+            var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+            using var writer = new PooledArrayBufferWriter();
+            var count = WriteTuples(writer, schema, keys, keyOnly: true);
+
+            if (count == 0)
+            {
+                return Array.Empty<IIgniteTuple>();
             }
+
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleDeleteAll, writer).ConfigureAwait(false);
+            var resSchema = await ReadSchemaAsync(resBuf, 
schema).ConfigureAwait(false);
+
+            return ReadTuples(resBuf, resSchema, keyOnly: true);
         }
 
         /// <inheritdoc/>
-        public async Task UpsertAsync(IIgniteTuple rec)
+        public async Task<IList<IIgniteTuple>> 
DeleteAllExactAsync(IEnumerable<IIgniteTuple> records)
         {
-            IgniteArgumentCheck.NotNull(rec, nameof(rec));
+            IgniteArgumentCheck.NotNull(records, nameof(records));
 
             var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
 
             using var writer = new PooledArrayBufferWriter();
-            Write(writer.GetMessageWriter());
+            var count = WriteTuples(writer, schema, records);
 
-            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleUpsert, writer).ConfigureAwait(false);
+            if (count == 0)
+            {
+                return Array.Empty<IIgniteTuple>();
+            }
+
+            using var resBuf = await 
_socket.DoOutInOpAsync(ClientOp.TupleDeleteAllExact, 
writer).ConfigureAwait(false);
+            var resSchema = await ReadSchemaAsync(resBuf, 
schema).ConfigureAwait(false);
+
+            return ReadTuples(resBuf, resSchema);
+        }
 
-            void Write(MessagePackWriter w)
+        private static IIgniteTuple? ReadValueTuple(PooledBuffer buf, Schema? 
schema, IIgniteTuple key)
+        {
+            if (schema == null)
             {
-                w.Write(Id);
-                w.Write(schema.Version);
+                return null;
+            }
 
-                foreach (var col in schema.Columns)
+            // Skip schema version.
+            var r = buf.GetReader();
+            r.Skip();
+
+            var columns = schema.Columns;
+            var tuple = new IgniteTuple(columns.Count);
+
+            for (var i = 0; i < columns.Count; i++)
+            {
+                var column = columns[i];
+
+                if (i < schema.KeyColumnCount)
+                {
+                    tuple[column.Name] = key[column.Name];
+                }
+                else
                 {
-                    var colIdx = rec.GetOrdinal(col.Name);
-
-                    if (colIdx < 0)
-                    {
-                        w.WriteNil();
-                    }
-                    else
-                    {
-                        w.WriteObject(rec[colIdx]);
-                    }
+                    tuple[column.Name] = r.ReadObject(column.Type);
                 }
+            }
 
-                w.Flush();
+            return tuple;
+        }
+
+        private static IIgniteTuple ReadTuple(ref MessagePackReader r, Schema 
schema, bool keyOnly = false)
+        {
+            var columns = schema.Columns;
+            var count = keyOnly ? schema.KeyColumnCount : columns.Count;

Review comment:
       Yes, key columns always come first in the schema.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to