[GitHub] incubator-tephra pull request #74: TEPHRA-266 Identify log messages when mul...

2018-05-01 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/74#discussion_r185305932
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java
 ---
@@ -184,4 +185,14 @@ private void refreshState() throws IOException {
   public TransactionVisibilityState getLatestState() {
 return latestState;
   }
+
+  protected void setId(@Nullable String id) {
+if (id != null) {
+  this.logPrefix = "[" + id + "] ";
+}
+  }
+
+  private String prefixLog(String message) {
--- End diff --

I was not aware of that. I think this is not logged very frequently, so 
it's ok.


---


[GitHub] incubator-tephra pull request #72: TEPHRA-270 TEPHRA-271 Transaction state c...

2018-05-01 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/72#discussion_r185303754
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java
 ---
@@ -78,6 +78,7 @@ protected void startUp() throws Exception {
   protected void shutDown() throws Exception {
 if (refreshService != null) {
   refreshService.interrupt();
+  refreshService.join(1000);
--- End diff --

I was mistaken in thinking that it would throw if the timeout is exceeded. 
In any case, in a shutdown sequence we should catch all exceptions and ensure 
that the shutdown continues. 
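
For illustration, a minimal sketch of a defensive shutDown() along these lines, 
assuming refreshService is a plain Thread (Thread.join(millis) simply returns 
when the timeout elapses rather than throwing):

```
public class RefreshingCacheSketch {
  // hypothetical stand-in for the refresh thread used by TransactionStateCache
  private Thread refreshService;

  protected void shutDown() throws Exception {
    if (refreshService != null) {
      refreshService.interrupt();
      try {
        // wait up to one second; join() simply returns on timeout, it does not throw
        refreshService.join(1000);
      } catch (InterruptedException e) {
        // restore the interrupt flag and keep going with the shutdown
        Thread.currentThread().interrupt();
      }
    }
    // ... continue with the remainder of the shutdown sequence regardless
  }
}
```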


---


[GitHub] incubator-tephra pull request #72: TEPHRA-270 TEPHRA-271 Transaction state c...

2018-04-26 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/72#discussion_r184323707
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java
 ---
@@ -78,6 +78,7 @@ protected void startUp() throws Exception {
   protected void shutDown() throws Exception {
 if (refreshService != null) {
   refreshService.interrupt();
+  refreshService.join(1000);
--- End diff --

Does this mean that if the refreshService does not finish within a second, 
shutDown() will throw an exception, and the remainder of the shutdown sequence 
is not performed? Is that intentional?


---


[GitHub] incubator-tephra pull request #74: TEPHRA-266 Identify log messages when mul...

2018-04-26 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/74#discussion_r184322363
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/coprocessor/TransactionStateCache.java
 ---
@@ -184,4 +185,14 @@ private void refreshState() throws IOException {
   public TransactionVisibilityState getLatestState() {
 return latestState;
   }
+
+  protected void setId(@Nullable String id) {
+if (id != null) {
+  this.logPrefix = "[" + id + "] ";
+}
+  }
+
+  private String prefixLog(String message) {
--- End diff --

not sure this is a very good idea. It means you are performing the string 
operations even when the message is not actually logged (for example, for 
debug messages). Better to add the logPrefix as an argument to the log 
message, such as:
```
LOG.debug("[{}] Latest transaction snapshot: {}", logPrefix, latestState.toString());
```
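
To make the cost difference concrete, here is a small self-contained sketch 
(the class and field are made up for illustration); with SLF4J parameterized 
logging the arguments are only formatted when the level is actually enabled:

```
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrefixedLoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(PrefixedLoggingSketch.class);
  private volatile String logPrefix = "[instance-1] ";  // e.g. set via setId()

  void logSnapshot(Object latestState) {
    // Eager version: the prefix + message string is built even when DEBUG is off.
    // LOG.debug(logPrefix + "Latest transaction snapshot: " + latestState);

    // Parameterized version: arguments are only formatted when DEBUG is enabled.
    LOG.debug("{}Latest transaction snapshot: {}", logPrefix, latestState);
  }
}
```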



---


[GitHub] incubator-tephra issue #71: (TEPHRA-275) Exclude failing test from Travis bu...

2018-03-24 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/71
  
I had to try a few options:

- -Dtest=*,!TransactionProcessorTest does not seem to work in Travis (even 
though it worked fine on my Mac); it then skips all tests and fails because 
of that
- so I added an extra exclude rule in the pom file instead
- I could not figure out how to modify a single build in the matrix
- so I excluded the regular jdk7/hbase-1.4 build and re-included it with an 
extra test exclude

All builds now pass, and only the jdk7/hbase-1.4 build skips 
TransactionProcessorTest.
 


---


[GitHub] incubator-tephra issue #71: (TEPHRA-275) Exclude failing test from Travis bu...

2018-03-23 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/71
  
@poornachandra you are right. Changed the commit message. 


---


[GitHub] incubator-tephra pull request #71: (TEPHRA-275) Exclude failing test from Tr...

2018-03-23 Thread anew
GitHub user anew opened a pull request:

https://github.com/apache/incubator-tephra/pull/71

(TEPHRA-275) Exclude failing test from Travis build 

…until fix is available in HBase

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/anew/incubator-tephra tephra-2750-exclude-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-tephra/pull/71.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #71


commit 8e17abd50c133822fa64a2a23fbb598762292057
Author: anew <anew@...>
Date:   2018-03-23T21:17:02Z

(TEPHRA-275) Exclude failing test from Travis build until fix is available 
in HBase




---


[GitHub] incubator-tephra issue #69: (TEPHRA-279) Make TransactionContext resilient t...

2018-03-23 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/69
  
One test failure, which was already present before this change. Will commit this now.


---


[GitHub] incubator-tephra issue #69: (TEPHRA-279) Make TransactionContext resilient t...

2018-03-23 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/69
  
rebased on current master - waiting for one more build before committing 
this. 


---


[GitHub] incubator-tephra issue #70: [TEPHRA-282] Split Travis build per compat modul...

2018-01-18 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/70
  
Thanks @apurtell !



---


[GitHub] incubator-tephra issue #70: [TEPHRA-282] Split Travis build per compat modul...

2018-01-18 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/70
  
@apurtell based on our discussion - what do you think of this approach?


---


[GitHub] incubator-tephra issue #70: [TEPHRA-282] Split Travis build per compat modul...

2018-01-17 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/70
  
So this worked, but triggered 20 builds: 
https://travis-ci.org/apache/incubator-tephra/builds/330008721. There is a lot 
of redundancy in these builds, because each of them builds and tests 
tephra-core. I am going to try to consolidate this a little by combining:

- 0.96 and 0.98 
- 1.0 and 1.0-cdh 
- 1.1, 1.2, cdh-5.7 and cdh-5.8 (they all share compat-hbase-1.1-base)
- 1.3 and 1.4

That reduces it to 2x4 builds, and hopefully each one will finish well below 
the 50-minute mark. 



---


[GitHub] incubator-tephra pull request #70: [TEPHRA-282] Split Travis build per compa...

2018-01-16 Thread anew
GitHub user anew opened a pull request:

https://github.com/apache/incubator-tephra/pull/70

[TEPHRA-282] Split Travis build per compat module



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/anew/incubator-tephra 
tephra-282-travis-timeout

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-tephra/pull/70.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #70


commit 047c18a81cb17cbf544b0cd45dc109780075e9d0
Author: anew <anew@...>
Date:   2018-01-16T18:46:23Z

[TEPHRA-282] Split Travis build per compat module




---


[GitHub] incubator-tephra pull request #69: (TEPHRA-279) Make TransactionContext resi...

2018-01-11 Thread anew
GitHub user anew opened a pull request:

https://github.com/apache/incubator-tephra/pull/69

(TEPHRA-279) Make TransactionContext resilient to exceptions from 
getTransactionAwareName()



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/anew/incubator-tephra 
tephra-279-tx-context-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-tephra/pull/69.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #69


commit 0e4f7c3dd42659f28b716690327c2adc406263c4
Author: anew <anew@...>
Date:   2018-01-12T05:29:10Z

(TEPHRA-279) Make TransactionContext resilient to exceptions from 
getTransactionAwareName()




---


[GitHub] incubator-tephra issue #68: TEPHRA-278 Support HBase 1.4

2018-01-11 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/68
  
Running this test locally, I can see the cause of the failure in the logs:
```
2018-01-11 15:10:52,501 - INFO  [TransactionStateCache STARTING:o.a.t.c.TransactionStateCache@113] - Failed to initialize TransactionStateCache due to: 
com.google.inject.ProvisionException: Guice provision errors:

1) Unknown HBase version: 1.4.0

1 error
    at org.apache.tephra.util.HBaseVersionSpecificFactory.get(HBaseVersionSpecificFactory.java:60) ~[classes/:na]
    at org.apache.tephra.util.ConfigurationFactory.<init>(ConfigurationFactory.java:66) ~[classes/:na]
    at org.apache.tephra.coprocessor.TransactionStateCache.getSnapshotConfiguration(TransactionStateCache.java:118) [classes/:na]
    at org.apache.tephra.coprocessor.TransactionStateCache.tryInit(TransactionStateCache.java:99) [classes/:na]
    at org.apache.tephra.coprocessor.TransactionStateCache.refreshState(TransactionStateCache.java:158) [classes/:na]
    at org.apache.tephra.coprocessor.TransactionStateCache.startUp(TransactionStateCache.java:75) [classes/:na]
    at com.google.common.util.concurrent.AbstractIdleService$1$1.run(AbstractIdleService.java:43) [guava-13.0.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]
```
So the fix would be to:
- add a case for 1.4 in HBaseVersionSpecificFactory.get(); 
- that will also require adding a getHBase14Classname() and implementing it 
in ConfigurationFactory; 
- rename the HBase13ConfigurationProvider in the 1.4 compat module to 
HBase14ConfigurationProvider (along with its test case). 

I tried this in my local build and the test then passes. 
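
For illustration only, a self-contained sketch of the kind of version dispatch 
described above; the class and package names below are placeholders, not the 
actual Tephra sources:

```
public class VersionDispatchSketch {

  enum HBaseVersion { HBASE_13, HBASE_14, UNKNOWN }

  // mirrors the idea of HBaseVersionSpecificFactory.get() gaining a 1.4 case that
  // delegates to a new getHBase14Classname(), alongside the existing 1.3 case
  static String providerClassnameFor(HBaseVersion version) {
    switch (version) {
      case HBASE_13:
        return "org.example.compat.HBase13ConfigurationProvider";  // placeholder name
      case HBASE_14:
        return "org.example.compat.HBase14ConfigurationProvider";  // placeholder name
      default:
        throw new IllegalStateException("Unknown HBase version: " + version);
    }
  }

  public static void main(String[] args) {
    System.out.println(providerClassnameFor(HBaseVersion.HBASE_14));
  }
}
```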



---


[GitHub] incubator-tephra issue #68: TEPHRA-278 Support HBase 1.4

2018-01-11 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/68
  
Changes look good to me - but both Travis runs failed with the error:
```

testFamilyDeleteWithCompaction(org.apache.tephra.hbase.TransactionAwareHTableTest)  Time elapsed: 69.08 sec  <<< FAILURE!
java.lang.AssertionError: Compaction should have removed the row
    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.assertTrue(Assert.java:41)
    at org.apache.tephra.hbase.TransactionAwareHTableTest.testFamilyDeleteWithCompaction(TransactionAwareHTableTest.java:491)
```
Since this PR does not change any of the coprocessor logic, I am not sure 
what would cause this. Do you know whether anything changed in HBase 1.4 that 
would change the behavior of compaction? 



---


[GitHub] incubator-tephra issue #68: TEPHRA-278 Support HBase 1.4

2018-01-10 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/68
  
@apurtell Is this a 1:1 copy of the hbase-1.3 compat module, or is there an 
API change that requires a code change in the Tephra module? If 1.4 is API 
compatible with 1.3, we can simply add support for 1.4 by adding a rule to 
HBaseVersion.java and using the 1.3 compat module if the HBase version string 
starts with "1.4".
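
A tiny illustration of that idea (purely a sketch, not HBaseVersion.java 
itself): map any version string starting with "1.4" onto the same compat 
family as 1.3.

```
public class VersionRuleSketch {
  // hypothetical mapping; the real HBaseVersion.java works on an enum of known versions
  static String compatFamilyFor(String versionString) {
    if (versionString.startsWith("1.3") || versionString.startsWith("1.4")) {
      return "compat-1.3";
    }
    throw new IllegalArgumentException("Unknown HBase version: " + versionString);
  }

  public static void main(String[] args) {
    System.out.println(compatFamilyFor("1.4.0"));  // prints compat-1.3
  }
}
```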


---


[GitHub] incubator-tephra issue #62: TEPHRA-245 Improve prune debug tool

2017-09-21 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/62
  
LGTM


---


[GitHub] incubator-tephra pull request #62: TEPHRA-245 Improve prune debug tool

2017-09-21 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/62#discussion_r140172443
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java
 ---
@@ -204,16 +204,19 @@ public int compare(RegionPruneInfo o1, 
RegionPruneInfo o2) {
 return Long.compare(o1.getPruneUpperBound(), 
o2.getPruneUpperBound());
   }
 };
-Queue lowestPrunes =
+MinMaxPriorityQueue lowestPrunes =
   
MinMaxPriorityQueue.orderedBy(comparator).maximumSize(numRegions).create();
 
 for (RegionPruneInfo pruneInfo : regionPruneInfos) {
   lowestPrunes.add(new RegionPruneInfoPretty(pruneInfo));
 }
 
-TreeSet regionSet = new TreeSet<>(comparator);
-regionSet.addAll(lowestPrunes);
-return regionSet;
+List regions = new ArrayList<>(numRegions);
+RegionPruneInfoPretty e;
+while ((e = lowestPrunes.pollFirst()) != null) {
--- End diff --

isn't that the same as addAll?


---


[GitHub] incubator-tephra pull request #62: TEPHRA-245 Improve prune debug tool

2017-09-21 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/62#discussion_r140172193
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTool.java
 ---
@@ -165,10 +165,10 @@ public void destroy() throws IOException {
*/
   @Override
   @SuppressWarnings("WeakerAccess")
-  public SortedSet getIdleRegions(Integer 
numRegions, String time) throws IOException {
+  public List getIdleRegions(Integer numRegions, 
String time) throws IOException {
--- End diff --

so now the result won't be sorted? 

Another option is to keep the sorted set, but make the region name part of 
the comparator, so that if two regions have the same prune upper bound, they 
still have different sort keys and end up adjacent to each other in the 
result. That would give a sorted result.
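
A minimal sketch of that suggestion, mirroring the anonymous comparator in the 
diff above (the RegionPruneInfo accessor names here are assumptions): break 
ties on the region name so equal prune upper bounds still yield a 
deterministic, total order.

```
Comparator<RegionPruneInfo> comparator = new Comparator<RegionPruneInfo>() {
  @Override
  public int compare(RegionPruneInfo o1, RegionPruneInfo o2) {
    int cmp = Long.compare(o1.getPruneUpperBound(), o2.getPruneUpperBound());
    // tie-break on the region name so the ordering is deterministic
    return cmp != 0 ? cmp : o1.getRegionNameAsString().compareTo(o2.getRegionNameAsString());
  }
};
```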


---


[GitHub] incubator-tephra pull request #62: TEPHRA-245 Improve prune debug tool

2017-09-21 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/62#discussion_r140172872
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
 ---
@@ -280,29 +282,36 @@ public void testIdleRegions() throws Exception {
 try (PrintWriter out = new PrintWriter(outputStream)) {
   // Get the list of regions that have the lowest prune upper bounds 
for the latest record time
   Long latestRecordTime = compactedRegions.asMap().lastKey();
-  SortedSet 
latestExpected =
-ImmutableSortedSet.copyOf(pruneUpperBoundComparator(), 
compactedRegions.get(latestRecordTime));
+  List 
latestExpected =
+Lists.newArrayList(compactedRegions.get(latestRecordTime));
   pruningDebug.execute(new String[]{"idle-regions", "-1"}, out);
   out.flush();
-  Assert.assertEquals(GSON.toJson(latestExpected), 
readOutputStream(outputStream));
+  assertEquals(latestExpected, readOutputStream(outputStream));
 
   // Same command with explicit time
   outputStream.reset();
   pruningDebug.execute(new String[]{"idle-regions", "-1", 
String.valueOf(latestRecordTime)}, out);
   out.flush();
-  Assert.assertEquals(GSON.toJson(latestExpected), 
readOutputStream(outputStream));
+  assertEquals(latestExpected, readOutputStream(outputStream));
 
   // Same command with relative time
   outputStream.reset();
   pruningDebug.execute(new String[]{"idle-regions", "-1", "now-2s"}, 
out);
   out.flush();
-  Assert.assertEquals(GSON.toJson(latestExpected), 
readOutputStream(outputStream));
+  assertEquals(latestExpected, readOutputStream(outputStream));
 
   // Same command with reduced number of regions
   outputStream.reset();
-  pruningDebug.execute(new String[]{"idle-regions", "2", 
String.valueOf(latestRecordTime)}, out);
+  int limit = 2;
+  pruningDebug.execute(new String[]{"idle-regions", 
String.valueOf(limit), String.valueOf(latestRecordTime)}, out);
   out.flush();
-  Assert.assertEquals(GSON.toJson(subset(latestExpected, 0, 2)), 
readOutputStream(outputStream));
+  List actualPruneInfos = 
GSON.fromJson(readOutputStream(outputStream), PRUNE_INFO_LIST_TYPE);
+  Set actualRegions = 
Sets.newHashSet(Iterables.transform(actualPruneInfos, PRUNE_INFO_TO_STRING));
+  // Just comparing sizes, since returned order can be different when 
prune upper bounds are the same
--- End diff --

you can make that deterministic, see my comment above


---


[GitHub] incubator-tephra issue #61: TEPHRA-263 Enforce TTL, regardless of any in-pro...

2017-09-21 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/61
  
Changes look the same to me in all modules. LGTM


---


[GitHub] incubator-tephra issue #62: TEPHRA-245 Improve prune debug tool

2017-09-20 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/62
  
these commits look good. Please apply to all compat modules. 


---


[GitHub] incubator-tephra issue #62: TEPHRA-245 Improve prune debug tool

2017-09-19 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/62
  
looks good. please apply this to the other compat modules. 


---


[GitHub] incubator-tephra issue #62: TEPHRA-245 Improve prune debug tool

2017-09-19 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/62
  
I trust that you made the same changes in the other modules. LGTM


---


[GitHub] incubator-tephra pull request #62: TEPHRA-245 Improve prune debug tool

2017-09-19 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/62#discussion_r139811987
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
 ---
@@ -56,15 +59,18 @@
 public class InvalidListPruningDebug {
   private static final Logger LOG = 
LoggerFactory.getLogger(InvalidListPruningDebug.class);
   private static final Gson GSON = new Gson();
--- End diff --

Ideally there would be a command-line option to control that, but people can 
always pipe the output through Python. OK to leave as is.


---


[GitHub] incubator-tephra pull request #62: TEPHRA-245 Improve prune debug tool

2017-09-19 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/62#discussion_r139744215
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebug.java
 ---
@@ -56,15 +59,18 @@
 public class InvalidListPruningDebug {
   private static final Logger LOG = 
LoggerFactory.getLogger(InvalidListPruningDebug.class);
   private static final Gson GSON = new Gson();
--- End diff --

maybe better to use a pretty-printing Gson?

```
new GsonBuilder().setPrettyPrinting().create();
```
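
For reference, a small self-contained example of the difference (the output 
comments are approximate):

```
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.Arrays;
import java.util.List;

public class PrettyGsonExample {
  public static void main(String[] args) {
    List<String> regions = Arrays.asList("region-a", "region-b");
    Gson compact = new Gson();
    Gson pretty = new GsonBuilder().setPrettyPrinting().create();
    System.out.println(compact.toJson(regions));  // ["region-a","region-b"]
    System.out.println(pretty.toJson(regions));   // indented, one element per line
  }
}
```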


---


[GitHub] incubator-tephra pull request #61: TEPHRA-263 Enforce TTL, regardless of any...

2017-09-16 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/61#discussion_r139287848
  
--- Diff: 
tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
 ---
@@ -290,44 +290,52 @@ private void runFilteringTest(TxFilterFactory 
txFilterFactory,
   @Test
   public void testTTLFiltering() throws Exception {
 Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-ttls.put(FAM, 10L);
-ttls.put(FAM2, 30L);
+ttls.put(FAM, 100L);
+ttls.put(FAM2, 300L);
 ttls.put(FAM3, 0L);
 
+// start a transaction to populate the in-progress list. It should not 
affect TTL calculations
+txManager.startShort();
--- End diff --

abort this tx at the end of the test?


---


[GitHub] incubator-tephra pull request #61: TEPHRA-263 Enforce TTL, regardless of any...

2017-09-16 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/61#discussion_r139287786
  
--- Diff: tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java ---
@@ -75,12 +75,9 @@ public static long getOldestVisibleTimestamp(Map<byte[], 
Long> ttlByFamily, Tran
* @return The oldest timestamp that will be visible for the given 
transaction and TTL configuration
*/
   public static long getOldestVisibleTimestamp(Map<byte[], Long> 
ttlByFamily, Transaction tx, boolean readNonTxnData) {
-if (readNonTxnData) {
-  long maxTTL = getMaxTTL(ttlByFamily);
-  return maxTTL < Long.MAX_VALUE ? System.currentTimeMillis() - maxTTL 
: 0;
-}
-
-return getOldestVisibleTimestamp(ttlByFamily, tx);
+long ttlFactor = readNonTxnData ? 1 : TxConstants.MAX_TX_PER_MS;
+long maxTTL = getMaxTTL(ttlByFamily);
+return maxTTL < Long.MAX_VALUE ? tx.getTransactionId() - maxTTL * 
ttlFactor : 0;
--- End diff --

this can be negative if a large TTL is used. 
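
A hedged sketch of how the computation could guard against that; this is 
illustrative only, not the committed fix:

```
public static long getOldestVisibleTimestamp(Map<byte[], Long> ttlByFamily, Transaction tx,
                                             boolean readNonTxnData) {
  long ttlFactor = readNonTxnData ? 1 : TxConstants.MAX_TX_PER_MS;
  long maxTTL = getMaxTTL(ttlByFamily);
  if (maxTTL >= Long.MAX_VALUE / ttlFactor) {
    return 0;  // TTL is effectively unbounded, everything remains visible
  }
  // clamp at 0 so a TTL larger than the transaction id cannot yield a negative timestamp
  return Math.max(0L, tx.getTransactionId() - maxTTL * ttlFactor);
}
```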


---


[GitHub] incubator-tephra pull request #61: TEPHRA-263 Enforce TTL, regardless of any...

2017-09-16 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/61#discussion_r139287927
  
--- Diff: 
tephra-hbase-compat-1.0/src/test/java/org/apache/tephra/hbase/coprocessor/TransactionVisibilityFilterTest.java
 ---
@@ -290,44 +290,52 @@ private void runFilteringTest(TxFilterFactory 
txFilterFactory,
   @Test
   public void testTTLFiltering() throws Exception {
 Map<byte[], Long> ttls = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
-ttls.put(FAM, 10L);
-ttls.put(FAM2, 30L);
+ttls.put(FAM, 100L);
+ttls.put(FAM2, 300L);
 ttls.put(FAM3, 0L);
 
+// start a transaction to populate the in-progress list. It should not 
affect TTL calculations
+txManager.startShort();
+
+// commit a transaction, just to move the readPointer. Otherwise, the 
TransactionVisibilityFilter will filter
+// based upon the readPointer being smaller than the tested values, 
whereas we want to test the TTL filtering
+Transaction superShortTx = txManager.startShort();
--- End diff --

actually, you don't really need the txManager for this test. You can simply 
construct a transaction that has the desired properties. 


---


[GitHub] incubator-tephra pull request #53: (TEPHRA-243) Improve logging for slow log...

2017-09-13 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/53#discussion_r138759678
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java 
---
@@ -85,105 +100,145 @@ public long getTimestamp() {
 
   @Override
   public void append(TransactionEdit edit) throws IOException {
-long startTime = System.nanoTime();
-synchronized (this) {
-  ensureAvailable();
-
-  Entry entry = new Entry(new 
LongWritable(logSequence.getAndIncrement()), edit);
-
-  // add to pending edits
-  append(entry);
-}
-
-// wait for sync to complete
-sync();
-long durationMillis = (System.nanoTime() - startTime) / 100L;
-if (durationMillis > SLOW_APPEND_THRESHOLD) {
-  LOG.info("Slow append to log " + getName() + ", took " + 
durationMillis + " msec.");
-}
+append(Collections.singletonList(edit));
   }
 
   @Override
   public void append(List edits) throws IOException {
-long startTime = System.nanoTime();
-synchronized (this) {
-  ensureAvailable();
-
+if (closing) { // or closed, which implies closing
+  throw new IOException("Log " + getName() + " is closing or already 
closed, cannot append");
+}
+if (!initialized) {
+  init();
+}
+// synchronizing here ensures that elements in the queue are ordered 
by seq number
+synchronized (logSequence) {
   for (TransactionEdit edit : edits) {
-Entry entry = new Entry(new 
LongWritable(logSequence.getAndIncrement()), edit);
-
-// add to pending edits
-append(entry);
+pendingWrites.add(new Entry(new 
LongWritable(logSequence.getAndIncrement()), edit));
   }
 }
-
-// wait for sync to complete
+// try to sync all pending edits (competing for this with other 
threads)
 sync();
-long durationMillis = (System.nanoTime() - startTime) / 100L;
-if (durationMillis > SLOW_APPEND_THRESHOLD) {
-  LOG.info("Slow append to log " + getName() + ", took " + 
durationMillis + " msec.");
-}
   }
 
-  private void ensureAvailable() throws IOException {
-if (closed) {
-  throw new IOException("Log " + getName() + " is already closed, 
cannot append!");
-}
-if (!initialized) {
-  init();
+  /**
+   * Return all pending writes at the time the method is called, or null 
if no writes are pending.
+   *
+   * Note that after this method returns, there can be additional pending 
writes,
+   * added concurrently while the existing pending writes are removed.
+   */
+  @Nullable
+  private Entry[] getPendingWrites() {
+synchronized (this) {
+  if (pendingWrites.isEmpty()) {
+return null;
+  }
+  Entry[] entriesToSync = new Entry[pendingWrites.size()];
+  for (int i = 0; i < entriesToSync.length; i++) {
+entriesToSync[i] = pendingWrites.remove();
+  }
+  return entriesToSync;
 }
   }
 
-  /*
-   * Appends new writes to the pendingWrites. It is better to keep it in
-   * our own queue rather than writing it to the HDFS output stream because
-   * HDFSOutputStream.writeChunk is not lightweight at all.
+  /**
+   * When multiple threads try to log edits at the same time, they all 
will call (@link #append}
+   * followed by {@link #sync()}, concurrently. Hence, it can happen that 
multiple {@code append()}
+   * are followed by a single {@code sync}, or vice versa.
+   *
+   * We want to record the time and position of the first {@code append()} 
after a {@code sync()},
+   * then measure the time after the next {@code sync()}, and log a 
warning if it exceeds a threshold.
+   * Therefore this is called every time before we write the pending list 
out to the log writer.
+   *
+   * See {@link #stopTimer(TransactionLogWriter)}.
+   *
+   * @throws IOException if the position of the writer cannot be determined
*/
-  private void append(Entry e) throws IOException {
-pendingWrites.add(e);
+  private void startTimerIfNeeded(TransactionLogWriter writer, int 
entryCount) throws IOException {
+// no sync needed because this is only called within a sync block
+if (positionBeforeWrite == -1L) {
+  positionBeforeWrite = writer.getPosition();
+  countSinceLastSync = 0;
+  stopWatch.reset().start();
+}
+countSinceLastSync += entryCount;
   }
 
-  // Returns all currently pending writes. New writes
-  // will accumulat

---

[GitHub] incubator-tephra pull request #53: (TEPHRA-243) Improve logging for slow log...

2017-09-13 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/53#discussion_r138732890
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java 
---
@@ -134,57 +124,112 @@ private void ensureAvailable() throws IOException {
 }
   }
 
-  /*
-   * Appends new writes to the pendingWrites. It is better to keep it in
-   * our own queue rather than writing it to the HDFS output stream because
-   * HDFSOutputStream.writeChunk is not lightweight at all.
+  /**
+   * Return all pending writes at the time the method is called, or null 
if no writes are pending.
+   *
+   * Note that after this method returns, there can be additional pending 
writes,
+   * added concurrently while the existing pending writes are removed.
+   */
+  @Nullable
+  private Entry[] getPendingWrites() {
+synchronized (this) {
+  if (pendingWrites.isEmpty()) {
+return null;
+  }
+  Entry[] entriesToSync = new Entry[pendingWrites.size()];
+  for (int i = 0; i < entriesToSync.length; i++) {
+entriesToSync[i] = pendingWrites.remove();
+  }
+  return entriesToSync;
+}
+  }
+
+  /**
+   * When multiple threads try to log edits at the same time, they all 
will call (@link #append}
+   * followed by {@link #sync()}, concurrently. Hence, it can happen that 
multiple {@code append()}
+   * are followed by a single {@code sync}, or vice versa.
+   *
+   * We want to record the time and position of the first {@code append()} 
after a {@code sync()},
+   * then measure the time after the next {@code sync()}, and log a 
warning if it exceeds a threshold.
+   * Therefore this is called every time before we write the pending list 
out to the log writer.
+   *
+   * See {@link #stopTimerIfNeeded(TransactionLogWriter)}.
+   *
+   * @throws IOException if the position of the writer cannot be determined
*/
-  private void append(Entry e) throws IOException {
-pendingWrites.add(e);
+  private void startTimerIfNeeded(TransactionLogWriter writer, int 
entryCount) throws IOException {
+// no sync needed because this is only called within a sync block
+if (positionBeforeWrite == -1L) {
+  positionBeforeWrite = writer.getPosition();
+  countSinceLastSync = 0;
+  stopWatch.reset().start();
+}
+countSinceLastSync += entryCount;
   }
 
-  // Returns all currently pending writes. New writes
-  // will accumulate in a new list.
-  private List getPendingWrites() {
-synchronized (this) {
-  List save = this.pendingWrites;
-  this.pendingWrites = new LinkedList<>();
-  return save;
+  /**
+   * Called by a {@code sync()} after flushing to file system. Issues a 
warning if the write(s)+sync
+   * together exceed a threshold.
+   *
+   * See {@link #startTimerIfNeeded(TransactionLogWriter, int)}.
+   *
+   * @throws IOException if the position of the writer cannot be determined
+   */
+  private void stopTimerIfNeeded(TransactionLogWriter writer) throws 
IOException {
+// this method is only called by a thread if it actually called 
sync(), inside a sync block
+if (positionBeforeWrite != -1L) { // actually it should never be -1, 
but just in case
+  stopWatch.stop();
+  long elapsed = stopWatch.elapsedMillis();
+  if (elapsed >= slowAppendThreshold) {
+long currentPosition = writer.getPosition();
+long bytesWritten = currentPosition - positionBeforeWrite;
+LOG.info("Slow append to log {}, took {} ms for {} entr{} and {} 
bytes.",
+ getName(), elapsed, countSinceLastSync, 
countSinceLastSync == 1 ? "y" : "ies", bytesWritten);
+  }
 }
+positionBeforeWrite = -1L;
+countSinceLastSync = 0;
   }
 
   private void sync() throws IOException {
 // writes out pending entries to the HLog
-TransactionLogWriter tmpWriter = null;
 long latestSeq = 0;
 int entryCount = 0;
 synchronized (this) {
   if (closed) {
 return;
--- End diff --

It would not always be correct to fail, because the edits from the current 
thread may actually have been synced. 

The logic in close() (which is also synchronized) is that this only gets 
closed after syncing. So a sync has just happened. However, it seems that it is 
possible that append() is called concurrently and it might append new edits, so 
there is a possible race condition. 

I guess we need an extra flag named closing. Close would first set closing 
to true. Now all calls to append() will fail. Then it will s

---

[GitHub] incubator-tephra pull request #53: (TEPHRA-243) Improve logging for slow log...

2017-09-13 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/53#discussion_r138731295
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java 
---
@@ -85,44 +99,20 @@ public long getTimestamp() {
 
   @Override
   public void append(TransactionEdit edit) throws IOException {
-long startTime = System.nanoTime();
-synchronized (this) {
-  ensureAvailable();
-
-  Entry entry = new Entry(new 
LongWritable(logSequence.getAndIncrement()), edit);
-
-  // add to pending edits
-  append(entry);
-}
-
-// wait for sync to complete
-sync();
-long durationMillis = (System.nanoTime() - startTime) / 100L;
-if (durationMillis > SLOW_APPEND_THRESHOLD) {
-  LOG.info("Slow append to log " + getName() + ", took " + 
durationMillis + " msec.");
-}
+append(Collections.singletonList(edit));
   }
 
   @Override
   public void append(List edits) throws IOException {
-long startTime = System.nanoTime();
-synchronized (this) {
+// synchronizing here ensures that elements in the queue are ordered 
by seq number
+synchronized (logSequence) {
   ensureAvailable();
--- End diff --

Good point, actually it is only used in one place, so I will just inline it 
outside the synchronized block.


---


[GitHub] incubator-tephra pull request #53: (TEPHRA-243) Improve logging for slow log...

2017-09-13 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/53#discussion_r138692966
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java 
---
@@ -48,21 +51,30 @@
   protected long timestamp;
   private volatile boolean initialized;
   private volatile boolean closed;
-  private AtomicLong syncedUpTo = new AtomicLong();
-  private List pendingWrites = Lists.newLinkedList();
+  private long writtenUpTo = 0L;
+  private volatile long syncedUpTo = 0L;
+  private final Queue pendingWrites = new ConcurrentLinkedQueue<>();
   private TransactionLogWriter writer;
 
-  public AbstractTransactionLog(long timestamp, MetricsCollector 
metricsCollector) {
+  private int countSinceLastSync = 0;
+  private long positionBeforeWrite = -1L;
+  private final Stopwatch stopWatch = new Stopwatch();
+
+  private final long slowAppendThreshold;
+
+  AbstractTransactionLog(long timestamp, MetricsCollector 
metricsCollector, Configuration conf) {
 this.timestamp = timestamp;
 this.metricsCollector = metricsCollector;
+this.slowAppendThreshold = 
conf.getLong(TxConstants.TransactionLog.CFG_SLOW_APPEND_THRESHOLD,
+
TxConstants.TransactionLog.DEFAULT_SLOW_APPEND_THRESHOLD);
   }
 
   /**
* Initializes the log file, opening a file writer.  Clients calling 
{@code init()} should ensure that they
* also call {@link HDFSTransactionLog#close()}.
* @throws java.io.IOException If an error is encountered initializing 
the file writer.
*/
-  public synchronized void init() throws IOException {
+  private synchronized void init() throws IOException {
--- End diff --

yes, I noticed that, too, that's why I made it private, but forgot to 
update the Javadoc


---


[GitHub] incubator-tephra issue #53: (TEPHRA-243) Improve logging for slow log append

2017-09-13 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/53
  
Thanks for the review, I agree with everything but your last comment. I 
have pushed another commit to address these comments. 


---


[GitHub] incubator-tephra pull request #53: (TEPHRA-243) Improve logging for slow log...

2017-09-13 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/53#discussion_r138686433
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java 
---
@@ -48,21 +52,30 @@
   protected long timestamp;
   private volatile boolean initialized;
   private volatile boolean closed;
+  private AtomicLong writtenUpTo = new AtomicLong();
   private AtomicLong syncedUpTo = new AtomicLong();
--- End diff --

good suggestion, done


---


[GitHub] incubator-tephra pull request #53: (TEPHRA-243) Improve logging for slow log...

2017-09-13 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/53#discussion_r138684944
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/persist/AbstractTransactionLog.java 
---
@@ -165,26 +203,36 @@ private void sync() throws IOException {
   // prevent writer being dereferenced
   tmpWriter = writer;
 
-  List currentPending = getPendingWrites();
-  if (!currentPending.isEmpty()) {
-tmpWriter.commitMarker(currentPending.size());
-  }
-
-  // write out all accumulated entries to log.
-  for (Entry e : currentPending) {
-tmpWriter.append(e);
-entryCount++;
-latestSeq = Math.max(latestSeq, e.getKey().get());
+  Entry[] currentPending = getPendingWrites();
+  if (currentPending != null) {
+entryCount = currentPending.length;
+startTimerIfNeeded(tmpWriter, entryCount);
+tmpWriter.commitMarker(entryCount);
+for (Entry e : currentPending) {
+  tmpWriter.append(e);
+  latestSeq = Math.max(latestSeq, e.getKey().get());
--- End diff --

yes, actually that has always been the case, even before these changes...


---


[GitHub] incubator-tephra issue #47: [TEPHRA-240] Include conflicting key and client ...

2017-09-12 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/47
  
squashed and rebased on latest master.


---


[GitHub] incubator-tephra pull request #47: [TEPHRA-240] Include conflicting key and ...

2017-09-12 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/47#discussion_r138490048
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/TransactionManager.java ---
@@ -206,12 +210,19 @@ public TransactionManager(Configuration conf, 
@Nonnull TransactionStateStorage p
 // TODO: REMOVE WITH txnBackwardsCompatCheck()
 longTimeoutTolerance = conf.getLong("data.tx.long.timeout.tolerance", 
1);
 
-//
+ClientIdRetention retention = ClientIdRetention.valueOf(
--- End diff --

Hmmm. We don't do that for the other configurations... if any of the 
numeric properties cannot be parsed as a number, it also fails. I think it is a 
good idea to fail on invalid configuration, because if a configuration property 
is present, then that is most likely with the intention of changing the 
default. If there is a typo or some other error and we only log a warning, that 
warning is likely to go unnoticed, and the system will behave in a way that was 
not intended, which will go undetected until it causes failures. 
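
A self-contained illustration of the fail-fast behavior argued for here (the 
enum and the configured value are made up for the example): Enum.valueOf() 
throws immediately on a misconfigured value instead of silently falling back 
to the default.

```
public class FailFastConfigExample {
  enum Retention { OFF, ACTIVE, COMMITTED }  // hypothetical values for the example

  public static void main(String[] args) {
    String configured = "ACTIVVE";  // a typo in the configuration file
    // throws IllegalArgumentException at startup, making the misconfiguration visible
    Retention retention = Retention.valueOf(configured.trim().toUpperCase());
    System.out.println("retention = " + retention);
  }
}
```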


---


[GitHub] incubator-tephra pull request #47: [TEPHRA-240] Include conflicting key and ...

2017-09-12 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/47#discussion_r138437903
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/TransactionManager.java ---
@@ -853,46 +867,45 @@ private void advanceWritePointer(long writePointer) {
 }
   }
 
-  public boolean canCommit(Transaction tx, Collection<byte[]> changeIds)
-throws TransactionNotInProgressException, TransactionSizeException {
+  public void canCommit(long txId, Collection<byte[]> changeIds)
+throws TransactionNotInProgressException, TransactionSizeException, 
TransactionConflictException {
 
 txMetricsCollector.rate("canCommit");
 Stopwatch timer = new Stopwatch().start();
-InProgressTx inProgressTx = inProgress.get(tx.getTransactionId());
+InProgressTx inProgressTx = inProgress.get(txId);
 if (inProgressTx == null) {
   synchronized (this) {
 // invalid transaction, either this has timed out and moved to 
invalid, or something else is wrong.
-if (invalidTxList.contains(tx.getTransactionId())) {
+if (invalidTxList.contains(txId)) {
   throw new TransactionNotInProgressException(
 String.format(
-  "canCommit() is called for transaction %d that is not in 
progress (it is known to be invalid)",
-  tx.getTransactionId()));
+  "canCommit() is called for transaction %d that is not in 
progress (it is known to be invalid)", txId));
 } else {
   throw new TransactionNotInProgressException(
-String.format("canCommit() is called for transaction %d that 
is not in progress", tx.getTransactionId()));
+String.format("canCommit() is called for transaction %d that 
is not in progress", txId));
 }
   }
 }
 
 Set set =
-  validateChangeSet(tx, changeIds, inProgressTx.clientId != null ? 
inProgressTx.clientId : DEFAULT_CLIENTID);
-
-if (hasConflicts(tx, set)) {
-  return false;
+  validateChangeSet(txId, changeIds, inProgressTx.clientId != null ? 
inProgressTx.clientId : DEFAULT_CLIENTID);
+for (byte[] change : changeIds) {
--- End diff --

Good catch. I forgot to remove this from existing code. 


---


[GitHub] incubator-tephra pull request #47: [TEPHRA-240] Include conflicting key and ...

2017-09-12 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/47#discussion_r138437669
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/TransactionSystemClient.java ---
@@ -89,21 +89,38 @@
*
* @param tx transaction to verify
* @param changeIds ids of changes made by transaction
-   * @return true if transaction can be committed otherwise false
-   * @throws TransactionSizeException if the size of the chgange set 
exceeds the allowed limit
-   * @throws TransactionNotInProgressException if the transaction is not 
in progress; most likely it has timed out.
+   *
+   * @throws TransactionSizeException if the size of the change set 
exceeds the allowed limit
+   * @throws TransactionConflictException if the change set has a conflict 
with an overlapping transaction
+   * @throws TransactionNotInProgressException if the transaction is not 
in progress; most likely it has timed out
*/
-  boolean canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds) 
throws TransactionFailureException;
+  void canCommitOrThrow(Transaction tx, Collection<byte[]> changeIds)
+throws TransactionNotInProgressException, 
TransactionConflictException, TransactionSizeException;
--- End diff --

Replaced for canCommitOrThrow() and commitOrThrow(). 


---


[GitHub] incubator-tephra pull request #47: [TEPHRA-240] Include conflicting key and ...

2017-09-12 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/47#discussion_r138436222
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/TransactionContext.java ---
@@ -311,25 +304,16 @@ private void persist() throws 
TransactionFailureException {
   }
 
   private void commit() throws TransactionFailureException {
-boolean commitSuccess = false;
 try {
-  commitSuccess = txClient.commit(currentTx);
-} catch (TransactionNotInProgressException e) {
-  String message = String.format("Transaction %d is not in progress.", 
currentTx.getTransactionId());
-  LOG.warn(message, e);
-  abort(new TransactionFailureException(message, e));
-  // abort will throw that exception
+  txClient.commitOrThrow(currentTx);
+} catch (TransactionNotInProgressException | 
TransactionConflictException e) {
--- End diff --

it does not make a difference, but may be more future-proof.


---


[GitHub] incubator-tephra pull request #47: [TEPHRA-240] Include conflicting key and ...

2017-09-12 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/47#discussion_r138435657
  
--- Diff: 
tephra-api/src/main/java/org/apache/tephra/TransactionConflictException.java ---
@@ -22,11 +22,50 @@
  * Thrown to indicate transaction conflict occurred when trying to commit 
a transaction.
  */
 public class TransactionConflictException extends 
TransactionFailureException {
+
+  /**
+   * @deprecated since 0.13-incubating. Use {@link 
#TransactionConflictException(long, String, String)} instead.
+   */
+  @Deprecated
   public TransactionConflictException(String message) {
 super(message);
+transactionId = null;
+conflictingChange = null;
+conflictingClient = null;
   }
 
+  /**
+   * @deprecated since 0.13-incubating. Use {@link 
#TransactionConflictException(long, String, String)} instead.
+   */
+  @Deprecated
   public TransactionConflictException(String message, Throwable cause) {
 super(message, cause);
+transactionId = null;
+conflictingChange = null;
+conflictingClient = null;
+  }
+
+  public TransactionConflictException(long transactionId, String 
conflictingChange, String conflictingClient) {
--- End diff --

ok


---


[GitHub] incubator-tephra pull request #56: (TEPHRA-258) Improve logging in thrift cl...

2017-09-11 Thread anew
GitHub user anew opened a pull request:

https://github.com/apache/incubator-tephra/pull/56

(TEPHRA-258) Improve logging in thrift client providers

title says it all. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/anew/incubator-tephra tephra-258

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-tephra/pull/56.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #56


commit d907d5a15dbc76329eefa4e40ad504ef3fe81036
Author: anew <a...@apache.org>
Date:   2017-09-12T01:15:30Z

(TEPHRA-258) Improve logging in thrift client providers




---


[GitHub] incubator-tephra pull request #55: TEPHRA-244 Remove regions of deleted tabl...

2017-09-10 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/55#discussion_r137981077
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
 ---
@@ -384,7 +384,67 @@ public void testPruneEmptyTable() throws Exception {
   hBaseAdmin.disableTable(txEmptyTable);
   hBaseAdmin.deleteTable(txEmptyTable);
 }
+  }
+
+  @Test
+  public void testPruneTransientTable() throws Exception {
+// Make sure that transient tables do not block the progress of pruning
+
+// Create a temp table
+TableName txTempTable = TableName.valueOf("tempTable");
+createTable(txTempTable.getName(), new byte[][]{family}, false,
+
Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+TableName txDataTable2 = null;
+
+TransactionPruningPlugin transactionPruningPlugin = new 
TestTransactionPruningPlugin();
+transactionPruningPlugin.initialize(conf);
+
+try {
+  long now1 = System.currentTimeMillis();
+  long inactiveTxTimeNow1 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+  long noPruneUpperBound = -1;
+  long expectedPruneUpperBound1 = (now1 - 200) * 
TxConstants.MAX_TX_PER_MS;
+  InMemoryTransactionStateCache.setTransactionSnapshot(
+new TransactionSnapshot(expectedPruneUpperBound1, 
expectedPruneUpperBound1, expectedPruneUpperBound1,
+ImmutableSet.of(expectedPruneUpperBound1),
+ImmutableSortedMap.<Long, 
TransactionManager.InProgressTx>of()));
+
+  // fetch prune upper bound, there should be no prune upper bound 
since nothing has been compacted yet.
+  // This run is only to store the initial set of regions
+  long pruneUpperBound1 = 
transactionPruningPlugin.fetchPruneUpperBound(now1, inactiveTxTimeNow1);
+  Assert.assertEquals(noPruneUpperBound, pruneUpperBound1);
+  transactionPruningPlugin.pruneComplete(now1, noPruneUpperBound);
+
+  // Now delete the transient table
+  hBaseAdmin.disableTable(txTempTable);
+  hBaseAdmin.deleteTable(txTempTable);
+
+  // Compact the data table now
+  testUtil.compact(txDataTable1, true);
+  // Since the write to prune table happens async, we need to sleep a 
bit before checking the state of the table
+  TimeUnit.SECONDS.sleep(2);
+
+  // Create a new table that will not be compacted
+  txDataTable2 = TableName.valueOf("invalidListPruneTestTable2");
+  createTable(txDataTable2.getName(), new byte[][]{family}, false,
+  
Collections.singletonList(TestTransactionProcessor.class.getName()));
+
+  // fetch prune upper bound, there should be a prune upper bound even 
though txTempTable does not exist anymore,
+  // and txDataTable2 has not been compacted/flushed yet
+  long now2 = System.currentTimeMillis();
+  long inactiveTxTimeNow2 = (now1 - 150) * TxConstants.MAX_TX_PER_MS;
+  long pruneUpperBound2 = 
transactionPruningPlugin.fetchPruneUpperBound(now2, inactiveTxTimeNow2);
+  Assert.assertEquals(expectedPruneUpperBound1, pruneUpperBound2);
+  transactionPruningPlugin.pruneComplete(now2, 
expectedPruneUpperBound1);
+} finally {
+  transactionPruningPlugin.destroy();
+  if (txDataTable2 != null) {
--- End diff --

you should also do this for txTempTable. The test case deletes it, but if 
it fails before that, the table never gets deleted.
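
For illustration, the finally block could then look roughly like this (a 
hedged sketch; tableExists() guards against the happy path having already 
deleted the table, and the txDataTable2 cleanup body is assumed to mirror the 
existing table cleanup in the test):

```
} finally {
  transactionPruningPlugin.destroy();
  // also clean up the transient table if an earlier assertion failed before it was deleted
  if (hBaseAdmin.tableExists(txTempTable)) {
    hBaseAdmin.disableTable(txTempTable);
    hBaseAdmin.deleteTable(txTempTable);
  }
  if (txDataTable2 != null) {
    hBaseAdmin.disableTable(txDataTable2);
    hBaseAdmin.deleteTable(txDataTable2);
  }
}
```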


---


[GitHub] incubator-tephra issue #48: [TEPHRA-241] Introduce a way to limit the size o...

2017-09-09 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/48
  
Same Travis failure. This appears to happen a lot more frequently with Java 
8. I will commit this now and try to fix the flaky test before the 0.13 release.


---


[GitHub] incubator-tephra pull request #54: wip

2017-09-09 Thread anew
GitHub user anew opened a pull request:

https://github.com/apache/incubator-tephra/pull/54

wip

on hold

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/anew/incubator-tephra tephra-253

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-tephra/pull/54.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #54


commit 140c1f0aa621fb7ab7a80c087b5be293d3b68035
Author: anew <a...@apache.org>
Date:   2017-09-09T06:43:55Z

wip




---


[GitHub] incubator-tephra pull request #53: (TEPHRA-243) Improve logging for slow log...

2017-09-08 Thread anew
GitHub user anew opened a pull request:

https://github.com/apache/incubator-tephra/pull/53

(TEPHRA-243) Improve logging for slow log append

- adds more detail to the log message (number of entries, bytes written)
- ensures that only the thread that actually writes to storage logs the 
message, by moving the logging into that method. Previously, all threads 
logged it even if they did not write themselves 
- changed ThriftTransactionSystemTest to use LocalTransactionLog (otherwise 
it does not get tested at all)
- fixed a few unrelated compiler warnings encountered on the way

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/anew/incubator-tephra tephra-243

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-tephra/pull/53.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #53


commit 60085beef1c5b8cef90b1b14751ae7aa78b6d909
Author: anew <a...@apache.org>
Date:   2017-09-09T03:04:38Z

(TEPHRA-243) Improve logging for slow log append




---


[GitHub] incubator-tephra pull request #52: (TEPHRA-250) Allow to trigger transaction...

2017-09-08 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/52#discussion_r137911192
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
 ---
@@ -88,6 +88,8 @@
 
 public TTransaction checkpoint(TTransaction tx) throws 
TTransactionNotInProgressException, org.apache.thrift.TException;
--- End diff --

do not review this file, it is generated by Thrift. 


---


[GitHub] incubator-tephra pull request #52: (TEPHRA-250) Allow to trigger transaction...

2017-09-08 Thread anew
GitHub user anew opened a pull request:

https://github.com/apache/incubator-tephra/pull/52

(TEPHRA-250) Allow to trigger transaction pruning

- adds a method to TransactionPruningService to trigger pruning now
- exposes this method through Thrift and the Tx system client
- exposes the pruning service so that systems embedding Tephra can call the 
new method


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/anew/incubator-tephra 
tephra-250=trigger-pruning

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-tephra/pull/52.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #52


commit cdffa37676ca52e69869f6a513b2045e114b508c
Author: anew <a...@apache.org>
Date:   2017-09-08T23:36:29Z

(TEPHRA-250) Allow to trigger transaction pruning




---


[GitHub] incubator-tephra pull request #49: [TEPHRA-242] Ensure Pruning Service shuts...

2017-09-08 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/49#discussion_r137879378
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/distributed/TransactionService.java 
---
@@ -121,15 +121,15 @@ public void failed(State from, Throwable failure) {
   @Override
   public void follower() {
 ListenableFuture stopFuture = null;
+if (pruningService != null && pruningService.isRunning()) {
+  // Wait for pruning service to stop after un-registering from 
discovery
+  stopFuture = pruningService.stop();
+}
 // First stop the transaction server as un-registering from 
discovery can block sometimes.
 // That can lead to multiple transaction servers being active at 
the same time.
 if (server != null && server.isRunning()) {
   server.stopAndWait();
 }
-if (pruningService != null && pruningService.isRunning()) {
-  // Wait for pruning service to stop after un-registering from 
discovery
-  stopFuture = pruningService.stop();
-}
 undoRegister();
 
 if (stopFuture != null) {
--- End diff --

pruningService.shutdown() itself has a 30 second timeout. So this future is 
guaranteed to return, and we don't need a timeout here. 


---


[GitHub] incubator-tephra issue #50: [TEPHRA-238] Support for HBase 1.3.x

2017-09-06 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/50
  
The Travis test run for Java 8 failed while it passed for Java 7. It fails 
in a test for HBase 1.2, which is not affected at all by this change. That 
indicates that the failing test case is flaky. I created a new Jira TEPHRA-253 
to track the flaky test. 

Since that failure is due to a flaky test, and it passed with Java 7, and 
the changes here do not have any influence on the failed test, I am confident 
that this change did not introduce the failure. Hence I will commit this.  


---


[GitHub] incubator-tephra issue #50: [TEPHRA-238] Support for HBase 1.3.x

2017-09-06 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/50
  
Thanks, @JamesRTaylor, for the verification. I am waiting for the Travis 
build to succeed. 
What is your anticipated date for Phoenix 4.12? We have a few critical 
fixes that we would like to get done before the release, so a release in about 
2 weeks seems realistic.  


---


[GitHub] incubator-tephra issue #50: [TEPHRA-238] Support for HBase 1.3.x

2017-09-06 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/50
  
Rebased on top of Tephra-252. Travis build should now pass. 


---


[GitHub] incubator-tephra pull request #51: [TEPHRA-252] Fix Travis build for Oracle ...

2017-09-06 Thread anew
Github user anew closed the pull request at:

https://github.com/apache/incubator-tephra/pull/51


---


[GitHub] incubator-tephra issue #51: [TEPHRA-252] Fix Travis build for Oracle JDK 7

2017-09-06 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/51
  
Travis build passed now. Will commit this. 


---


[GitHub] incubator-tephra issue #50: [TEPHRA-238] Support for HBase 1.3.x

2017-09-05 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/50
  
Thanks for the quick review, @JamesRTaylor. As an additional validation, 
could you run the Phoenix tests with this branch of Tephra? Thanks a lot!


---


[GitHub] incubator-tephra pull request #50: [TEPHRA-238] Support for HBase 1.3.x

2017-09-05 Thread anew
GitHub user anew opened a pull request:

https://github.com/apache/incubator-tephra/pull/50

[TEPHRA-238] Support for HBase 1.3.x

- Adds a new compat module using 1.3.1 as the HBase version.  I could not 
reuse the compat-1.1-base because HTable added four new methods in 1.3 
(get/setOperation/RpcTimeout)
- All the classes in the new src/ tree are the same as in the compat-1.1-base, 
except:
1. implemented the four new methods in TransactionAwareHTable
2. changed TransactionProcessorTest.createRegion(), because 
WALFactory.getWAL() now takes an extra argument for the namespace
3. changed 1.1 to 1.3 in all comments etc. 



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/anew/incubator-tephra tephra-238

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-tephra/pull/50.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #50


commit 75fb6df59ad6a040d46d8fccea0be51454b56d56
Author: anew <a...@apache.org>
Date:   2017-09-05T20:44:08Z

[TEPHRA-238] Support for HBase 1.3.x




---


[GitHub] incubator-tephra pull request #48: [TEPHRA-241] Introduce a way to limit the...

2017-08-31 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/48#discussion_r136458250
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/distributed/thrift/TTransactionServer.java
 ---
@@ -68,6 +68,8 @@
 
 public TBoolean canCommitTx(TTransaction tx, Set changes) 
throws TTransactionNotInProgressException, org.apache.thrift.TException;
 
+public TBoolean canCommitOrThrow(TTransaction tx, Set 
changes) throws TTransactionNotInProgressException, TGenericException, 
org.apache.thrift.TException;
--- End diff --

please do not review this file; it is generated by Thrift. Only 
transaction.thrift needs to be reviewed.


---


[GitHub] incubator-tephra pull request #48: [TEPHRA-241] Introduce a way to limit the...

2017-08-31 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/48#discussion_r136457929
  
--- Diff: 
tephra-core/src/test/java/org/apache/tephra/TransactionManagerTest.java ---
@@ -54,115 +54,35 @@ protected TransactionStateStorage getStateStorage() {
 return txStateStorage;
   }
 
-  @Before
-  public void before() {
+  @BeforeClass
+  public static void beforeClass() {
+conf = getCommonConfiguration();
 conf.setInt(TxConstants.Manager.CFG_TX_CLEANUP_INTERVAL, 0); // no 
cleanup thread
-conf.setInt(TxConstants.Manager.CFG_TX_MAX_TIMEOUT, (int) 
TimeUnit.DAYS.toSeconds(5)); // very long limit
 // todo should create two sets of tests, one with 
LocalFileTxStateStorage and one with InMemoryTxStateStorage
 txStateStorage = new InMemoryTransactionStateStorage();
 txManager = new TransactionManager
   (conf, txStateStorage, new TxMetricsCollector());
 txManager.startAndWait();
   }
 
-  @After
-  public void after() {
+  @AfterClass
+  public static void afterClass() {
 txManager.stopAndWait();
   }
 
-  @Test
--- End diff --

this test moved to TransactionSystemTest without change.



---


[GitHub] incubator-tephra pull request #48: [TEPHRA-241] Introduce a way to limit the...

2017-08-31 Thread anew
GitHub user anew opened a pull request:

https://github.com/apache/incubator-tephra/pull/48

[TEPHRA-241] Introduce a way to limit the size of a transaction

- first commit introduces four new configurations for change set size 
limits, enforces these limits in a new method canCommitOrThrow() (we keep 
canCommit() for compatibility; see the sketch below), and uses the new method 
throughout the higher-level classes and tests
- second commit refactors the transaction system test cases: Don't start a 
new TxManager for each test method, have a shared common configuration across 
subclasses, move the checkpoint test to the base class such that it also gets 
tested with Thrift
- third commit adds a new test case for size limit validation

Thrift changes are backward-compatible (we keep the existing methods the 
same), and so are the TransactionSystemClient changes. 
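
For illustration only, a minimal sketch of the kind of check the first commit 
describes; the class, method name, limits, and exception used here are 
hypothetical and are not the actual Tephra configuration keys or API:

```java
import java.util.Collection;

final class ChangeSetLimits {
  private ChangeSetLimits() { }

  // Hypothetical helper: reject a change set that exceeds a count limit or a
  // total byte-size limit before the transaction is allowed to commit.
  static void ensureWithinLimits(Collection<byte[]> changeKeys, int maxChanges, long maxTotalBytes) {
    long totalBytes = 0;
    for (byte[] key : changeKeys) {
      totalBytes += key.length;
    }
    if (changeKeys.size() > maxChanges || totalBytes > maxTotalBytes) {
      throw new IllegalArgumentException(
          "Change set too large: " + changeKeys.size() + " changes, " + totalBytes + " bytes");
    }
  }
}
```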
 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/anew/incubator-tephra tephra-241

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-tephra/pull/48.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #48


commit 7227bcef076ccf762b78bf299003c373e7e73814
Author: anew <a...@apache.org>
Date:   2017-08-31T21:43:32Z

add new config properties and a new method canCommitOrThrow() that 
validates the size of change sets

commit 4ad0f924c34ad4aea35443c8cd10bd81cf24eeb8
Author: anew <a...@apache.org>
Date:   2017-08-31T21:43:56Z

refactor transaction system test

commit f3a1e786ae62f6bf05043c6fd770338361e615bc
Author: anew <a...@apache.org>
Date:   2017-08-31T21:44:32Z

add new test for change set size validation




---


[GitHub] incubator-tephra pull request #47: [TEPHRA-240] Include conflicting key and ...

2017-08-17 Thread anew
GitHub user anew opened a pull request:

https://github.com/apache/incubator-tephra/pull/47

[TEPHRA-240] Include conflicting key and client id in 
TransactionConflictException

- Enhances TransactionConflictException to have fields for the transaction 
id, the conflicting change, and the conflicting client id (see the sketch 
after this list)
- TransactionManager retains the client id for change sets after the 
transaction has committed
- A configuration for whether and how long it should be retained. This is 
to allow disabling this feature (and its effect on memory)
- TransactionManager.(can)Commit now throws TransactionConflictException 
instead of returning a boolean
- new thrift methods (and a new thrift exception). The existing thrift 
methods will remain backward-compatible
- change TransactionSystemClient and TransactionContext to use the new 
methods
- new test case (and existing test case changes) for this
- a few unrelated warnings have been removed from TransactionManager
- a bit of test case refactoring to remove duplication of code
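
As a rough sketch of the first bullet only (the constructor and accessor names 
here are illustrative, not the actual signatures added by the PR):

```java
// Sketch, not the actual class body: an exception that carries enough context
// to debug a write conflict.
public class TransactionConflictException extends Exception {
  private final long transactionId;        // transaction that failed to commit
  private final String conflictingKey;     // change key that caused the conflict
  private final String conflictingClient;  // client id that committed the conflicting change

  public TransactionConflictException(long transactionId, String conflictingKey, String conflictingClient) {
    super("Conflict on key '" + conflictingKey + "' for transaction " + transactionId
        + " with client '" + conflictingClient + "'");
    this.transactionId = transactionId;
    this.conflictingKey = conflictingKey;
    this.conflictingClient = conflictingClient;
  }

  public long getTransactionId() { return transactionId; }
  public String getConflictingKey() { return conflictingKey; }
  public String getConflictingClient() { return conflictingClient; }
}
```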

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/anew/incubator-tephra tephra-240

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-tephra/pull/47.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #47


commit 205714fdd126ab7537f9f66e6e156c9736f54646
Author: anew <a...@apache.org>
Date:   2017-08-17T23:21:52Z

[TEPHRA-240] Include conflicting key and client id in 
TransactionConflictException




---


[GitHub] incubator-tephra issue #38: TEPHRA-224 Handle delay between transaction max ...

2017-02-22 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/38
  
LGTM


---


[GitHub] incubator-tephra pull request #38: TEPHRA-224 Handle delay between transacti...

2017-02-21 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/38#discussion_r102371828
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/txprune/TransactionPruningRunnable.java
 ---
@@ -57,8 +60,13 @@ public void run() {
   Transaction tx = txManager.startShort();
   txManager.abort(tx);
 
+  if (tx.getInvalids().length == 0) {
+LOG.info("Invalid list is empty, not running transaction pruning");
--- End diff --

so should we still prune here? It won't remove any invalid tx, but it will 
clean up the prune state. 


---


[GitHub] incubator-tephra pull request #38: TEPHRA-224 Handle delay between transacti...

2017-02-21 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/38#discussion_r102371884
  
--- Diff: tephra-core/src/main/java/org/apache/tephra/TxConstants.java ---
@@ -376,6 +376,11 @@
 public static final String PRUNE_FLUSH_INTERVAL = 
"data.tx.prune.flush.interval";
 
 /**
+ * The buffer in seconds used to pad transaction max lifetime while 
pruning.
+ */
+public static final String PRUNE_BUFFER_INTERVAL = 
"data.tx.prune.buffer";
--- End diff --

PRUNE_SAFETY_BUFFER? PRUNE_GRACE_PERIOD?


---


[GitHub] incubator-tephra pull request #38: TEPHRA-224 Handle delay between transacti...

2017-02-21 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/38#discussion_r102365159
  
--- Diff: tephra-core/src/main/java/org/apache/tephra/TxConstants.java ---
@@ -376,6 +376,11 @@
 public static final String PRUNE_FLUSH_INTERVAL = 
"data.tx.prune.flush.interval";
 
 /**
+ * The buffer in seconds used to pad transaction max lifetime while 
pruning.
+ */
+public static final String PRUNE_BUFFER_INTERVAL = 
"data.tx.prune.buffer";
--- End diff --

it's not really an interval, or is it?


---


[GitHub] incubator-tephra pull request #37: TEPHRA-223 Encapsulate the two data struc...

2017-02-20 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/37#discussion_r102124958
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.manager;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.tephra.TransactionManager;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * This is an internal class used by the {@link TransactionManager} to 
store invalid transaction ids.
+ * This class uses both a list and an array to keep track of the invalid 
ids. The list is the primary
+ * data structure for storing the invalid ids. The array is populated 
lazily on changes to the list.
+ * The array is used to avoid creating a new array every time method 
{@link #toArray()} is invoked.
+ *
+ * This class is not thread safe and relies on external synchronization. 
TransactionManager always
+ * accesses an instance of this class after synchronization.
+ */
+@NotThreadSafe
+public class InvalidTxList {
+  private static final long[] NO_INVALID_TX = { };
+
+  private final LongArrayList invalid = new LongArrayList();
+  private long[] invalidArray = NO_INVALID_TX;
+
+  private boolean dirty = false; // used to track changes to the invalid 
list
+
+  public int size() {
+return invalid.size();
+  }
+
+  public boolean isEmpty() {
+return invalid.isEmpty();
+  }
+
+  public boolean add(long id) {
+boolean changed = invalid.add(id);
+if (changed && !dirty) {
+  dirty = true;
+}
+return changed;
+  }
+
+  public boolean addAll(Collection ids) {
+boolean changed = invalid.addAll(ids);
+if (changed && !dirty) {
+  dirty = true;
+}
+return changed;
+  }
+
+  public boolean contains(long id) {
+return invalid.contains(id);
+  }
+
+  public boolean remove(long id) {
+boolean changed = invalid.rem(id);
+if (changed && !dirty) {
+  dirty = true;
+}
+return changed;
+  }
+
+  public boolean removeAll(Collection ids) {
+boolean changed = invalid.removeAll(ids);
+if (changed && !dirty) {
+  dirty = true;
+}
+return changed;
+  }
+
+  public void clear() {
+invalid.clear();
+invalidArray = NO_INVALID_TX;
+dirty = false;
+  }
+
+  public long[] toArray() {
+if (dirty) {
+  Collections.sort(invalid);
+  invalidArray = invalid.toLongArray();
+  dirty = false;
+}
+return invalidArray;
+  }
+
+  public List toList() {
+return Collections.unmodifiableList(invalid);
--- End diff --

Or the caller can remove the assumption. Sorting takes O(n log n), whereas the 
caller traverses the list once, which is O(n). Not sure it is worth sorting.
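
To illustrate the point about complexity: a caller that only needs, say, the 
smallest invalid id can do a single O(n) pass over the unsorted list. This is 
hypothetical caller code, not code from the PR:

```java
import java.util.List;

final class InvalidListScan {
  private InvalidListScan() { }

  // Single pass over the (unsorted) invalid id list; no O(n log n) sort required.
  static long smallestInvalidId(List<Long> invalidIds) {
    if (invalidIds.isEmpty()) {
      return -1L;  // no invalid transactions
    }
    long min = Long.MAX_VALUE;
    for (long id : invalidIds) {
      min = Math.min(min, id);
    }
    return min;
  }
}
```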


---


[GitHub] incubator-tephra pull request #37: TEPHRA-223 Encapsulate the two data struc...

2017-02-20 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/37#discussion_r102124522
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.manager;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.tephra.TransactionManager;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * This is an internal class used by the {@link TransactionManager} to 
store invalid transaction ids.
+ * This class uses both a list and an array to keep track of the invalid 
ids. The list is the primary
+ * data structure for storing the invalid ids. The array is populated 
lazily on changes to the list.
+ * The array is used to avoid creating a new array every time method 
{@link #toArray()} is invoked.
+ *
+ * This class is not thread safe and relies on external synchronization. 
TransactionManager always
+ * accesses an instance of this class after synchronization.
+ */
+@NotThreadSafe
+public class InvalidTxList {
+  private static final long[] NO_INVALID_TX = { };
+
+  private final LongArrayList invalid = new LongArrayList();
+  private long[] invalidArray = NO_INVALID_TX;
+
+  private boolean dirty = false; // used to track changes to the invalid 
list
+
+  public int size() {
+return invalid.size();
+  }
+
+  public boolean isEmpty() {
+return invalid.isEmpty();
+  }
+
+  public boolean add(long id) {
+boolean changed = invalid.add(id);
+if (changed && !dirty) {
+  dirty = true;
+}
+return changed;
+  }
+
+  public boolean addAll(Collection ids) {
+boolean changed = invalid.addAll(ids);
+if (changed && !dirty) {
+  dirty = true;
+}
+return changed;
+  }
+
+  public boolean contains(long id) {
+return invalid.contains(id);
+  }
+
+  public boolean remove(long id) {
+boolean changed = invalid.rem(id);
+if (changed && !dirty) {
--- End diff --

ditto


---


[GitHub] incubator-tephra pull request #37: TEPHRA-223 Encapsulate the two data struc...

2017-02-20 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/37#discussion_r102124470
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.manager;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.tephra.TransactionManager;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * This is an internal class used by the {@link TransactionManager} to 
store invalid transaction ids.
+ * This class uses both a list and an array to keep track of the invalid 
ids. The list is the primary
+ * data structure for storing the invalid ids. The array is populated 
lazily on changes to the list.
+ * The array is used to avoid creating a new array every time method 
{@link #toArray()} is invoked.
+ *
+ * This class is not thread safe and relies on external synchronization. 
TransactionManager always
+ * accesses an instance of this class after synchronization.
+ */
+@NotThreadSafe
+public class InvalidTxList {
+  private static final long[] NO_INVALID_TX = { };
+
+  private final LongArrayList invalid = new LongArrayList();
+  private long[] invalidArray = NO_INVALID_TX;
+
+  private boolean dirty = false; // used to track changes to the invalid 
list
+
+  public int size() {
+return invalid.size();
+  }
+
+  public boolean isEmpty() {
+return invalid.isEmpty();
+  }
+
+  public boolean add(long id) {
+boolean changed = invalid.add(id);
+if (changed && !dirty) {
+  dirty = true;
+}
+return changed;
+  }
+
+  public boolean addAll(Collection ids) {
+boolean changed = invalid.addAll(ids);
+if (changed && !dirty) {
--- End diff --

ditto


---


[GitHub] incubator-tephra pull request #37: TEPHRA-223 Encapsulate the two data struc...

2017-02-20 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/37#discussion_r102124543
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.manager;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.tephra.TransactionManager;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * This is an internal class used by the {@link TransactionManager} to 
store invalid transaction ids.
+ * This class uses both a list and an array to keep track of the invalid 
ids. The list is the primary
+ * data structure for storing the invalid ids. The array is populated 
lazily on changes to the list.
+ * The array is used to avoid creating a new array every time method 
{@link #toArray()} is invoked.
+ *
+ * This class is not thread safe and relies on external synchronization. 
TransactionManager always
+ * accesses an instance of this class after synchronization.
+ */
+@NotThreadSafe
+public class InvalidTxList {
+  private static final long[] NO_INVALID_TX = { };
+
+  private final LongArrayList invalid = new LongArrayList();
+  private long[] invalidArray = NO_INVALID_TX;
+
+  private boolean dirty = false; // used to track changes to the invalid 
list
+
+  public int size() {
+return invalid.size();
+  }
+
+  public boolean isEmpty() {
+return invalid.isEmpty();
+  }
+
+  public boolean add(long id) {
+boolean changed = invalid.add(id);
+if (changed && !dirty) {
+  dirty = true;
+}
+return changed;
+  }
+
+  public boolean addAll(Collection ids) {
+boolean changed = invalid.addAll(ids);
+if (changed && !dirty) {
+  dirty = true;
+}
+return changed;
+  }
+
+  public boolean contains(long id) {
+return invalid.contains(id);
+  }
+
+  public boolean remove(long id) {
+boolean changed = invalid.rem(id);
+if (changed && !dirty) {
+  dirty = true;
+}
+return changed;
+  }
+
+  public boolean removeAll(Collection ids) {
+boolean changed = invalid.removeAll(ids);
+if (changed && !dirty) {
--- End diff --

ditto


---


[GitHub] incubator-tephra pull request #37: TEPHRA-223 Encapsulate the two data struc...

2017-02-20 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/37#discussion_r102124449
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/manager/InvalidTxList.java ---
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.manager;
+
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.tephra.TransactionManager;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * This is an internal class used by the {@link TransactionManager} to 
store invalid transaction ids.
+ * This class uses both a list and an array to keep track of the invalid 
ids. The list is the primary
+ * data structure for storing the invalid ids. The array is populated 
lazily on changes to the list.
+ * The array is used to avoid creating a new array every time method 
{@link #toArray()} is invoked.
+ *
+ * This class is not thread safe and relies on external synchronization. 
TransactionManager always
+ * accesses an instance of this class after synchronization.
+ */
+@NotThreadSafe
+public class InvalidTxList {
+  private static final long[] NO_INVALID_TX = { };
+
+  private final LongArrayList invalid = new LongArrayList();
+  private long[] invalidArray = NO_INVALID_TX;
+
+  private boolean dirty = false; // used to track changes to the invalid 
list
+
+  public int size() {
+return invalid.size();
+  }
+
+  public boolean isEmpty() {
+return invalid.isEmpty();
+  }
+
+  public boolean add(long id) {
+boolean changed = invalid.add(id);
+if (changed && !dirty) {
--- End diff --

dirty = dirty || changed 
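
Applied to the add() method quoted above, the suggestion collapses the 
conditional into a single assignment; a sketch, assuming the same fields as in 
the diff:

```java
public boolean add(long id) {
  boolean changed = invalid.add(id);
  // equivalent to: if (changed && !dirty) { dirty = true; }
  dirty = dirty || changed;
  return changed;
}
```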


---


[GitHub] incubator-tephra issue #34: TEPHRA-216 Handle empty transactional regions du...

2017-02-11 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/34
  
LGTM


---


[GitHub] incubator-tephra pull request #34: TEPHRA-216 Handle empty transactional reg...

2017-02-10 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/34#discussion_r100603640
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/HBaseTransactionPruningPlugin.java
 ---
@@ -288,23 +288,27 @@ private long computePruneUpperBound(TimeRegions 
timeRegions) throws IOException
   SortedSet<byte[]> transactionalRegions = timeRegions.getRegions();
   long time = timeRegions.getTime();
 
+  long inactiveTransactionBound = 
dataJanitorState.getInactiveTransactionBoundForTime(time);
+  LOG.debug("Got inactive transaction bound {}", 
inactiveTransactionBound);
+  // If inactiveTransactionBound is not recorded then that means the 
data is not complete for these regions
+  if (inactiveTransactionBound == -1) {
+if (LOG.isDebugEnabled()) {
--- End diff --

is this if-check necessary?
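
For context on the question: with SLF4J's parameterized logging the guard mainly 
pays off when computing an argument is expensive; a small illustration (the 
helper name below is hypothetical):

```java
// Cheap arguments: parameterized logging alone avoids string concatenation
// when DEBUG is disabled.
LOG.debug("Got inactive transaction bound {}", inactiveTransactionBound);

// Costly arguments: guard so the expensive value is not computed at all.
if (LOG.isDebugEnabled()) {
  LOG.debug("Region state: {}", buildExpensiveRegionSummary());  // hypothetical helper
}
```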


---


[GitHub] incubator-tephra pull request #34: TEPHRA-216 Handle empty transactional reg...

2017-02-10 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/34#discussion_r100608042
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/txprune/DataJanitorState.java
 ---
@@ -366,6 +371,56 @@ public void 
deleteInactiveTransactionBoundsOnOrBeforeTime(long time) throws IOEx
 }
   }
 
+  // 
+  // --- Methods for empty regions at a given time ---
+  // 
+  // Key: 0x4
+  // Col 'e': 
+  // 
+
+  /**
+   * Save the given region as empty as of the given time.
+   *
+   * @param time time in milliseconds
+   * @param regionId region id
+   */
+  public void saveEmptyRegionForTime(long time, byte[] regionId) throws 
IOException {
+byte[] timeBytes = Bytes.toBytes(time);
+try (Table stateTable = stateTableSupplier.get()) {
+  Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
+  put.addColumn(FAMILY, EMPTY_REGION_TIME_COL, EMPTY_BYTE_ARRAY);
--- End diff --

maybe use a value that is visible to the human eye? Just thinking: if you 
debug and scan the table, a value of "X" is easier to spot than an empty value.
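
A minimal sketch of that suggestion, reusing the names from the diff above (the 
marker constant and its value are chosen arbitrarily, not taken from the PR):

```java
// Store a small human-readable marker instead of an empty byte array, so the
// cell stands out when manually scanning the state table while debugging.
private static final byte[] EMPTY_REGION_MARKER = Bytes.toBytes("x");  // hypothetical constant

// in saveEmptyRegionForTime(...):
Put put = new Put(makeEmptyRegionTimeKey(timeBytes, regionId));
put.addColumn(FAMILY, EMPTY_REGION_TIME_COL, EMPTY_REGION_MARKER);
stateTable.put(put);
```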


---


[GitHub] incubator-tephra issue #33: Use getRegion instead of getRegionInfo for hbase...

2017-02-09 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/33
  
LGTM



---


[GitHub] incubator-tephra issue #22: Transaction pruning service

2016-12-27 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/22
  
LGTM


---


[GitHub] incubator-tephra pull request #22: Transaction pruning service

2016-12-12 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/22#discussion_r91990644
  
--- Diff: tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java ---
@@ -163,8 +167,22 @@ public static long getPruneUpperBound(Transaction tx) {
 
 long maxInvalidTx =
   tx.getInvalids().length > 0 ? 
tx.getInvalids()[tx.getInvalids().length - 1] : Transaction.NO_TX_IN_PROGRESS;
+
+// An invalid transaction can be used up to its max lifetime for data 
writes, hence we cannot prune an invalid
+// transaction until it exhausts its max lifetime.
+long elapsedTime = currentTimeMillis - 
TxUtils.getTimestamp(maxInvalidTx);
+long invalidTxBound;
+if (elapsedTime > txMaxLifetimeMillis) {
+  // maxInvalidTx is past its max lifetime
+  invalidTxBound = maxInvalidTx;
+} else {
+  // Reduce the lifetime of maxInvalidTx, so that it cannot be used 
for any more writes
+  long remainingTime = txMaxLifetimeMillis - elapsedTime;
+  invalidTxBound = maxInvalidTx - remainingTime * 
TxConstants.MAX_TX_PER_MS - 1;
--- End diff --

agreed. 


---


[GitHub] incubator-tephra pull request #23: [Tephra-201] Store checkpoints in in-prog...

2016-12-08 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/23#discussion_r91471138
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/TransactionManager.java ---
@@ -340,48 +340,51 @@ public long getSleepMillis() {
 
   private void cleanupTimedOutTransactions() {
 List invalidEdits = null;
-this.logReadLock.lock();
-try {
-  synchronized (this) {
-if (!isRunning()) {
-  return;
-}
+synchronized (this) {
+  if (!isRunning()) {
+return;
+  }
 
-long currentTime = System.currentTimeMillis();
-List timedOut = Lists.newArrayList();
-for (Map.Entry<Long, InProgressTx> tx : inProgress.entrySet()) {
-  long expiration = tx.getValue().getExpiration();
-  if (expiration >= 0L && currentTime > expiration) {
-// timed out, remember tx id (can't remove while iterating 
over entries)
-timedOut.add(tx.getKey());
-LOG.info("Tx invalid list: added tx {} because of timeout", 
tx.getKey());
-  } else if (expiration < 0) {
-LOG.warn("Transaction {} has negative expiration time {}. 
Likely cause is the transaction was not " +
-   "migrated correctly, this transaction will be 
expired immediately",
- tx.getKey(), expiration);
-timedOut.add(tx.getKey());
-  }
+  long currentTime = System.currentTimeMillis();
+  Map<Long, InProgressType> timedOut = Maps.newHashMap();
+  for (Map.Entry<Long, InProgressTx> tx : inProgress.entrySet()) {
+long expiration = tx.getValue().getExpiration();
+if (expiration >= 0L && currentTime > expiration) {
+  // timed out, remember tx id (can't remove while iterating over 
entries)
+  timedOut.put(tx.getKey(), tx.getValue().getType());
+  LOG.info("Tx invalid list: added tx {} because of timeout", 
tx.getKey());
+} else if (expiration < 0) {
+  LOG.warn("Transaction {} has negative expiration time {}. Likely 
cause is the transaction was not " +
+ "migrated correctly, this transaction will be expired 
immediately",
+   tx.getKey(), expiration);
+  timedOut.put(tx.getKey(), InProgressType.LONG);
 }
-if (!timedOut.isEmpty()) {
+  }
+  if (!timedOut.isEmpty()) {
+logWriteLock.lock();
--- End diff --

agreed. ok. will revert


---


[GitHub] incubator-tephra pull request #23: [Tephra-201] Store checkpoints in in-prog...

2016-12-08 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/23#discussion_r91468098
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/TransactionManager.java ---
@@ -348,27 +348,30 @@ private void cleanupTimedOutTransactions() {
 }
 
 long currentTime = System.currentTimeMillis();
-List timedOut = Lists.newArrayList();
+Map<Long, InProgressType> timedOut = Maps.newHashMap();
--- End diff --

you are right. I can change it here, but it is really not related to this 
Jira... will do anyway



---


[GitHub] incubator-tephra pull request #23: [Tephra-201] Store checkpoints in in-prog...

2016-12-08 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/23#discussion_r91467770
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/TransactionManager.java ---
@@ -1319,20 +1344,60 @@ public void shutdown() {
   }
 
   /**
+   * Type of in-progress transaction.
+   */
+  public enum InProgressType {
+
+/**
+ * Short transactions detect conflicts during commit.
+ */
+SHORT(TransactionType.SHORT),
+
+/**
+ * Long running transactions do not detect conflicts during commit.
+ */
+LONG(TransactionType.LONG),
+
+/**
+ * Check-pointed transactions are recorded as in-progress.
+ */
+CHECKPOINT(null);
+
+private final TransactionType transactionType;
+
+InProgressType(TransactionType transactionType) {
+  this.transactionType = transactionType;
+}
+
+public static InProgressType of(TransactionType type) {
+  switch (type) {
+case SHORT: return SHORT;
+case LONG:  return LONG;
+default: throw new IllegalArgumentException("Unknown 
TransactionType " + type);
+  }
+}
+
+@Nullable
+public TransactionType getTransactionType() {
+  return transactionType;
+}
+  }
+
+  /**
* Represents some of the info on in-progress tx
*/
   public static final class InProgressTx {
 /** the oldest in progress tx at the time of this tx start */
 private final long visibilityUpperBound;
 private final long expiration;
-private final TransactionType type;
+private final InProgressType type;
 private LongArrayList checkpointWritePointers = new LongArrayList();
--- End diff --

not introduced by this PR, but fixed now.


---


[GitHub] incubator-tephra pull request #23: [Tephra-201] Store checkpoints in in-prog...

2016-12-08 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/23#discussion_r91467537
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/TransactionManager.java ---
@@ -958,25 +983,31 @@ private void doAbort(long writePointer, long[] 
checkpointWritePointers, Transact
 // makes tx visible (assumes that all operations were rolled back)
 // remove from in-progress set, so that it does not get excluded in 
the future
 InProgressTx removed = inProgress.remove(writePointer);
+boolean isInvalid = false;
 if (removed == null) {
   // tx was not in progress! perhaps it timed out and is invalid? try 
to remove it there.
   if (invalid.rem(writePointer)) {
+isInvalid = true;
 // remove any invalidated checkpoint pointers
 // this will only be present if the parent write pointer was also 
invalidated
 if (checkpointWritePointers != null) {
-  for (int i = 0; i < checkpointWritePointers.length; i++) {
-invalid.rem(checkpointWritePointers[i]);
+  for (long checkpointWritePointer : checkpointWritePointers) {
+invalid.rem(checkpointWritePointer);
   }
 }
 invalidArray = invalid.toLongArray();
 LOG.info("Tx invalid list: removed aborted tx {}", writePointer);
-// removed a tx from excludes: must move read pointer
-moveReadPointerIfNeeded(writePointer);
   }
-} else {
-  // removed a tx from excludes: must move read pointer
-  moveReadPointerIfNeeded(writePointer);
 }
+if (!isInvalid) {
+  if (checkpointWritePointers != null) {
--- End diff --

good point. done


---


[GitHub] incubator-tephra issue #20: Compute global prune upper bound using compactio...

2016-12-03 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/20
  
LGTM


---


[GitHub] incubator-tephra issue #20: Compute global prune upper bound using compactio...

2016-12-03 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/20
  
I did another pass. Only a few more comments... 


---


[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90766319
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
 ---
@@ -179,15 +180,17 @@ public void pruneComplete(long time, long 
maxPrunedInvalid) throws IOException {
   return;
 }
 
-// Get regions for given time, so as to not delete them
+// Get regions for the given time, so as to not delete them. The prune 
upper bounds for regions are recorded
+// by TransactionProcessor and the deletion is done by this class. To 
avoid update/delete race condition,
+// we only delete stale regions.
--- End diff --

OK, the comment makes it clear that this is because of a race condition. Not 
sure what kind of race condition, though? Also, I believe my comment still holds: 
this means that, as long as a region exists, we will never delete its entries here. 
Do we really have to keep a record of a region's existence for every single 
pruneInterval since the beginning of time? That can be a lot of records over 
time.


---


[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90759375
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +110,248 @@ public long getPruneUpperBound(byte[] regionId) 
throws IOException {
 }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions. This is a batch 
operation of method
+   * {@link #getPruneUpperBoundForRegion(byte[])}
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound 
value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> 
regions) throws IOException {
+Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (regions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  resultMap.put(region, pruneUpperBoundRegion);
+}
+  }
+}
+  }
+  return resultMap;
+}
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose 
prune upper bound is less than a given value.
+   * After the invalid list is pruned up to deletionPruneUpperBound, we do 
not need entries for regions that have
+   * prune upper bound less than deletionPruneUpperBound. We however limit 
the deletion to only regions that are
+   * no longer in existence (due to deletion, etc.), to avoid 
update/delete race conditions.
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions 
will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteRegionsWithPruneUpperBoundBefore(long 
deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+throws IOException {
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (!excludeRegions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+stateTable.delete(new Delete(next.getRow()));
+  }
+}
+  }
+}
+  }
+}
+  }
+
+  // ---
+  // --- Methods for regions at a given time ---
+  // ---
+  // Key: 0x2
+  // Col 't': 
+  // ---
+
+  /**
+   * Persist the regions for the given time. {@link 
HBaseTransactionPruningPlugin} saves the set of
+   * transactional regions existing in the HBase instance periodically.
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws 
IOException {
+byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+try (Table stateTable = stateTableSupplier.get()) {
+  for (byte[] region : regions) {
+Put put = new Put(makeTimeRegionKey(timeBytes, region));
+put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+stateTable.put(put);
+  }
+}
+  }
+
+  /**
+   * Return the set of regions saved for the time just be

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-12-03 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r90760015
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/HBaseTransactionPruningPlugin.java
 ---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.janitor.TransactionPruningPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link TransactionPruningPlugin} for 
HBase.
+ *
+ * This plugin determines the prune upper bound for transactional HBase 
tables that use
+ * coprocessor {@link TransactionProcessor}.
+ *
+ * State storage:
+ *
+ * This plugin expects the TransactionProcessor to save the prune upper 
bound for invalid transactions
+ * after every major compaction of a region. Let's call this (region, 
prune upper bound).
+ * In addition, the plugin also persists the following information on a 
run at time t
+ * 
+ *   
+ * (t, set of regions): Set of transactional regions at time 
t.
+ * Transactional regions are regions of the tables that have the 
coprocessor TransactionProcessor
+ * attached to them.
+ *   
+ *   
+ * (t, prune upper bound): This is the smallest not in-progress 
transaction that
+ * will not have writes in any HBase regions that are created after 
time t.
+ * This value is determined by the Transaction Service based on the 
transaction state at time t
+ * and passed on to the plugin.
+ *   
+ * 
+ *
+ * Computing prune upper bound:
+ *
+ * In a typical HBase instance, there can be a constant change in the 
number of regions due to region creations,
+ * splits and merges. At any given time there can always be a region on 
which a major compaction has not been run.
+ * Since the prune upper bound will get recorded for a region only after a 
major compaction,
+ * using only the latest set of regions we may not be able to find the
+ * prune upper bounds for all the current regions. Hence we persist the 
set of regions that exist at that time
+ * of each run of the plugin, and use historical region set for time 
t, t - 1, etc.
+ * to determine the prune upper bound.
+ *
+ * From the regions saved at time t, t - 1, etc.,
+ * the plugin tries to find the latest (t, set of regions) where 
all regions have been major compacted,
+ * i.e, all regions have prune upper bound recorded in (region, prune 
upper bound).
+ * 
+ * If such a set is found for time t1, the prune upper bound 
returned by the plugin is the minimum of
+ * 
+ *   Prune upper bounds of regions in set (t1, set of 
regions)
+ *   Prune upper bound from (t1, prune upper bound)
+ * 
+ *
+ * 
+ * Above, when we find (t1, set of regions), there may a region 
that was created after time t1,
+ * but has a data write from an invalid transaction that is smaller t

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89939212
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DefaultDataJanitorPlugin.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.janitor.DataJanitorPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link DataJanitorPlugin} for HBase
+ */
+@SuppressWarnings("WeakerAccess")
+public class DefaultDataJanitorPlugin implements DataJanitorPlugin {
+  public static final Logger LOG = 
LoggerFactory.getLogger(DefaultDataJanitorPlugin.class);
+
+  protected Configuration conf;
+  protected Connection connection;
+  protected DataJanitorState dataJanitorState;
+
+  @Override
+  public void initialize(Configuration conf) throws IOException {
+this.conf = conf;
+this.connection = ConnectionFactory.createConnection(conf);
+
+final TableName stateTable = 
TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
+
TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
+LOG.info("Initializing plugin with state table {}", 
stateTable.getNameWithNamespaceInclAsString());
+this.dataJanitorState = new DataJanitorState(new 
DataJanitorState.TableSupplier() {
+  @Override
+  public Table get() throws IOException {
+return connection.getTable(stateTable);
+  }
+});
+  }
+
+  @Override
+  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) 
throws IOException {
+LOG.debug("Fetching prune upper bound for time {} and max prune upper 
bound {}", time, pruneUpperBoundForTime);
+if (time < 0 || pruneUpperBoundForTime < 0) {
+  return -1;
+}
+
+SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
+if (!transactionalRegions.isEmpty()) {
+  LOG.debug("Saving {} transactional regions for time {}", 
transactionalRegions.size(), time);
+  dataJanitorState.saveRegionsForTime(time, transactionalRegions);
+  // Save prune upper bound for time as the final step.
+  // We can then use its existence to make sure that the data for a 
given time is complete or not
+  LOG.debug("Saving max prune upper bound {} for time {}", 
pruneUpperBoundForTime, time);
+  dataJanitorState.savePruneUpperBoundForTime(time, 
pruneUpperBoundForTime);
+}
+
+return computePruneUpperBound(new TimeRegions(time, 
transactionalRegions));
+  }
+
+  @Override
+  public void pruneComplete(long time, long pruneUpperBound) throws 
IOException {
+LOG.debug("Prune complete for time {} and prune upper bound {}", time, 
pruneUpperB

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89938230
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.janitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Data janitor interface to manage the invalid transaction list.
+ * There will be one such plugin per data store that will be invoked 
periodically
+ * to fetch the prune upper bound for each data store.
+ * Invalid transaction list will be pruned up to the minimum of prune upper 
bounds returned by all the plugins.
+ */
+public interface DataJanitorPlugin {
+  /**
+   * Called once at the beginning to initialize the plugin
+   */
+  void initialize(Configuration conf) throws IOException;
--- End diff --

at the beginning of what? The lifetime of a transaction manager? Or the 
beginning of a prune operation?


---


[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89938297
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.janitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Data janitor interface to manage the invalid transaction list.
+ * There will be one such plugin per data store that will be invoked 
periodically
--- End diff --

It would make sense to mention here that this is a plugin for the tx manager. 
Its name is a little misleading, because the DataJanitor is a coprocessor that 
runs in each region server.
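
For illustration, one possible wording for the interface javadoc (a sketch of 
the suggestion above, not the committed doc):

```
/**
 * A plugin invoked periodically by the transaction manager to compute a prune upper
 * bound for one data store. Not to be confused with the DataJanitor coprocessor,
 * which runs inside each HBase region server.
 */
public interface DataJanitorPlugin {
  // methods unchanged from the diff above
}
```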


---


[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89938407
  
--- Diff: 
tephra-core/src/main/java/org/apache/tephra/janitor/DataJanitorPlugin.java ---
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.janitor;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Data janitor interface to manage the invalid transaction list.
+ * There will be one such plugin per data store that will be invoked 
periodically
+ * to fetch the prune upper bound for each data store.
+ * Invalid transaction list will be pruned up to the minimum of prune upper 
bounds returned by all the plugins.
+ */
+public interface DataJanitorPlugin {
+  /**
+   * Called once at the beginning to initialize the plugin
+   */
+  void initialize(Configuration conf) throws IOException;
+
+  /**
+   * Called periodically to fetch prune upper bound for a data store
+   *
+   * @param time start time of this prune iteration
+   * @param pruneUpperBoundForTime upper bound for prune tx id for the 
given start time
--- End diff --

This is confusing. I thought this _returns_ the prune upper bound? Why does 
it need an upper bound passed in as a parameter? I think you need to explain 
more clearly what this parameter means.
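
To make the question concrete, here is a sketch of what a clarified javadoc 
could say, based only on how DefaultDataJanitorPlugin in this PR uses the two 
arguments; treat the wording as my assumption, not as the agreed semantics:

```
import java.io.IOException;

public interface DataJanitorPlugin {
  /**
   * Called periodically to compute the prune upper bound for this data store.
   *
   * @param time start time of this prune iteration; the HBase implementation also records
   *             the current set of transactional regions under this timestamp
   * @param pruneUpperBoundForTime candidate upper bound derived from the transaction
   *             manager's own state for this iteration; the implementation persists it and
   *             may later return a smaller, region-aware bound
   * @return the prune upper bound that is safe for this data store, or -1 if it cannot be
   *         determined yet
   */
  long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) throws IOException;

  // initialize(Configuration) and the other methods are unchanged from the diff above
}
```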


---


[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89938969
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DefaultDataJanitorPlugin.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tephra.hbase.coprocessor.janitor;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.janitor.DataJanitorPlugin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+/**
+ * Default implementation of the {@link DataJanitorPlugin} for HBase
+ */
+@SuppressWarnings("WeakerAccess")
+public class DefaultDataJanitorPlugin implements DataJanitorPlugin {
+  public static final Logger LOG = 
LoggerFactory.getLogger(DefaultDataJanitorPlugin.class);
+
+  protected Configuration conf;
+  protected Connection connection;
+  protected DataJanitorState dataJanitorState;
+
+  @Override
+  public void initialize(Configuration conf) throws IOException {
+this.conf = conf;
+this.connection = ConnectionFactory.createConnection(conf);
+
+final TableName stateTable = 
TableName.valueOf(conf.get(TxConstants.DataJanitor.PRUNE_STATE_TABLE,
+
TxConstants.DataJanitor.DEFAULT_PRUNE_STATE_TABLE));
+LOG.info("Initializing plugin with state table {}", 
stateTable.getNameWithNamespaceInclAsString());
+this.dataJanitorState = new DataJanitorState(new 
DataJanitorState.TableSupplier() {
+  @Override
+  public Table get() throws IOException {
+return connection.getTable(stateTable);
+  }
+});
+  }
+
+  @Override
+  public long fetchPruneUpperBound(long time, long pruneUpperBoundForTime) 
throws IOException {
+LOG.debug("Fetching prune upper bound for time {} and max prune upper 
bound {}", time, pruneUpperBoundForTime);
+if (time < 0 || pruneUpperBoundForTime < 0) {
+  return -1;
+}
+
+SortedSet<byte[]> transactionalRegions = getTransactionalRegions();
+if (!transactionalRegions.isEmpty()) {
+  LOG.debug("Saving {} transactional regions for time {}", 
transactionalRegions.size(), time);
+  dataJanitorState.saveRegionsForTime(time, transactionalRegions);
--- End diff --

I am not sure I understand. Why does fetch() have to save anything? 
Shouldn't it just read (=fetch)?
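
If the interface were reshaped along those lines, it might look roughly like 
this; the method names below are invented for illustration and are a sketch of 
the suggestion, not part of the PR:

```
import java.io.IOException;

// Hypothetical split that keeps "fetch" read-only: the write happens in a separate call.
interface PruneStateRecorder {
  /** Persist the regions and the candidate upper bound observed at {@code time}. */
  void recordStateForTime(long time, long pruneUpperBoundForTime) throws IOException;

  /** Read-only: compute the safe prune upper bound from the previously recorded state. */
  long fetchPruneUpperBound(long time) throws IOException;
}
```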


---


[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89207961
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) 
throws IOException {
 }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound 
value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> 
regions) throws IOException {
+Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (regions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  resultMap.put(region, pruneUpperBoundRegion);
+}
+  }
+}
+  }
+  return resultMap;
+}
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose 
prune upper bound is less than a given value
--- End diff --

What is this used for? It might be useful to say that in the javadoc.
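
For example, the javadoc could spell out the housekeeping intent. The wording 
below is only my reading of the PR and would need to be confirmed; the stub 
class name is hypothetical, the real implementation lives in DataJanitorState:

```
import java.io.IOException;
import java.util.SortedSet;

// Doc-only stub for illustration; not part of the PR.
abstract class DataJanitorStateDocSketch {
  /**
   * Housekeeping for the prune state table: removes the prune-upper-bound entries of regions
   * that are not in {@code excludeRegions} (i.e. regions that presumably no longer exist, for
   * example after splits or table drops) and whose recorded bound is already below
   * {@code deletionPruneUpperBound}, so that stale rows do not accumulate.
   */
  abstract void deleteRegionsWithPruneUpperBoundBefore(long deletionPruneUpperBound,
                                                       SortedSet<byte[]> excludeRegions)
      throws IOException;
}
```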


---


[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89377823
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) 
throws IOException {
 }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound 
value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> 
regions) throws IOException {
+Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (regions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  resultMap.put(region, pruneUpperBoundRegion);
+}
+  }
+}
+  }
+  return resultMap;
+}
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose 
prune upper bound is less than a given value
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions 
will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteRegionsWithPruneUpperBoundBefore(long 
deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+throws IOException {
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (!excludeRegions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+stateTable.delete(new Delete(next.getRow()));
+  }
+}
+  }
+}
+  }
+}
+  }
+
+  // ---
+  // --- Methods for regions at a given time ---
+  // Key: 0x2
+  // Col 't': 
+  // ---
+
+  /**
+   * Persist the regions for a given time
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws 
IOException {
+byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+try (Table stateTable = stateTableSupplier.get()) {
+  for (byte[] region : regions) {
+Put put = new Put(makeTimeRegionKey(timeBytes, region));
+put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+stateTable.put(put);
+  }
+}
+  }
+
+  /**
+   * Return all the persisted regions for a time equal to or less than the 
given time
+   *
--- End diff --

If I read the code correctly, this finds the greatest time that is at or before 
the given time, and then returns all regions recorded at that exact time, but 
none that are older. Is that correct? The javadoc is not very clear 
about this.
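
For reference, the "greatest time at or before the given time" behaviour falls 
out of the inverted-time row keys used in the quoted code. A minimal, 
self-contained illustration, assuming getInvertedTime(t) is Long.MAX_VALUE - t 
(that helper is not shown in this diff):

```
import java.util.TreeMap;

public class InvertedTimeExample {
  // Inverted timestamps sort newest-first, so the first row at or after the inverted
  // query time corresponds to the latest snapshot taken at or before the query time.
  static long invert(long time) {
    return Long.MAX_VALUE - time;
  }

  public static void main(String[] args) {
    TreeMap<Long, String> rowsByInvertedTime = new TreeMap<>();
    rowsByInvertedTime.put(invert(100L), "regions recorded at t=100");
    rowsByInvertedTime.put(invert(200L), "regions recorded at t=200");
    rowsByInvertedTime.put(invert(300L), "regions recorded at t=300");

    long queryTime = 250L;
    // ceilingEntry plays the role of the HBase scan start row: it yields the snapshot
    // with the greatest time <= queryTime (here t=200), and only that snapshot.
    System.out.println(rowsByInvertedTime.ceilingEntry(invert(queryTime)).getValue());
  }
}
```

Assuming the scan then stops after the first timestamp it encounters, only the 
regions stored under that single snapshot are returned and older snapshots are 
skipped, which matches the reading above.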


---


[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89391984
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) 
throws IOException {
 }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound 
value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> 
regions) throws IOException {
+Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (regions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  resultMap.put(region, pruneUpperBoundRegion);
+}
+  }
+}
+  }
+  return resultMap;
+}
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose 
prune upper bound is less than a given value
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions 
will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteRegionsWithPruneUpperBoundBefore(long 
deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+throws IOException {
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (!excludeRegions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+stateTable.delete(new Delete(next.getRow()));
+  }
+}
+  }
+}
+  }
+}
+  }
+
+  // ---
+  // --- Methods for regions at a given time ---
+  // Key: 0x2
+  // Col 't': 
+  // ---
+
+  /**
+   * Persist the regions for a given time
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws 
IOException {
+byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+try (Table stateTable = stateTableSupplier.get()) {
+  for (byte[] region : regions) {
+Put put = new Put(makeTimeRegionKey(timeBytes, region));
+put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+stateTable.put(put);
+  }
+}
+  }
+
+  /**
+   * Return all the persisted regions for a time equal to or less than the 
given time
+   *
+   * @param time timestamp in milliseconds
+   * @return set of regions and time at which they were recorded
+   * @throws IOException when not able to read the data from HBase
+   */
+  @Nullable
+  public TimeRegions getRegionsOnOrBeforeTime(long time) throws 
IOException {
+byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+try (Table stateTable = stateTableSupplier.get()) {
+  Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), 
REGION_TIME_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, REGION_TIME_COL);
+
   

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89391878
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) 
throws IOException {
 }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound 
value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> 
regions) throws IOException {
+Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (regions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  resultMap.put(region, pruneUpperBoundRegion);
+}
+  }
+}
+  }
+  return resultMap;
+}
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose 
prune upper bound is less than a given value
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions 
will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteRegionsWithPruneUpperBoundBefore(long 
deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+throws IOException {
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (!excludeRegions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+stateTable.delete(new Delete(next.getRow()));
+  }
+}
+  }
+}
+  }
+}
+  }
+
+  // ---
+  // --- Methods for regions at a given time ---
+  // Key: 0x2
+  // Col 't': 
+  // ---
+
+  /**
+   * Persist the regions for a given time
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws 
IOException {
+byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+try (Table stateTable = stateTableSupplier.get()) {
+  for (byte[] region : regions) {
+Put put = new Put(makeTimeRegionKey(timeBytes, region));
+put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+stateTable.put(put);
+  }
+}
+  }
+
+  /**
+   * Return all the persisted regions for a time equal to or less than the 
given time
+   *
+   * @param time timestamp in milliseconds
+   * @return set of regions and time at which they were recorded
+   * @throws IOException when not able to read the data from HBase
+   */
+  @Nullable
+  public TimeRegions getRegionsOnOrBeforeTime(long time) throws 
IOException {
+byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+try (Table stateTable = stateTableSupplier.get()) {
+  Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), 
REGION_TIME_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, REGION_TIME_COL);
+
   

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89207839
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) 
throws IOException {
 }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound 
value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> 
regions) throws IOException {
+Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (regions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  resultMap.put(region, pruneUpperBoundRegion);
+}
+  }
+}
+  }
+  return resultMap;
+}
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose 
prune upper bound is less than a given value
--- End diff --

period.


---


[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89391681
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) 
throws IOException {
 }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound 
value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> 
regions) throws IOException {
+Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (regions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  resultMap.put(region, pruneUpperBoundRegion);
+}
+  }
+}
+  }
+  return resultMap;
+}
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose 
prune upper bound is less than a given value
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions 
will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteRegionsWithPruneUpperBoundBefore(long 
deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+throws IOException {
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (!excludeRegions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+stateTable.delete(new Delete(next.getRow()));
+  }
+}
+  }
+}
+  }
+}
+  }
+
+  // ---
+  // --- Methods for regions at a given time ---
+  // Key: 0x2
+  // Col 't': 
+  // ---
+
+  /**
+   * Persist the regions for a given time
+   *
+   * @param time timestamp in milliseconds
+   * @param regions set of regions at the time
+   * @throws IOException when not able to persist the data to HBase
+   */
+  public void saveRegionsForTime(long time, Set<byte[]> regions) throws 
IOException {
+byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+try (Table stateTable = stateTableSupplier.get()) {
+  for (byte[] region : regions) {
+Put put = new Put(makeTimeRegionKey(timeBytes, region));
+put.addColumn(FAMILY, REGION_TIME_COL, EMPTY_BYTE_ARRAY);
+stateTable.put(put);
+  }
+}
+  }
+
+  /**
+   * Return all the persisted regions for a time equal to or less than the 
given time
+   *
+   * @param time timestamp in milliseconds
+   * @return set of regions and time at which they were recorded
+   * @throws IOException when not able to read the data from HBase
+   */
+  @Nullable
+  public TimeRegions getRegionsOnOrBeforeTime(long time) throws 
IOException {
+byte[] timeBytes = Bytes.toBytes(getInvertedTime(time));
+try (Table stateTable = stateTableSupplier.get()) {
+  Scan scan = new Scan(makeTimeRegionKey(timeBytes, EMPTY_BYTE_ARRAY), 
REGION_TIME_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, REGION_TIME_COL);
+
   

[GitHub] incubator-tephra pull request #20: Compute global prune upper bound using co...

2016-11-28 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/20#discussion_r89938828
  
--- Diff: 
tephra-hbase-compat-1.1-base/src/main/java/org/apache/tephra/hbase/coprocessor/janitor/DataJanitorState.java
 ---
@@ -58,10 +105,237 @@ public long getPruneUpperBound(byte[] regionId) 
throws IOException {
 }
   }
 
+  /**
+   * Get latest prune upper bounds for given regions
+   *
+   * @param regions a set of regions
+   * @return a map containing region id and its latest prune upper bound 
value
+   * @throws IOException when not able to read the data from HBase
+   */
+  public Map<byte[], Long> getPruneUpperBoundForRegions(SortedSet<byte[]> 
regions) throws IOException {
+Map<byte[], Long> resultMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (regions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  resultMap.put(region, pruneUpperBoundRegion);
+}
+  }
+}
+  }
+  return resultMap;
+}
+  }
+
+  /**
+   * Delete all regions that are not in the given exclude set and whose 
prune upper bound is less than a given value
+   *
+   * @param deletionPruneUpperBound prune upper bound below which regions 
will be deleted
+   * @param excludeRegions set of regions that should not be deleted
+   * @throws IOException when not able to delete data in HBase
+   */
+  public void deleteRegionsWithPruneUpperBoundBefore(long 
deletionPruneUpperBound, SortedSet<byte[]> excludeRegions)
+throws IOException {
+try (Table stateTable = stateTableSupplier.get()) {
+  byte[] startRow = makeRegionKey(EMPTY_BYTE_ARRAY);
+  Scan scan = new Scan(startRow, REGION_KEY_PREFIX_STOP);
+  scan.addColumn(FAMILY, PRUNE_UPPER_BOUND_COL);
+
+  try (ResultScanner scanner = stateTable.getScanner(scan)) {
+Result next;
+while ((next = scanner.next()) != null) {
+  byte[] region = getRegionFromKey(next.getRow());
+  if (!excludeRegions.contains(region)) {
+byte[] timeBytes = next.getValue(FAMILY, 
PRUNE_UPPER_BOUND_COL);
+if (timeBytes != null) {
+  long pruneUpperBoundRegion = Bytes.toLong(timeBytes);
+  if (pruneUpperBoundRegion < deletionPruneUpperBound) {
+stateTable.delete(new Delete(next.getRow()));
+  }
+}
+  }
+}
+  }
+}
+  }
+
+  // ---
+  // --- Methods for regions at a given time ---
+  // Key: 0x2
+  // Col 't': 
+  // ---
+
+  /**
+   * Persist the regions for a given time
--- End diff --

What exactly does this persist? Simply which regions existed at that time?


---


[GitHub] incubator-tephra issue #19: TEPHRA-35 Save compaction state for pruning inva...

2016-11-05 Thread anew
Github user anew commented on the issue:

https://github.com/apache/incubator-tephra/pull/19
  
Question: Suppose I start a transaction, which times out, and therefore 
goes into the invalid list. A little later HBase performs a major compaction. 
This transaction and all its writes are removed from the table by the 
DataJanitor. A little later TxManager prunes its invalid transactions, and 
because this tx has been removed from HBase, it removes it from the invalid 
list. 

The problem arises if the program that started the transaction is still 
running. What if it performs another write after the invalid list has been 
pruned? That write would carry an invalid version, but since the version is no 
longer on the invalid list, it becomes visible. 

Isn't that a problem?
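
A toy sequence that mirrors the scenario (plain Java sets, not Tephra code; 
"visible" here just means "not in the invalid set"):

```
import java.util.HashSet;
import java.util.Set;

public class PruneRaceSketch {
  public static void main(String[] args) {
    Set<Long> invalid = new HashSet<>();
    long tx = 100L;

    invalid.add(tx);        // the transaction times out and is added to the invalid list
    // ... major compaction removes all cells written by tx 100 from the table ...
    invalid.remove(tx);     // the invalid list is pruned because tx 100 left no data behind

    // the still-running client now writes another cell with version 100
    boolean lateWriteVisible = !invalid.contains(tx);
    System.out.println("late write visible? " + lateWriteVisible);  // prints true
  }
}
```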


---


[GitHub] incubator-tephra pull request #19: TEPHRA-35 Save compaction state for pruni...

2016-11-04 Thread anew
Github user anew commented on a diff in the pull request:

https://github.com/apache/incubator-tephra/pull/19#discussion_r86656252
  
--- Diff: tephra-core/src/main/java/org/apache/tephra/util/TxUtils.java ---
@@ -149,4 +149,15 @@ private static long getMaxTTL(Map<byte[], Long> 
ttlByFamily) {
   public static boolean isPreExistingVersion(long version) {
 return version < MAX_NON_TX_TIMESTAMP;
   }
+
+  /**
+   * Returns the maximum transaction that can be removed from the invalid 
list for the state represented by the given
+   * transaction.
+   */
+  public static long getPruneUpperBound(Transaction tx) {
+long maxInvalidTx =
+  tx.getInvalids().length > 0 ? 
tx.getInvalids()[tx.getInvalids().length - 1] : Transaction.NO_TX_IN_PROGRESS;
+long firstInProgress = tx.getFirstInProgress();
+return Math.min(maxInvalidTx, firstInProgress - 1);
--- End diff --

If there is no invalid tx, and also none in progress, wouldn't the upper bound 
still be bounded by the current tx id? That is, should this return 
tx.getTransactionId() - 1?
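
To spell the case out, a sketch with the inputs passed in explicitly; the value 
of Transaction.NO_TX_IN_PROGRESS and what getFirstInProgress() reports when 
nothing is in progress are assumptions here, not taken from this diff:

```
// Walks the expression from the diff for the "nothing invalid, nothing in progress" case.
// Assumes noTxSentinel is a value larger than any real transaction id, and that
// firstInProgress also reports that sentinel when no transaction is in progress.
public class PruneUpperBoundSketch {
  static long pruneUpperBound(long[] invalids, long firstInProgress, long noTxSentinel) {
    long maxInvalidTx = invalids.length > 0 ? invalids[invalids.length - 1] : noTxSentinel;
    return Math.min(maxInvalidTx, firstInProgress - 1);
  }

  public static void main(String[] args) {
    long sentinel = Long.MAX_VALUE;
    // With no invalid and no in-progress transactions both operands degenerate to the
    // sentinel range, so the result is sentinel - 1 rather than transactionId - 1,
    // which is exactly what the question above is probing.
    System.out.println(pruneUpperBound(new long[0], sentinel, sentinel));
  }
}
```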


---

