isapego commented on a change in pull request #342:
URL: https://github.com/apache/ignite-3/pull/342#discussion_r710296836
##########
File path: modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
##########
@@ -67,106 +67,377 @@ public Table(string name, Guid id, ClientFailoverSocket
socket)
public Guid Id { get; }
/// <inheritdoc/>
- public async Task<IIgniteTuple?> GetAsync(IIgniteTuple keyRec)
+ public async Task<IIgniteTuple?> GetAsync(IIgniteTuple key)
{
- IgniteArgumentCheck.NotNull(keyRec, nameof(keyRec));
+ IgniteArgumentCheck.NotNull(key, nameof(key));
var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
using var writer = new PooledArrayBufferWriter();
- Write(writer.GetMessageWriter());
+ WriteTuple(writer, schema, key, keyOnly: true);
using var resBuf = await _socket.DoOutInOpAsync(ClientOp.TupleGet,
writer).ConfigureAwait(false);
- return Read(resBuf.GetReader());
+ var resSchema = await ReadSchemaAsync(resBuf,
schema).ConfigureAwait(false);
- void Write(MessagePackWriter w)
+ return ReadValueTuple(resBuf, resSchema, key);
+ }
+
+ /// <inheritdoc/>
+ public async Task<IList<IIgniteTuple>>
GetAllAsync(IEnumerable<IIgniteTuple> keys)
+ {
+ IgniteArgumentCheck.NotNull(keys, nameof(keys));
+
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+ using var writer = new PooledArrayBufferWriter();
+ var count = WriteTuples(writer, schema, keys, keyOnly: true);
+
+ if (count == 0)
{
- w.Write(Id);
- w.Write(schema.Version);
+ return Array.Empty<IIgniteTuple>();
+ }
- for (var i = 0; i < schema.KeyColumnCount; i++)
- {
- var col = schema.Columns[i];
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleGetAll, writer).ConfigureAwait(false);
+ var resSchema = await ReadSchemaAsync(resBuf,
schema).ConfigureAwait(false);
- w.WriteObject(keyRec[col.Name]);
- }
+ return ReadTuples(resBuf, resSchema);
+ }
- w.Flush();
+ /// <inheritdoc/>
+ public async Task UpsertAsync(IIgniteTuple record)
+ {
+ IgniteArgumentCheck.NotNull(record, nameof(record));
+
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+ using var writer = new PooledArrayBufferWriter();
+ WriteTuple(writer, schema, record);
+
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleUpsert, writer).ConfigureAwait(false);
+ }
+
+ /// <inheritdoc/>
+ public async Task UpsertAllAsync(IEnumerable<IIgniteTuple> records)
+ {
+ IgniteArgumentCheck.NotNull(records, nameof(records));
+
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+ using var writer = new PooledArrayBufferWriter();
+ var count = WriteTuples(writer, schema, records);
+
+ if (count == 0)
+ {
+ return;
}
- IIgniteTuple? Read(MessagePackReader r)
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleUpsertAll, writer).ConfigureAwait(false);
+ }
+
+ /// <inheritdoc/>
+ public async Task<IIgniteTuple?> GetAndUpsertAsync(IIgniteTuple record)
+ {
+ IgniteArgumentCheck.NotNull(record, nameof(record));
+
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+ using var writer = new PooledArrayBufferWriter();
+ WriteTuple(writer, schema, record);
+
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleGetAndUpsert,
writer).ConfigureAwait(false);
+ var resSchema = await ReadSchemaAsync(resBuf,
schema).ConfigureAwait(false);
+
+ return ReadValueTuple(resBuf, resSchema, record);
+ }
+
+ /// <inheritdoc/>
+ public async Task<bool> InsertAsync(IIgniteTuple record)
+ {
+ IgniteArgumentCheck.NotNull(record, nameof(record));
+
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+ using var writer = new PooledArrayBufferWriter();
+ WriteTuple(writer, schema, record);
+
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleInsert, writer).ConfigureAwait(false);
+ return resBuf.GetReader().ReadBoolean();
+ }
+
+ /// <inheritdoc/>
+ public async Task<IList<IIgniteTuple>>
InsertAllAsync(IEnumerable<IIgniteTuple> records)
+ {
+ IgniteArgumentCheck.NotNull(records, nameof(records));
+
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+ using var writer = new PooledArrayBufferWriter();
+ var count = WriteTuples(writer, schema, records);
+
+ if (count == 0)
{
- if (r.NextMessagePackType == MessagePackType.Nil)
- {
- return null;
- }
+ return Array.Empty<IIgniteTuple>();
+ }
- var schemaVersion = r.ReadInt32();
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleInsertAll, writer).ConfigureAwait(false);
+ var resSchema = await ReadSchemaAsync(resBuf,
schema).ConfigureAwait(false);
- if (schemaVersion != schema.Version)
- {
- // TODO: Load schema (IGNITE-15430).
- throw new NotSupportedException();
- }
+ return ReadTuples(resBuf, resSchema);
+ }
- var columns = schema.Columns;
+ /// <inheritdoc/>
+ public async Task<bool> ReplaceAsync(IIgniteTuple record)
+ {
+ IgniteArgumentCheck.NotNull(record, nameof(record));
- var tuple = new IgniteTuple(columns.Count);
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
- for (var i = 0; i < columns.Count; i++)
- {
- var column = columns[i];
-
- if (i < schema.KeyColumnCount)
- {
- tuple[column.Name] = keyRec[column.Name];
- }
- else
- {
- tuple[column.Name] = r.ReadObject(column.Type);
- }
- }
+ using var writer = new PooledArrayBufferWriter();
+ WriteTuple(writer, schema, record);
+
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleReplace, writer).ConfigureAwait(false);
+ return resBuf.GetReader().ReadBoolean();
+ }
+
+ /// <inheritdoc/>
+ public async Task<bool> ReplaceAsync(IIgniteTuple record, IIgniteTuple
newRecord)
+ {
+ IgniteArgumentCheck.NotNull(record, nameof(record));
+
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+ using var writer = new PooledArrayBufferWriter();
+ WriteTuples(writer, schema, record, newRecord);
+
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleReplaceExact,
writer).ConfigureAwait(false);
+ return resBuf.GetReader().ReadBoolean();
+ }
+
+ /// <inheritdoc/>
+ public async Task<IIgniteTuple?> GetAndReplaceAsync(IIgniteTuple
record)
+ {
+ IgniteArgumentCheck.NotNull(record, nameof(record));
+
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+ using var writer = new PooledArrayBufferWriter();
+ WriteTuple(writer, schema, record);
+
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleGetAndReplace,
writer).ConfigureAwait(false);
+ var resSchema = await ReadSchemaAsync(resBuf,
schema).ConfigureAwait(false);
+
+ return ReadValueTuple(resBuf, resSchema, record);
+ }
+
+ /// <inheritdoc/>
+ public async Task<bool> DeleteAsync(IIgniteTuple key)
+ {
+ IgniteArgumentCheck.NotNull(key, nameof(key));
+
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
- return tuple;
+ using var writer = new PooledArrayBufferWriter();
+ WriteTuple(writer, schema, key, keyOnly: true);
+
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleDelete, writer).ConfigureAwait(false);
+ return resBuf.GetReader().ReadBoolean();
+ }
+
+ /// <inheritdoc/>
+ public async Task<bool> DeleteExactAsync(IIgniteTuple record)
+ {
+ IgniteArgumentCheck.NotNull(record, nameof(record));
+
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+ using var writer = new PooledArrayBufferWriter();
+ WriteTuple(writer, schema, record);
+
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleDeleteExact, writer).ConfigureAwait(false);
+ return resBuf.GetReader().ReadBoolean();
+ }
+
+ /// <inheritdoc/>
+ public async Task<IIgniteTuple?> GetAndDeleteAsync(IIgniteTuple key)
+ {
+ IgniteArgumentCheck.NotNull(key, nameof(key));
+
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+ using var writer = new PooledArrayBufferWriter();
+ WriteTuple(writer, schema, key, keyOnly: true);
+
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleGetAndDelete,
writer).ConfigureAwait(false);
+ var resSchema = await ReadSchemaAsync(resBuf,
schema).ConfigureAwait(false);
+
+ return ReadValueTuple(resBuf, resSchema, key);
+ }
+
+ /// <inheritdoc/>
+ public async Task<IList<IIgniteTuple>>
DeleteAllAsync(IEnumerable<IIgniteTuple> keys)
+ {
+ IgniteArgumentCheck.NotNull(keys, nameof(keys));
+
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+ using var writer = new PooledArrayBufferWriter();
+ var count = WriteTuples(writer, schema, keys, keyOnly: true);
+
+ if (count == 0)
+ {
+ return Array.Empty<IIgniteTuple>();
}
+
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleDeleteAll, writer).ConfigureAwait(false);
+ var resSchema = await ReadSchemaAsync(resBuf,
schema).ConfigureAwait(false);
+
+ return ReadTuples(resBuf, resSchema, keyOnly: true);
}
/// <inheritdoc/>
- public async Task UpsertAsync(IIgniteTuple rec)
+ public async Task<IList<IIgniteTuple>>
DeleteAllExactAsync(IEnumerable<IIgniteTuple> records)
{
- IgniteArgumentCheck.NotNull(rec, nameof(rec));
+ IgniteArgumentCheck.NotNull(records, nameof(records));
var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
using var writer = new PooledArrayBufferWriter();
- Write(writer.GetMessageWriter());
+ var count = WriteTuples(writer, schema, records);
- using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleUpsert, writer).ConfigureAwait(false);
+ if (count == 0)
+ {
+ return Array.Empty<IIgniteTuple>();
+ }
+
+ using var resBuf = await
_socket.DoOutInOpAsync(ClientOp.TupleDeleteAllExact,
writer).ConfigureAwait(false);
+ var resSchema = await ReadSchemaAsync(resBuf,
schema).ConfigureAwait(false);
+
+ return ReadTuples(resBuf, resSchema);
+ }
- void Write(MessagePackWriter w)
+ private static IIgniteTuple? ReadValueTuple(PooledBuffer buf, Schema?
schema, IIgniteTuple key)
+ {
+ if (schema == null)
{
- w.Write(Id);
- w.Write(schema.Version);
+ return null;
+ }
- foreach (var col in schema.Columns)
+ // Skip schema version.
+ var r = buf.GetReader();
+ r.Skip();
+
+ var columns = schema.Columns;
+ var tuple = new IgniteTuple(columns.Count);
+
+ for (var i = 0; i < columns.Count; i++)
+ {
+ var column = columns[i];
+
+ if (i < schema.KeyColumnCount)
+ {
+ tuple[column.Name] = key[column.Name];
+ }
+ else
{
- var colIdx = rec.GetOrdinal(col.Name);
-
- if (colIdx < 0)
- {
- w.WriteNil();
- }
- else
- {
- w.WriteObject(rec[colIdx]);
- }
+ tuple[column.Name] = r.ReadObject(column.Type);
}
+ }
- w.Flush();
+ return tuple;
+ }
+
+ private static IIgniteTuple ReadTuple(ref MessagePackReader r, Schema
schema, bool keyOnly = false)
+ {
+ var columns = schema.Columns;
+ var count = keyOnly ? schema.KeyColumnCount : columns.Count;
Review comment:
Do key columns always go first?
##########
File path: modules/platforms/dotnet/Apache.Ignite/Table/IgniteTuple.cs
##########
@@ -76,5 +77,27 @@ public IgniteTuple(int capacity = 16)
/// <inheritdoc/>
public int GetOrdinal(string name) => _indexes.TryGetValue(name, out
var index) ? index : -1;
+
+ /// <inheritdoc />
+ public override string ToString()
+ {
+ var sb = new StringBuilder();
+
+ sb.Append(nameof(IgniteTuple)).Append(" [");
Review comment:
Maybe we should not have a space before the `[` here.
##########
File path: modules/platforms/dotnet/Apache.Ignite/Table/ITableView.cs
##########
@@ -31,15 +32,148 @@ public interface ITableView<T>
/// <summary>
/// Gets a record by key.
/// </summary>
- /// <param name="keyRec">A record with key columns set.</param>
- /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- Task<T?> GetAsync(T keyRec);
+ /// <param name="key">A record with key columns set.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains a record with all columns.
+ /// </returns>
+ Task<T?> GetAsync(T key);
+
+ /// <summary>
+ /// Gets multiple records by keys.
+ /// </summary>
+ /// <param name="keys">Collection of records with key columns
set.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains matching records with all columns.
+ /// </returns>
+ Task<IList<T>> GetAllAsync(IEnumerable<T> keys);
/// <summary>
/// Inserts a record into the table if it does not exist or replaces
the existing one.
/// </summary>
- /// <param name="rec">Record to upsert.</param>
+ /// <param name="record">Record to upsert.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- Task UpsertAsync(T rec);
+ Task UpsertAsync(T record);
+
+ /// <summary>
+ /// Inserts multiple records into the table, replacing existing ones.
+ /// </summary>
+ /// <param name="records">Records to upsert.</param>
+ /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
+ Task UpsertAllAsync(IEnumerable<T> records);
+
+ /// <summary>
+ /// Inserts a record into the table if it does not exist or replaces
the existing one.
+ /// </summary>
+ /// <param name="record">Record to upsert.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains replaced record or null if it did not
exist.
+ /// </returns>
+ Task<T?> GetAndUpsertAsync(T record);
+
+ /// <summary>
+ /// Inserts a record into the table if it does not exist.
+ /// </summary>
+ /// <param name="record">Record to insert.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains a value indicating whether the record was
inserted. Returns <c>false</c> if a
+ /// record with the same key already exists.
+ /// </returns>
+ Task<bool> InsertAsync(T record);
+
+ /// <summary>
+ /// Inserts multiple records into the table, skipping existing ones.
+ /// </summary>
+ /// <param name="records">Records to insert.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains skipped records.
+ /// </returns>
+ Task<IList<T>> InsertAllAsync(IEnumerable<T> records);
+
+ /// <summary>
+ /// Replaces a record with the same key columns if it exists.
Review comment:
```suggestion
/// Replaces a record with the same key columns only if it exists.
```
Probably we should somehow clarify that nothing happens if the record does
not exist.
##########
File path: modules/platforms/dotnet/Apache.Ignite/Table/ITableView.cs
##########
@@ -31,15 +32,148 @@ public interface ITableView<T>
/// <summary>
/// Gets a record by key.
/// </summary>
- /// <param name="keyRec">A record with key columns set.</param>
- /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- Task<T?> GetAsync(T keyRec);
+ /// <param name="key">A record with key columns set.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains a record with all columns.
+ /// </returns>
+ Task<T?> GetAsync(T key);
+
+ /// <summary>
+ /// Gets multiple records by keys.
+ /// </summary>
+ /// <param name="keys">Collection of records with key columns
set.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains matching records with all columns.
+ /// </returns>
+ Task<IList<T>> GetAllAsync(IEnumerable<T> keys);
/// <summary>
/// Inserts a record into the table if it does not exist or replaces
the existing one.
/// </summary>
- /// <param name="rec">Record to upsert.</param>
+ /// <param name="record">Record to upsert.</param>
/// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
- Task UpsertAsync(T rec);
+ Task UpsertAsync(T record);
+
+ /// <summary>
+ /// Inserts multiple records into the table, replacing existing ones.
+ /// </summary>
+ /// <param name="records">Records to upsert.</param>
+ /// <returns>A <see cref="Task"/> representing the asynchronous
operation.</returns>
+ Task UpsertAllAsync(IEnumerable<T> records);
+
+ /// <summary>
+ /// Inserts a record into the table if it does not exist or replaces
the existing one.
+ /// </summary>
+ /// <param name="record">Record to upsert.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains replaced record or null if it did not
exist.
+ /// </returns>
+ Task<T?> GetAndUpsertAsync(T record);
+
+ /// <summary>
+ /// Inserts a record into the table if it does not exist.
+ /// </summary>
+ /// <param name="record">Record to insert.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains a value indicating whether the record was
inserted. Returns <c>false</c> if a
+ /// record with the same key already exists.
+ /// </returns>
+ Task<bool> InsertAsync(T record);
+
+ /// <summary>
+ /// Inserts multiple records into the table, skipping existing ones.
+ /// </summary>
+ /// <param name="records">Records to insert.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains skipped records.
+ /// </returns>
+ Task<IList<T>> InsertAllAsync(IEnumerable<T> records);
+
+ /// <summary>
+ /// Replaces a record with the same key columns if it exists.
+ /// </summary>
+ /// <param name="record">Record to insert.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains a value indicating whether a record with
the specified key was replaced.
+ /// </returns>
+ Task<bool> ReplaceAsync(T record);
+
+ /// <summary>
+ /// Replaces a record with a new one only if all existing columns have
the same values
+ /// as the specified <paramref name="record"/>.
+ /// </summary>
+ /// <param name="record">Record to replace.</param>
+ /// <param name="newRecord">Record to replace with.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains a value indicating whether a record was
replaced.
+ /// </returns>
+ Task<bool> ReplaceAsync(T record, T newRecord);
+
+ /// <summary>
+ /// Replaces a record with the same key columns if it exists.
+ /// </summary>
+ /// <param name="record">Record to insert.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains the previous value for the given key, or
<c>null</c> if it did not exist.
+ /// </returns>
+ Task<T?> GetAndReplaceAsync(T record);
+
+ /// <summary>
+ /// Deletes a record with the specified key.
+ /// </summary>
+ /// <param name="key">A record with key columns set.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains a value indicating whether a record with
the specified key was deleted.
+ /// </returns>
+ Task<bool> DeleteAsync(T key);
+
+ /// <summary>
+ /// Deletes a record only if all existing columns have the same values
as the specified <paramref name="record"/>.
+ /// </summary>
+ /// <param name="record">A record with all columns set.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains a value indicating whether a record was
deleted.
+ /// </returns>
+ Task<bool> DeleteExactAsync(T record);
+
+ /// <summary>
+ /// Gets and deletes a record with the specified key.
+ /// </summary>
+ /// <param name="key">A record with key columns set.</param>
+ /// <returns>
+ /// A <see cref="Task"/> representing the asynchronous operation.
+ /// The task result contains deleted record or <c>null</c> if it did
not exist.
+ /// </returns>
+ Task<T?> GetAndDeleteAsync(T key);
+
+ /// <summary>
+ /// Deletes multiple records.
Review comment:
Again, we should probably clarify whether the whole operation will fail
if one of the records does not exist. I know it can be understood from the
`<returns>` section, but I believe those details should be clear from the
description itself.
##########
File path: modules/platforms/dotnet/Apache.Ignite/Internal/Table/Table.cs
##########
@@ -67,106 +67,377 @@ public Table(string name, Guid id, ClientFailoverSocket
socket)
public Guid Id { get; }
/// <inheritdoc/>
- public async Task<IIgniteTuple?> GetAsync(IIgniteTuple keyRec)
+ public async Task<IIgniteTuple?> GetAsync(IIgniteTuple key)
{
- IgniteArgumentCheck.NotNull(keyRec, nameof(keyRec));
+ IgniteArgumentCheck.NotNull(key, nameof(key));
var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
using var writer = new PooledArrayBufferWriter();
- Write(writer.GetMessageWriter());
+ WriteTuple(writer, schema, key, keyOnly: true);
using var resBuf = await _socket.DoOutInOpAsync(ClientOp.TupleGet,
writer).ConfigureAwait(false);
- return Read(resBuf.GetReader());
+ var resSchema = await ReadSchemaAsync(resBuf,
schema).ConfigureAwait(false);
- void Write(MessagePackWriter w)
+ return ReadValueTuple(resBuf, resSchema, key);
+ }
+
+ /// <inheritdoc/>
+ public async Task<IList<IIgniteTuple>>
GetAllAsync(IEnumerable<IIgniteTuple> keys)
+ {
+ IgniteArgumentCheck.NotNull(keys, nameof(keys));
+
+ var schema = await GetLatestSchemaAsync().ConfigureAwait(false);
+
+ using var writer = new PooledArrayBufferWriter();
+ var count = WriteTuples(writer, schema, keys, keyOnly: true);
+
+ if (count == 0)
Review comment:
Shouldn't we check this part earlier? For example, we could return an empty
collection without requesting the schema if `keys` is empty.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]