ACCUMULO-1613 added documentation for conditional writer

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/73971825
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/73971825
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/73971825

Branch: refs/heads/master
Commit: 739718253bd49dbe5c74e041b5910cfd12edad29
Parents: 2faafcc
Author: Keith Turner <ktur...@apache.org>
Authored: Tue Nov 19 17:15:23 2013 -0500
Committer: Keith Turner <ktur...@apache.org>
Committed: Tue Nov 19 17:15:23 2013 -0500

----------------------------------------------------------------------
 .../accumulo/core/client/ConditionalWriter.java |   3 +
 .../apache/accumulo/core/data/Condition.java    |  54 ++++
 .../accumulo/core/data/ConditionalMutation.java |   2 +
 .../accumulo_user_manual/chapters/clients.tex   |  27 +-
 .../examples/simple/reservations/ARS.java       | 308 +++++++++++++++++++
 .../resources/docs/examples/README.reservations |  66 ++++
 6 files changed, 459 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java 
b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
index 4ed4d31..2c24a2e 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java
@@ -23,6 +23,9 @@ import 
org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode;
 import org.apache.accumulo.core.data.ConditionalMutation;
 
 /**
+ * ConditionalWriter provides the ability to do efficient, atomic 
read-modify-write operations on rows. These operations are performed on the 
tablet server
+ * while a row lock is held.
+ * 
  * @since 1.6.0
  */
 public interface ConditionalWriter {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/core/src/main/java/org/apache/accumulo/core/data/Condition.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/Condition.java 
b/core/src/main/java/org/apache/accumulo/core/data/Condition.java
index df20682..fc8f2bf 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/Condition.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/Condition.java
@@ -26,6 +26,7 @@ import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.hadoop.io.Text;
 
 /**
+ * Conditions that must be met on a particular column in a row.
  * 
  * @since 1.6.0
  */
@@ -76,6 +77,13 @@ public class Condition {
     return cq;
   }
 
+  /**
+   * Sets the version for the column to check. If this is not set then the 
latest column will be checked, unless iterators do something different.
+   * 
+   * @param ts
+   * @return returns this
+   */
+
   public Condition setTimestamp(long ts) {
     this.ts = ts;
     return this;
@@ -85,24 +93,53 @@ public class Condition {
     return ts;
   }
 
+  /**
+   * see {@link #setValue(byte[])}
+   * 
+   * @param value
+   * @return returns this
+   */
+
   public Condition setValue(CharSequence value) {
     ArgumentChecker.notNull(value);
     this.val = new 
ArrayByteSequence(value.toString().getBytes(Constants.UTF8));
     return this;
   }
 
+  /**
+   * This method sets the expected value of a column. In order for the 
condition to pass, the column must exist and have this value. If a value is not 
set, then
+   * the column must be absent for the condition to pass.
+   * 
+   * @param value
+   * @return returns this
+   */
+
   public Condition setValue(byte[] value) {
     ArgumentChecker.notNull(value);
     this.val = new ArrayByteSequence(value);
     return this;
   }
   
+  /**
+   * see {@link #setValue(byte[])}
+   * 
+   * @param value
+   * @return returns this
+   */
+
   public Condition setValue(Text value) {
     ArgumentChecker.notNull(value);
     this.val = new ArrayByteSequence(value.getBytes(), 0, value.getLength());
     return this;
   }
   
+  /**
+   * see {@link #setValue(byte[])}
+   * 
+   * @param value
+   * @return returns this
+   */
+
   public Condition setValue(ByteSequence value) {
     ArgumentChecker.notNull(value);
     this.val = value;
@@ -113,6 +150,13 @@ public class Condition {
     return val;
   }
 
+  /**
+   * Sets the visibility for the column to check. If not set it defaults to 
empty visibility.
+   * 
+   * @param cv
+   * @return returns this
+   */
+
   public Condition setVisibility(ColumnVisibility cv) {
     ArgumentChecker.notNull(cv);
     this.cv = new ArrayByteSequence(cv.getExpression());
@@ -123,6 +167,16 @@ public class Condition {
     return cv;
   }
 
+  /**
+   * Set iterators to use when reading the column's value. These iterators will 
be applied in addition to the iterators configured for the table. Using 
iterators
+   * it's possible to test other conditions, besides equality and absence, like 
less than. On the server side the iterators will be seeked using a range that
+   * covers only the family, qualifier, and visibility (if the timestamp is 
set then it will be used to narrow the range). Value equality will be tested 
using
+   * the first entry returned by the iterator stack.
+   * 
+   * @param iterators
+   * @return returns this
+   */
+
   public Condition setIterators(IteratorSetting... iterators) {
     ArgumentChecker.notNull(iterators);
     

http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java 
b/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
index 510396d..c438f6d 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/ConditionalMutation.java
@@ -26,6 +26,8 @@ import org.apache.accumulo.core.util.ArgumentChecker;
 import org.apache.hadoop.io.Text;
 
 /**
+ * A Mutation that contains a list of conditions that must all be met before 
the mutation is applied.
+ * 
  * @since 1.6.0
  */
 public class ConditionalMutation extends Mutation {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex
----------------------------------------------------------------------
diff --git a/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex 
b/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex
index 18fbafc..9b35d37 100644
--- a/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex
+++ b/docs/src/main/latex/accumulo_user_manual/chapters/clients.tex
@@ -116,6 +116,31 @@ writer.close();
 An example of using the batch writer can be found at\\
 accumulo/docs/examples/README.batch
 
+\subsection{ConditionalWriter} 
+The ConditionalWriter enables efficient, atomic read-modify-write operations on
+rows.  The ConditionalWriter writes special Mutations which have a list of per
+column conditions that must all be met before the mutation is applied.  The
+conditions are checked in the tablet server while a row lock is
+held\footnote{Mutations written by the BatchWriter will not obtain a row
+lock.}.  The conditions that can be checked for a column are equality and
+absence.  For example, a conditional mutation can require that column A is
+absent in order to be applied.  Iterators can be applied when checking
+conditions.  Using iterators, many other operations besides equality and
+absence can be checked.  For example, using an iterator that converts values
+less than 5 to 0 and everything else to 1, it's possible to only apply a
+mutation when a column is less than 5.
+
+In the case when a tablet server dies after a client sent a conditional
+mutation, it's not known if the mutation was applied or not.  When this happens
+the ConditionalWriter reports a status of UNKNOWN for the ConditionalMutation.
+In many cases this situation can be dealt with by simply reading the row again
+and possibly sending another conditional mutation.  If this is not sufficient,
+then a higher level of abstraction can be built by storing transactional
+information within a row.
+ 
+An example of using the conditional writer can be found at\\
+accumulo/docs/examples/README.reservations
+
 \section{Reading Data}
 
 Accumulo is optimized to quickly retrieve the value associated with a given 
key, and
@@ -329,4 +354,4 @@ for(KeyValue keyValue : results.getResultsIterator()) {
 
 client.closeScanner(scanner);
 \end{verbatim}
-\normalsize
\ No newline at end of file
+\normalsize

http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
----------------------------------------------------------------------
diff --git 
a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
 
b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
new file mode 100644
index 0000000..0c51843
--- /dev/null
+++ 
b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.examples.simple.reservations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import jline.console.ConsoleReader;
+
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriter.Status;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Condition;
+import org.apache.accumulo.core.data.ConditionalMutation;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+
+/**
+ * Accumulo Reservation System : An example reservation system using Accumulo. 
Supports atomic reservations of a resource at a date. Wait lists are also
+ * supported. In order to keep the example simple, no checking is done of the 
date. Also the code is inefficient; if interested in improving it, take a look 
at the
+ * EXCERCISE comments.
+ */
+
+// EXCERCISE create a test that verifies correctness under concurrency. For 
example, have M threads making reservations against N resources. Each thread 
could
+// randomly reserve and cancel resources for a single user. When each thread 
finishes it knows what the state of its single user should be. When all threads
+// finish, collect their expected state and verify the status of all users and 
resources. For extra credit run the test on an IaaS provider using 10 nodes and
+// 10 threads per node.
+
+public class ARS {
+
+  private Connector conn;
+  private String rTable;
+
+  public enum ReservationResult {
+    RESERVED, WAIT_LISTED
+  }
+
+  public ARS(Connector conn, String rTable) {
+    this.conn = conn;
+    this.rTable = rTable;
+  }
+
+  public List<String> setCapacity(String what, String when, int count) {
+    // EXCERCISE implement this method which atomically sets a capacity and 
returns anyone who was moved to the waitlist if the capacity was decreased
+
+    throw new UnsupportedOperationException();
+  }
+
+  public ReservationResult reserve(String what, String when, String who) 
throws Exception {
+
+    String row = what + ":" + when;
+
+    // EXCERCISE This code assumes there is no reservation and tries to create 
one. If a reservation exists then the update will fail. This is a good strategy
+    // when it's expected there are usually no reservations. Could modify the 
code to scan first.
+
+    // The following mutation requires that the column tx:seq does not exist 
and will fail if it does.
+    ConditionalMutation update = new ConditionalMutation(row, new 
Condition("tx", "seq"));
+    update.put("tx", "seq", "0");
+    update.put("res", String.format("%04d", 0), who);
+
+    ReservationResult result = ReservationResult.RESERVED;
+
+    ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new 
ConditionalWriterConfig());
+
+    try {
+      while (true) {
+        Status status = cwriter.write(update).getStatus();
+        switch (status) {
+          case ACCEPTED:
+            return result;
+          case REJECTED:
+          case UNKNOWN:
+            // read the row and decide what to do
+            break;
+          default:
+            throw new RuntimeException("Unexpected status " + status);
+        }
+
+        // EXCERCISE in the case of many threads trying to reserve a slot this 
approach of immediately retrying is inefficient. Exponential backoff is a good
+        // general solution to solve contention problems like this. However in 
this particular case exponential backoff could penalize the earliest threads 
that
+        // attempted to make a reservation putting them later in the list. A 
more complex solution could involve having independent sub-queues within the row
+        // that approximately maintain arrival order and use exponential back 
off to fairly merge the sub-queues into the main queue.
+
+        // its important to use an isolated scanner so that only whole 
mutations are seen
+        Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, 
Authorizations.EMPTY));
+        scanner.setRange(new Range(row));
+
+        int seq = -1;
+        int maxReservation = -1;
+
+        for (Entry<Key,Value> entry : scanner) {
+          String cf = entry.getKey().getColumnFamilyData().toString();
+          String cq = entry.getKey().getColumnQualifierData().toString();
+          String val = entry.getValue().toString();
+
+          if (cf.equals("tx") && cq.equals("seq")) {
+            seq = Integer.parseInt(val);
+          } else if (cf.equals("res")) {
+            // EXCERCISE scanning the entire list to find if reserver is 
already in the list is inefficient. One possible way to solve this would be to 
sort the
+            // data differently in Accumulo so that finding the reserver could 
be done quickly.
+            if (val.equals(who))
+              if (maxReservation == -1)
+                return ReservationResult.RESERVED; // already have the first 
reservation
+              else
+                return ReservationResult.WAIT_LISTED; // already on wait list
+
+            // EXCERCISE the way this code finds the max reservation is very 
inefficient.... it would be better if it did not have to scan the entire row.
+            // One possibility is to just use the sequence number. Could also 
consider sorting the data in another way and/or using an iterator.
+            maxReservation = Integer.parseInt(cq);
+          }
+        }
+
+        Condition condition = new Condition("tx", "seq");
+        if (seq >= 0)
+          condition.setValue(seq + ""); // only expect a seq # if one was seen
+
+        update = new ConditionalMutation(row, condition);
+        update.put("tx", "seq", (seq + 1) + "");
+        update.put("res", String.format("%04d", maxReservation + 1), who);
+
+        // EXCERCISE if set capacity is implemented, then result should take 
capacity into account
+        if (maxReservation == -1)
+          result = ReservationResult.RESERVED; // if successful, will be first 
reservation
+        else
+          result = ReservationResult.WAIT_LISTED;
+      }
+    } finally {
+      cwriter.close();
+    }
+
+  }
+
+  public void cancel(String what, String when, String who) throws Exception {
+
+    String row = what + ":" + when;
+
+    // Even though this method is only deleting a column, it's important to use 
a conditional writer. By updating the seq # when deleting a reservation it
+    // will cause any concurrent reservations to retry. If this delete were 
done using a batch writer, then a concurrent reservation could report 
WAIT_LISTED
+    // when it actually got the reservation.
+
+    ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new 
ConditionalWriterConfig());
+
+    try {
+      while (true) {
+
+        // its important to use an isolated scanner so that only whole 
mutations are seen
+        Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, 
Authorizations.EMPTY));
+        scanner.setRange(new Range(row));
+
+        int seq = -1;
+        String reservation = null;
+
+        for (Entry<Key,Value> entry : scanner) {
+          String cf = entry.getKey().getColumnFamilyData().toString();
+          String cq = entry.getKey().getColumnQualifierData().toString();
+          String val = entry.getValue().toString();
+
+          // EXCERCISE avoid linear scan
+
+          if (cf.equals("tx") && cq.equals("seq")) {
+            seq = Integer.parseInt(val);
+          } else if (cf.equals("res") && val.equals(who)) {
+            reservation = cq;
+          }
+        }
+
+        if (reservation != null) {
+          ConditionalMutation update = new ConditionalMutation(row, new 
Condition("tx", "seq").setValue(seq + ""));
+          update.putDelete("res", reservation);
+          update.put("tx", "seq", (seq + 1) + "");
+
+          Status status = cwriter.write(update).getStatus();
+          switch (status) {
+            case ACCEPTED:
+              // successfully canceled reservation
+              return;
+            case REJECTED:
+            case UNKNOWN:
+              // retry
+              // EXCERCISE exponential backoff could be used here
+              break;
+            default:
+              throw new RuntimeException("Unexpected status " + status);
+          }
+
+        } else {
+          // not reserved, nothing to do
+          break;
+        }
+
+      }
+    } finally {
+      cwriter.close();
+    }
+  }
+
+  public List<String> list(String what, String when) throws Exception {
+    String row = what + ":" + when;
+    
+    //its important to use an isolated scanner so that only whole mutations 
are seen
+    Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, 
Authorizations.EMPTY));
+    scanner.setRange(new Range(row));
+    scanner.fetchColumnFamily(new Text("res"));
+
+    List<String> reservations = new ArrayList<String>();
+
+    for (Entry<Key,Value> entry : scanner) {
+      String val = entry.getValue().toString();
+      reservations.add(val);
+    }
+
+    return reservations;
+  }
+
+  public static void main(String[] args) throws Exception {
+    final ConsoleReader reader = new ConsoleReader();
+    ARS ars = null;
+
+    while (true) {
+      String line = reader.readLine(">");
+      if (line == null)
+        break;
+
+      final String[] tokens = line.split("\\s+");
+
+      if (tokens[0].equals("reserve") && tokens.length >= 4 && ars != null) {
+        // start up multiple threads all trying to reserve the same resource, 
no more than one should succeed
+
+        final ARS fars = ars;
+        ArrayList<Thread> threads = new ArrayList<Thread>();
+        for (int i = 3; i < tokens.length; i++) {
+          final int whoIndex = i;
+          Runnable reservationTask = new Runnable() {
+            @Override
+            public void run() {
+              try {
+                reader.println("  " + String.format("%20s", tokens[whoIndex]) 
+ " : " + fars.reserve(tokens[1], tokens[2], tokens[whoIndex]));
+              } catch (Exception e) {
+                e.printStackTrace();
+              }
+            }
+          };
+
+          threads.add(new Thread(reservationTask));
+        }
+
+        for (Thread thread : threads)
+          thread.start();
+
+        for (Thread thread : threads)
+          thread.join();
+
+      } else if (tokens[0].equals("cancel") && tokens.length == 4 && ars != 
null) {
+        ars.cancel(tokens[1], tokens[2], tokens[3]);
+      } else if (tokens[0].equals("list") && tokens.length == 3 && ars != 
null) {
+        List<String> reservations = ars.list(tokens[1], tokens[2]);
+        if (reservations.size() > 0) {
+          reader.println("  Reservation holder : " + reservations.get(0));
+          if (reservations.size() > 1)
+            reader.println("  Wait list : " + reservations.subList(1, 
reservations.size()));
+        }
+      } else if (tokens[0].equals("quit") && tokens.length == 1) {
+        break;
+      } else if (tokens[0].equals("connect") && tokens.length == 6 && ars == 
null) {
+        ZooKeeperInstance zki = new ZooKeeperInstance(tokens[1], tokens[2]);
+        Connector conn = zki.getConnector(tokens[3], new 
PasswordToken(tokens[4]));
+        if (conn.tableOperations().exists(tokens[5])) {
+          ars = new ARS(conn, tokens[5]);
+          reader.println("  connected");
+        } else
+          reader.println("  No Such Table");
+      } else {
+        System.out.println("  Commands : ");
+        if (ars == null) {
+          reader.println("    connect <instance> <zookeepers> <user> <pass> 
<table>");
+        } else {
+          reader.println("    reserve <what> <when> <who> {who}");
+          reader.println("    cancel <what> <when> <who>");
+          reader.println("    list <what> <when>");
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/73971825/server/monitor/src/main/resources/docs/examples/README.reservations
----------------------------------------------------------------------
diff --git 
a/server/monitor/src/main/resources/docs/examples/README.reservations 
b/server/monitor/src/main/resources/docs/examples/README.reservations
new file mode 100644
index 0000000..7ad9cd3
--- /dev/null
+++ b/server/monitor/src/main/resources/docs/examples/README.reservations
@@ -0,0 +1,66 @@
+Title: Apache Accumulo Reservations Example
+Notice:    Licensed to the Apache Software Foundation (ASF) under one
+           or more contributor license agreements.  See the NOTICE file
+           distributed with this work for additional information
+           regarding copyright ownership.  The ASF licenses this file
+           to you under the Apache License, Version 2.0 (the
+           "License"); you may not use this file except in compliance
+           with the License.  You may obtain a copy of the License at
+           .
+             http://www.apache.org/licenses/LICENSE-2.0
+           .
+           Unless required by applicable law or agreed to in writing,
+           software distributed under the License is distributed on an
+           "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+           KIND, either express or implied.  See the License for the
+           specific language governing permissions and limitations
+           under the License.
+
+This example shows running a simple reservation system implemented using
+conditional mutations.  This system guarantees that only one concurrent user can
+reserve a resource.  The example's reserve command allows multiple users to be
+specified.  When this is done it creates a separate reservation thread for each
+user.  In the example below threads are spun up for alice, bob, eve, mallory,
+and trent to reserve room06 on 20140101.  Bob ends up getting the reservation
+and everyone else is put on a wait list.  The example code will take any string
+for what, when and who.
+
+    $ ./bin/accumulo org.apache.accumulo.examples.simple.reservations.ARS
+    >connect test16 localhost root secret ars
+      connected
+    >
+      Commands : 
+        reserve <what> <when> <who> {who}
+        cancel <what> <when> <who>
+        list <what> <when>
+    >reserve room06 20140101 alice bob eve mallory trent
+                       bob : RESERVED
+                   mallory : WAIT_LISTED
+                     alice : WAIT_LISTED
+                     trent : WAIT_LISTED
+                       eve : WAIT_LISTED
+    >list room06 20140101
+      Reservation holder : bob
+      Wait list : [mallory, alice, trent, eve]
+    >cancel room06 20140101 alice
+    >cancel room06 20140101 bob
+    >list room06 20140101
+      Reservation holder : mallory
+      Wait list : [trent, eve]
+    >quit
+
+Scanning the table in the Accumulo shell after running the example shows the
+following:
+
+    root@test16> table ars
+    root@test16 ars> scan
+    room06:20140101 res:0001 []    mallory
+    room06:20140101 res:0003 []    trent
+    room06:20140101 res:0004 []    eve
+    room06:20140101 tx:seq []    6
+
+The tx:seq column is incremented for each update to the row allowing for
+detection of concurrent changes.  For an update to go through the sequence
+number must not have changed since the data was read.  If it does change then
+the conditional mutation will fail and the example code will retry.
+

Reply via email to