ascherbakoff commented on a change in pull request #400:
URL: https://github.com/apache/ignite-3/pull/400#discussion_r749096432
##########
File path:
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
##########
@@ -141,151 +151,183 @@ public InternalTableImpl(
this.schemaMode = schemaMode;
}
- /** {@inheritDoc} */
- @Override public CompletableFuture<BinaryRow> get(BinaryRow keyRow,
Transaction tx) {
- return partitionMap.get(partId(keyRow)).<SingleRowResponse>run(new
GetCommand(keyRow))
- .thenApply(SingleRowResponse::getValue);
- }
+ /**
+ * @param keyRows Rows.
+ * @param tx The transaction.
+ * @param op Command factory.
+ * @param reducer The reducer.
+ * @param <R> Reducer's input.
+ * @param <T> Reducer's output.
+ * @return
+ */
+ private <R, T> CompletableFuture<T> wrapInTx(
+ Collection<BinaryRow> keyRows,
+ InternalTransaction tx,
+ BiFunction<Collection<BinaryRow>, InternalTransaction, Command> op,
+ Function<CompletableFuture<R>[], CompletableFuture<T>> reducer
+ ) {
+ if (tx == null) {
+ try {
+ tx = txManager.tx();
+ }
+ catch (TransactionException e) {
+ return failedFuture(e);
+ }
+ }
- /** {@inheritDoc} */
- @Override public CompletableFuture<Collection<BinaryRow>>
getAll(Collection<BinaryRow> keyRows, Transaction tx) {
- Map<Integer, Set<BinaryRow>> keyRowsByPartition =
mapRowsToPartitions(keyRows);
+ final boolean implicit = tx == null;
+
+ final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
- CompletableFuture<MultiRowsResponse>[] futures = new
CompletableFuture[keyRowsByPartition.size()];
+ Map<Integer, List<BinaryRow>> keyRowsByPartition =
mapRowsToPartitions(keyRows);
+
+ CompletableFuture<R>[] futures = new
CompletableFuture[keyRowsByPartition.size()];
int batchNum = 0;
- for (Map.Entry<Integer, Set<BinaryRow>> partToRows :
keyRowsByPartition.entrySet()) {
- futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new
GetAllCommand(partToRows.getValue()));
+ for (Map.Entry<Integer, List<BinaryRow>> partToRows :
keyRowsByPartition.entrySet()) {
+ CompletableFuture<RaftGroupService> fut =
enlist(partToRows.getKey(), tx0);
- batchNum++;
+ futures[batchNum++] = fut.thenCompose(svc ->
svc.run(op.apply(partToRows.getValue(), tx0)));
}
- return collectMultiRowsResponses(futures);
- }
+ CompletableFuture<T> fut = reducer.apply(futures);
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> upsert(BinaryRow row, Transaction
tx) {
- return partitionMap.get(partId(row)).run(new UpsertCommand(row));
+ return fut.handle(new BiFunction<T, Throwable, CompletableFuture<T>>()
{
+ @Override public CompletableFuture<T> apply(T r, Throwable e) {
+ if (e != null)
+ return tx0.rollbackAsync().thenCompose(ignored ->
failedFuture(e)); // Preserve failed state.
+ else
+ return implicit ? tx0.commitAsync().thenApply(ignored ->
r) : completedFuture(r);
+ }
+ }).thenCompose(x -> x);
}
- /** {@inheritDoc} */
- @Override public CompletableFuture<Void> upsertAll(Collection<BinaryRow>
rows, Transaction tx) {
- Map<Integer, Set<BinaryRow>> keyRowsByPartition =
mapRowsToPartitions(rows);
+ /**
+ * @param row The row.
+ * @param tx The transaction.
+ * @param op Command factory.
+ * @param trans Transform closure.
+ * @param <R> Transform input.
+ * @param <T> Transform output.
+ * @return
+ */
+ private <R, T> CompletableFuture<T> wrapInTx(
+ BinaryRow row,
+ InternalTransaction tx,
+ Function<InternalTransaction, Command> op,
+ Function<R, T> trans
+ ) {
+ if (tx == null) {
+ try {
+ tx = txManager.tx();
+ }
+ catch (TransactionException e) {
+ return failedFuture(e);
+ }
+ }
- CompletableFuture<Void>[] futures = new
CompletableFuture[keyRowsByPartition.size()];
+ final boolean implicit = tx == null;
- int batchNum = 0;
+ final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
- for (Map.Entry<Integer, Set<BinaryRow>> partToRows :
keyRowsByPartition.entrySet()) {
- futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new
UpsertAllCommand(partToRows.getValue()));
+ int partId = partId(row);
- batchNum++;
- }
+ CompletableFuture<RaftGroupService> enlistFut = enlist(partId, tx0);
- return CompletableFuture.allOf(futures);
+ CompletableFuture<T> fut = enlistFut.thenCompose(svc ->
svc.<R>run(op.apply(tx0)).thenApply(trans::apply));
+
+ // TODO asch remove futures creation
+ return fut.handle(new BiFunction<T, Throwable, CompletableFuture<T>>()
{
+ @Override public CompletableFuture<T> apply(T r, Throwable e) {
+ if (e != null)
+ return tx0.rollbackAsync().thenCompose(ignored ->
failedFuture(e)); // Preserve failed state.
+ else
+ return implicit ? tx0.commitAsync().thenApply(ignored ->
r) : completedFuture(r);
+ }
+ }).thenCompose(x -> x);
Review comment:
This is required to convert `handle ` result to method's return value
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]