Author: chetanm
Date: Sat Apr 11 06:57:36 2015
New Revision: 1672840

URL: http://svn.apache.org/r1672840
Log:
OAK-2590 - IndexCopier Error occurred while removing deleted files from Local
OAK-2649 - IndexCopier might create empty files in case of error occurring while 
copying
OAK-2709 - Misleading log message from IndexCopier

Merging 1666102,1666381,1667696,1671489,1671795,1672055

Modified:
    jackrabbit/oak/branches/1.0/   (props changed)
    
jackrabbit/oak/branches/1.0/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
    
jackrabbit/oak/branches/1.0/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
    
jackrabbit/oak/branches/1.0/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java

Propchange: jackrabbit/oak/branches/1.0/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Apr 11 06:57:36 2015
@@ -1,2 +1,2 @@
-/jackrabbit/oak/trunk:1584578,1584602,1584614,1584616,1584709,1584781,1584937,1585297,1585304-1585305,1585420,1585424,1585427,1585448,1585465,1585468,1585486,1585497,1585509,1585647,1585655-1585656,1585661,1585665-1585666,1585669-1585670,1585673,1585680,1585719,1585763,1585770,1585896,1585904,1585907,1585940,1585949,1585951,1585956,1585962-1585963,1586287,1586320,1586364,1586372,1586655,1586836,1587130,1587224,1587399,1587408,1587472,1587485,1587488,1587538,1587580,1587807,1588033,1588042,1588046,1588066,1588201,1589025,1589101,1589137,1589141,1589263,1589440,1589442,1589484,1589488,1589661,1589664,1589682,1589708,1589741,1589748,1589789,1589794,1589850,1589864,1590628,1590660,1590684,1590697,1590701,1590980,1590988,1591101,1591226,1591229,1591293,1591314,1591317,1591362,1591374,1591381,1591438,1591467,1591552,1591704,1591713,1591715,1591723,1591874,1592487,1592512,1592658,1592665,1592677,1592742,1592744,1592787,1592809,1592955,1593036,1593048,1593061,1593133,1593210-1593211,1593231
 
,1593245,1593250,1593294,1593304,1593317,1593342,1593554,1594158-1594164,1594166-1594167,1594169,1594237,1594800,1594808,1594835,1594888,1595147,1595457,1595856,1596241,1596474,1596534,1596844,1597569,1597795,1597854,1597860,1598292,1598302,1598352,1598369,1598595,1598631,1598696,1598732,1598797-1598798,1599299,1599332,1599416,1599434,1599671,1600088,1600935,1601309,1601388,1601578,1601649,1601676,1601757,1601768,1601814,1601833,1601838,1601853,1601878,1601888,1601922,1602156,1602170,1602174,1602179,1602183,1602201,1602207,1602227,1602256,1602261,1602342,1602796-1602797,1602800,1602809,1602853,1602872,1602914,1603155,1603307,1603401,1603441,1603748,1604166,1605030,1605036,1605038,1605292,1605447,1605526,1605670,1605725,1605831,1605852,1606077,1606079,1606087,1606638,1606641,1606644,1606708,1606711,1607031-1607032,1607077,1607127,1607141,1607152,1607185,1607196,1607331,1607362,1607366,1607392,1607526,1607557,1607664,1607737,1608560,1608731,1608783,1609064,1609081,1609165,1609214,1609
 
488,1610489,1610592,1610603,1610634,1610658,1610664,1611021,1611041,1611270,1611275,1611277,1611313,1611332,1611584,1612560,1612825,1612848,1612993,1613018,1613041,1614265,1614272,1614344-1614345,1614384-1614385,1614397,1614405-1614406,1614574,1614591,1614593,1614596,1614604,1614689,1614807,1614835,1614891,1615417-1615418,1616182,1616236,1616463,1616719,1617417,1617451,1617463,1617711,1618158,1618613,1618624,1618709,1619222,1619411,1619695,1619800,1619808,1619815,1619823-1619824,1620512,1620581,1620585,1620634,1620898,1620905,1621115,1621123-1621124,1621168,1621192,1621201,1621706,1621962,1622197,1622201,1622207,1622250,1622479,1623364,1623766,1623827,1623949,1623969,1623973,1624216,1624317,1624551,1624559,1624973,1624993-1624994,1625025,1625036,1625158,1625224,1625237,1625299,1625348,1625620,1625916,1625962-1625963,1626021,1626053,1626163,1626168,1626175,1626191,1626265,1626770,1627047,1627052,1627228,1627346,1627470,1627473,1627479,1627503,1627586,1627590,1627715,1627731,1628180,1
 
628198,1628262,1628447,1628608,1629688,1629840,1629858,1629917,1630055-1630057,1630156,1630299,1630338,1630773,1631283-1631284,1631333-1631334,1631617-1631619,1631630,1631699,1631704,1631711,1631967-1631969,1631986,1631990,1631999,1632002-1632003,1632017,1632258,1632264,1632270,1632293,1632303,1632592,1632605,1633315,1633389,1633559-1633560,1633562,1633567,1633571,1633598,1633608,1633641,1633687,1633697,1633768,1633783,1634505,1634513,1634774,1634779,1634781,1634792,1634803,1634814,1634816,1634838,1634841,1634852,1634864,1634896,1634898,1635044-1635045,1635060,1635077,1635089,1635102,1635108,1635218,1635387,1635435,1635518,1635563,1635586,1636336,1636348,1636505,1636585,1636799,1637368,1637382,1637413,1637651,1637815,1638779-1638783,1639260,1639577,1639622,1639963,1639966,1639973,1640134,1640143,1640555-1640556,1640694-1640695,1640715,1640722-1640723,1640728,1640863-1640872,1641340,1641350,1641352,1641541,1641596-1641599,1641601,1641662,1641671,1641695,1641771,1641802,1641811,164195
 
0,1642031,1642056,1642119,1642285,1642648,1642667,1642954,1642959,1643111,1643178,1643186,1643204,1643287,1643767,1643774,1643982,1644016,1644106,1644366,1644383,1644397-1644398,1644407,1644479,1644547,1644552,1644554,1644588,1644645,1644650,1644654,1644689,1644750,1645421,1645424,1645459,1645585,1645611,1645637,1645646,1645660-1645663,1645888,1645901,1645948,1645966,1645970-1645971,1646014,1646164,1646174,1646469,1646684,1646726-1646728,1646766,1646795,1646981,1649743,1649803,1650015,1650239,1650529,1650797,1651323,1651382,1651643,1651652,1651730,1651988-1651989,1651996,1652024,1652035,1652058-1652059,1652075,1652127,1652158,1652467,1652965,1652971,1652992,1653207,1653446,1653463,1653484,1653572,1653579,1653591,1653804,1653809,1653813,1653848-1653850,1653882,1654116,1654174,1654743,1654756,1654778,1655028,1655049,1655054-1655055,1655086,1655237,1655248,1655996,1656019,1656027,1656033,1656303,1656394,1656400,1656425,1656427,1656453,1656628,1656678,1657163,1657188,1657766,1657804,165
 
8470,1658977,1659285,1659483,1659527,1659550,1659578,1659765,1660154-1660155,1660383,1660409,1660426,1660676,1660870,1660872,1660897,1660903,1661069,1661122,1661146,1661158,1661226,1661630,1661643,1661645,1662313-1662315,1662323,1662381,1662450,1662456,1663241,1663275,1663288,1663448,1663526,1663528,1663565,1663578,1663666,1663705,1663730,1663753,1663854,1664038,1664184,1664228-1664229,1664231,1664381,1664569,1664947,1664987,1665184,1665271-1665272,1665274-1665275,1665436,1665634,1665758,1665835,1665892,1665897,1665910,1665918,1666177,1666218,1666220,1666351-1666352,1666384,1666426,1666491,1667062,1667184,1667293,1667462,1667498,1667502,1667573,1667590,1667782,1668160,1668275,1668641,1668645,1668649,1668665,1668671,1668683,1668688,1668845,1669072,1669096,1669337,1669361,1669579,1669680,1670030,1670693,1670705,1671512,1671787,1672277,1672350,1672468,1672537,1672603,1672834-1672835
+/jackrabbit/oak/trunk:1584578,1584602,1584614,1584616,1584709,1584781,1584937,1585297,1585304-1585305,1585420,1585424,1585427,1585448,1585465,1585468,1585486,1585497,1585509,1585647,1585655-1585656,1585661,1585665-1585666,1585669-1585670,1585673,1585680,1585719,1585763,1585770,1585896,1585904,1585907,1585940,1585949,1585951,1585956,1585962-1585963,1586287,1586320,1586364,1586372,1586655,1586836,1587130,1587224,1587399,1587408,1587472,1587485,1587488,1587538,1587580,1587807,1588033,1588042,1588046,1588066,1588201,1589025,1589101,1589137,1589141,1589263,1589440,1589442,1589484,1589488,1589661,1589664,1589682,1589708,1589741,1589748,1589789,1589794,1589850,1589864,1590628,1590660,1590684,1590697,1590701,1590980,1590988,1591101,1591226,1591229,1591293,1591314,1591317,1591362,1591374,1591381,1591438,1591467,1591552,1591704,1591713,1591715,1591723,1591874,1592487,1592512,1592658,1592665,1592677,1592742,1592744,1592787,1592809,1592955,1593036,1593048,1593061,1593133,1593210-1593211,1593231
 
,1593245,1593250,1593294,1593304,1593317,1593342,1593554,1594158-1594164,1594166-1594167,1594169,1594237,1594800,1594808,1594835,1594888,1595147,1595457,1595856,1596241,1596474,1596534,1596844,1597569,1597795,1597854,1597860,1598292,1598302,1598352,1598369,1598595,1598631,1598696,1598732,1598797-1598798,1599299,1599332,1599416,1599434,1599671,1600088,1600935,1601309,1601388,1601578,1601649,1601676,1601757,1601768,1601814,1601833,1601838,1601853,1601878,1601888,1601922,1602156,1602170,1602174,1602179,1602183,1602201,1602207,1602227,1602256,1602261,1602342,1602796-1602797,1602800,1602809,1602853,1602872,1602914,1603155,1603307,1603401,1603441,1603748,1604166,1605030,1605036,1605038,1605292,1605447,1605526,1605670,1605725,1605831,1605852,1606077,1606079,1606087,1606638,1606641,1606644,1606708,1606711,1607031-1607032,1607077,1607127,1607141,1607152,1607185,1607196,1607331,1607362,1607366,1607392,1607526,1607557,1607664,1607737,1608560,1608731,1608783,1609064,1609081,1609165,1609214,1609
 
488,1610489,1610592,1610603,1610634,1610658,1610664,1611021,1611041,1611270,1611275,1611277,1611313,1611332,1611584,1612560,1612825,1612848,1612993,1613018,1613041,1614265,1614272,1614344-1614345,1614384-1614385,1614397,1614405-1614406,1614574,1614591,1614593,1614596,1614604,1614689,1614807,1614835,1614891,1615417-1615418,1616182,1616236,1616463,1616719,1617417,1617451,1617463,1617711,1618158,1618613,1618624,1618709,1619222,1619411,1619695,1619800,1619808,1619815,1619823-1619824,1620512,1620581,1620585,1620634,1620898,1620905,1621115,1621123-1621124,1621168,1621192,1621201,1621706,1621962,1622197,1622201,1622207,1622250,1622479,1623364,1623766,1623827,1623949,1623969,1623973,1624216,1624317,1624551,1624559,1624973,1624993-1624994,1625025,1625036,1625158,1625224,1625237,1625299,1625348,1625620,1625916,1625962-1625963,1626021,1626053,1626163,1626168,1626175,1626191,1626265,1626770,1627047,1627052,1627228,1627346,1627470,1627473,1627479,1627503,1627586,1627590,1627715,1627731,1628180,1
 
628198,1628262,1628447,1628608,1629688,1629840,1629858,1629917,1630055-1630057,1630156,1630299,1630338,1630773,1631283-1631284,1631333-1631334,1631617-1631619,1631630,1631699,1631704,1631711,1631967-1631969,1631986,1631990,1631999,1632002-1632003,1632017,1632258,1632264,1632270,1632293,1632303,1632592,1632605,1633315,1633389,1633559-1633560,1633562,1633567,1633571,1633598,1633608,1633641,1633687,1633697,1633768,1633783,1634505,1634513,1634774,1634779,1634781,1634792,1634803,1634814,1634816,1634838,1634841,1634852,1634864,1634896,1634898,1635044-1635045,1635060,1635077,1635089,1635102,1635108,1635218,1635387,1635435,1635518,1635563,1635586,1636336,1636348,1636505,1636585,1636799,1637368,1637382,1637413,1637651,1637815,1638779-1638783,1639260,1639577,1639622,1639963,1639966,1639973,1640134,1640143,1640555-1640556,1640694-1640695,1640715,1640722-1640723,1640728,1640863-1640872,1641340,1641350,1641352,1641541,1641596-1641599,1641601,1641662,1641671,1641695,1641771,1641802,1641811,164195
 
0,1642031,1642056,1642119,1642285,1642648,1642667,1642954,1642959,1643111,1643178,1643186,1643204,1643287,1643767,1643774,1643982,1644016,1644106,1644366,1644383,1644397-1644398,1644407,1644479,1644547,1644552,1644554,1644588,1644645,1644650,1644654,1644689,1644750,1645421,1645424,1645459,1645585,1645611,1645637,1645646,1645660-1645663,1645888,1645901,1645948,1645966,1645970-1645971,1646014,1646164,1646174,1646469,1646684,1646726-1646728,1646766,1646795,1646981,1649743,1649803,1650015,1650239,1650529,1650797,1651323,1651382,1651643,1651652,1651730,1651988-1651989,1651996,1652024,1652035,1652058-1652059,1652075,1652127,1652158,1652467,1652965,1652971,1652992,1653207,1653446,1653463,1653484,1653572,1653579,1653591,1653804,1653809,1653813,1653848-1653850,1653882,1654116,1654174,1654743,1654756,1654778,1655028,1655049,1655054-1655055,1655086,1655237,1655248,1655996,1656019,1656027,1656033,1656303,1656394,1656400,1656425,1656427,1656453,1656628,1656678,1657163,1657188,1657766,1657804,165
 
8470,1658977,1659285,1659483,1659527,1659550,1659578,1659765,1660154-1660155,1660383,1660409,1660426,1660676,1660870,1660872,1660897,1660903,1661069,1661122,1661146,1661158,1661226,1661630,1661643,1661645,1662313-1662315,1662323,1662381,1662450,1662456,1663241,1663275,1663288,1663448,1663526,1663528,1663565,1663578,1663666,1663705,1663730,1663753,1663854,1664038,1664184,1664228-1664229,1664231,1664381,1664569,1664947,1664987,1665184,1665271-1665272,1665274-1665275,1665436,1665634,1665758,1665835,1665892,1665897,1665910,1665918,1666102,1666177,1666218,1666220,1666351-1666352,1666381,1666384,1666426,1666491,1667062,1667184,1667293,1667462,1667498,1667502,1667573,1667590,1667696,1667782,1668160,1668275,1668641,1668645,1668649,1668665,1668671,1668683,1668688,1668845,1669072,1669096,1669337,1669361,1669579,1669680,1670030,1670693,1670705,1671489,1671512,1671787,1671795,1672055,1672277,1672350,1672468,1672537,1672603,1672834-1672835
 /jackrabbit/trunk:1345480

Modified: 
jackrabbit/oak/branches/1.0/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java?rev=1672840&r1=1672839&r2=1672840&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/CopyOnReadStatsMBean.java
 Sat Apr 11 06:57:36 2015
@@ -31,9 +31,29 @@ public interface CopyOnReadStatsMBean {
 
     int getRemoteReadCount();
 
+    int getScheduledForCopyCount();
+
+    int getCopyInProgressCount();
+
+    int getMaxCopyInProgressCount();
+
+    int getMaxScheduledForCopyCount();
+
+    String getCopyInProgressSize();
+
+    String[] getCopyInProgressDetails();
+
     String getDownloadSize();
 
     long getDownloadTime();
 
     String getLocalIndexSize();
+
+    String[] getGarbageDetails();
+
+    String getGarbageSize();
+
+    int getDeletedFilesCount();
+
+    String getGarbageCollectedSize();
 }

Modified: 
jackrabbit/oak/branches/1.0/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java?rev=1672840&r1=1672839&r2=1672840&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-lucene/src/main/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopier.java
 Sat Apr 11 06:57:36 2015
@@ -22,10 +22,14 @@ package org.apache.jackrabbit.oak.plugin
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -39,10 +43,12 @@ import javax.management.openmbean.Tabula
 import javax.management.openmbean.TabularType;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.Function;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.hash.Hashing;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.lucene.store.BaseDirectory;
@@ -56,10 +62,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.collect.Iterables.toArray;
+import static com.google.common.collect.Iterables.transform;
 import static com.google.common.collect.Maps.newConcurrentMap;
 
 class IndexCopier implements CopyOnReadStatsMBean {
     private static final Set<String> REMOTE_ONLY = 
ImmutableSet.of("segments.gen");
+    private static final int MAX_FAILURE_ENTRIES = 10000;
 
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final Executor executor;
@@ -68,11 +77,21 @@ class IndexCopier implements CopyOnReadS
     private final AtomicInteger localReadCount = new AtomicInteger();
     private final AtomicInteger remoteReadCount = new AtomicInteger();
     private final AtomicInteger invalidFileCount = new AtomicInteger();
+    private final AtomicInteger deletedFileCount = new AtomicInteger();
+    private final AtomicInteger scheduledForCopyCount = new AtomicInteger();
+    private final AtomicInteger copyInProgressCount = new AtomicInteger();
+    private final AtomicInteger maxCopyInProgressCount = new AtomicInteger();
+    private final AtomicInteger maxScheduledForCopyCount = new AtomicInteger();
+    private final AtomicLong copyInProgressSize = new AtomicLong();
     private final AtomicLong downloadSize = new AtomicLong();
+    private final AtomicLong garbageCollectedSize = new AtomicLong();
     private final AtomicLong downloadTime = new AtomicLong();
 
+
     private final Map<String, String> indexPathMapping = 
Maps.newConcurrentMap();
     private final Map<String, String> indexPathVersionMapping = 
Maps.newConcurrentMap();
+    private final ConcurrentMap<String, LocalIndexFile> failedToDeleteFiles = 
Maps.newConcurrentMap();
+    private final Set<LocalIndexFile> copyInProgressFiles = 
Collections.newSetFromMap(new ConcurrentHashMap<LocalIndexFile, Boolean>());
 
     public IndexCopier(Executor executor, File indexRootDir) {
         this.executor = executor;
@@ -106,6 +125,36 @@ class IndexCopier implements CopyOnReadS
         return new File(indexRootDir, subDir);
     }
 
+    Map<String, LocalIndexFile> getFailedToDeleteFiles() {
+        return Collections.unmodifiableMap(failedToDeleteFiles);
+    }
+
+    private void failedToDelete(LocalIndexFile file){
+        //Limit the size on best effort basis
+        if (failedToDeleteFiles.size() < MAX_FAILURE_ENTRIES) {
+            LocalIndexFile failedToDeleteFile = 
failedToDeleteFiles.putIfAbsent(file.getKey(), file);
+            if (failedToDeleteFile == null){
+                failedToDeleteFile = file;
+            }
+            failedToDeleteFile.incrementAttemptToDelete();
+        } else {
+            log.warn("Not able to delete {}. Currently more than {} file with 
total size {} are pending delete.",
+                    file.deleteLog(), failedToDeleteFiles.size(), 
getGarbageSize());
+        }
+    }
+
+    private void successfullyDeleted(LocalIndexFile file, boolean fileExisted){
+        LocalIndexFile failedToDeleteFile = 
failedToDeleteFiles.remove(file.getKey());
+        if (failedToDeleteFile != null){
+            log.debug("Deleted : {}", failedToDeleteFile.deleteLog());
+        }
+
+        if (fileExisted){
+            garbageCollectedSize.addAndGet(file.size);
+            deletedFileCount.incrementAndGet();
+        }
+    }
+
     /**
      * Directory implementation which lazily copies the index files from a
      * remote directory in background.
@@ -182,17 +231,25 @@ class IndexCopier implements CopyOnReadS
         }
 
         private void copy(final FileReference reference) {
+            updateMaxScheduled(scheduledForCopyCount.incrementAndGet());
             executor.execute(new Runnable() {
                 @Override
                 public void run() {
                     String name = reference.name;
+                    boolean success = false;
+                    boolean copyAttempted = false;
                     try {
+                        scheduledForCopyCount.decrementAndGet();
                         if (!local.fileExists(name)) {
-                            long start = System.currentTimeMillis();
+                            long fileSize = remote.fileLength(name);
+                            LocalIndexFile file = new LocalIndexFile(local, 
name, fileSize);
+                            long start = startCopy(file);
+                            copyAttempted = true;
+
                             remote.copy(local, name, name, IOContext.READ);
                             reference.markValid();
-                            downloadTime.addAndGet(System.currentTimeMillis() 
- start);
-                            downloadSize.addAndGet(remote.fileLength(name));
+
+                            doneCopy(file, start);
                         } else {
                             long localLength = local.fileLength(name);
                             long remoteLength = remote.fileLength(name);
@@ -208,11 +265,22 @@ class IndexCopier implements CopyOnReadS
                                 reference.markValid();
                             }
                         }
+                        success = true;
                     } catch (IOException e) {
                         //TODO In case of exception there would not be any 
other attempt
                         //to download the file. Look into support for retry
                         log.warn("Error occurred while copying file [{}] " +
                                 "from {} to {}", name, remote, local, e);
+                    } finally {
+                        if (copyAttempted && !success){
+                            try {
+                                if (local.fileExists(name)) {
+                                    local.deleteFile(name);
+                                }
+                            } catch (IOException e) {
+                                log.warn("Error occurred while deleting 
corrupted file [{}] from [{}]", name, local, e);
+                            }
+                        }
                     }
                 }
             });
@@ -270,10 +338,27 @@ class IndexCopier implements CopyOnReadS
                     ImmutableSet.copyOf(remote.listAll())
             );
 
-            for (String fileName : filesToBeDeleted){
-                local.deleteFile(fileName);
+            Set<String> failedToDelete = Sets.newHashSet();
+
+            for (String fileName : filesToBeDeleted) {
+                LocalIndexFile file = new LocalIndexFile(local, fileName);
+                try {
+                    boolean fileExisted = false;
+                    if (local.fileExists(fileName)) {
+                        fileExisted = true;
+                        local.deleteFile(fileName);
+                    }
+                    successfullyDeleted(file, fileExisted);
+                } catch (IOException e) {
+                    failedToDelete.add(fileName);
+                    failedToDelete(file);
+                    log.debug("Error occurred while removing deleted file {} 
from Local {}. " +
+                            "Attempt would be maid to delete it on next run ", 
fileName, local, e);
+                }
             }
 
+            filesToBeDeleted = new HashSet<String>(filesToBeDeleted);
+            filesToBeDeleted.removeAll(failedToDelete);
             if(!filesToBeDeleted.isEmpty()) {
                 log.debug("Following files have been removed from Lucene " +
                         "index directory [{}]", filesToBeDeleted);
@@ -303,6 +388,40 @@ class IndexCopier implements CopyOnReadS
         }
     }
 
+    private long startCopy(LocalIndexFile file) {
+        updateMaxInProgress(copyInProgressCount.incrementAndGet());
+        copyInProgressSize.addAndGet(file.size);
+        copyInProgressFiles.add(file);
+        return System.currentTimeMillis();
+    }
+
+    private void doneCopy(LocalIndexFile file, long start) {
+        copyInProgressFiles.remove(file);
+        copyInProgressCount.decrementAndGet();
+        copyInProgressSize.addAndGet(-file.size);
+
+        downloadTime.addAndGet(System.currentTimeMillis() - start);
+        downloadSize.addAndGet(file.size);
+    }
+
+    /**
+     * Raises {@code maxScheduledForCopyCount} to {@code val} if {@code val} is
+     * larger than the maximum recorded so far; otherwise leaves it untouched.
+     *
+     * @param val the scheduled-for-copy count that was just observed
+     */
+    private void updateMaxScheduled(int val) {
+        // Lock-free high-water mark. Synchronizing on the AtomicInteger itself
+        // only protects callers of this method; a CAS retry loop keeps the
+        // maximum correct without holding a monitor.
+        int current = maxScheduledForCopyCount.get();
+        while (val > current && !maxScheduledForCopyCount.compareAndSet(current, val)) {
+            // Another thread changed the value; re-read and re-check.
+            current = maxScheduledForCopyCount.get();
+        }
+    }
+
+    /**
+     * Raises {@code maxCopyInProgressCount} to {@code val} if {@code val} is
+     * larger than the maximum recorded so far; otherwise leaves it untouched.
+     *
+     * @param val the copy-in-progress count that was just observed
+     */
+    private void updateMaxInProgress(int val) {
+        // Lock-free high-water mark. Synchronizing on the AtomicInteger itself
+        // only protects callers of this method; a CAS retry loop keeps the
+        // maximum correct without holding a monitor.
+        int current = maxCopyInProgressCount.get();
+        while (val > current && !maxCopyInProgressCount.compareAndSet(current, val)) {
+            // Another thread changed the value; re-read and re-check.
+            current = maxCopyInProgressCount.get();
+        }
+    }
+
     private class DeleteOldDirOnClose extends FilterDirectory {
         private final File oldIndexDir;
 
@@ -322,6 +441,107 @@ class IndexCopier implements CopyOnReadS
             super.close();
         }
     }
+    
+    /**
+     * Book-keeping entry for a single file of a local index directory, used for
+     * the copy-in-progress and failed-to-delete statistics. Two entries are equal
+     * when they have the same directory and file name; size is not part of identity.
+     */
+    static final class LocalIndexFile {
+        /** Filesystem directory resolved via getFSDir; may be null. */
+        final File dir;
+        final String name;
+        /** Expected length in bytes; the two-arg constructor stores -1 when it cannot be read. */
+        final long size;
+        // AtomicInteger instead of a volatile int: ++ on a volatile field is a
+        // non-atomic read-modify-write and can lose concurrent increments.
+        private final AtomicInteger deleteAttemptCount = new AtomicInteger();
+        final long creationTime = System.currentTimeMillis();
+
+        public LocalIndexFile(Directory dir, String fileName, long size){
+            this.dir = getFSDir(dir);
+            this.name = fileName;
+            this.size = size;
+        }
+
+        public LocalIndexFile(Directory dir, String fileName){
+            this(dir, fileName, getFileLength(dir, fileName));
+        }
+
+        /**
+         * @return absolute path of the file, or just the file name when the
+         * filesystem directory is not known
+         */
+        public String getKey(){
+            if (dir != null){
+                return new File(dir, name).getAbsolutePath();
+            }
+            return name;
+        }
+
+        public void incrementAttemptToDelete(){
+            deleteAttemptCount.incrementAndGet();
+        }
+
+        public int getDeleteAttemptCount() {
+            return deleteAttemptCount.get();
+        }
+
+        /**
+         * @return one-line summary: name, expected size, delete attempts and
+         * seconds elapsed since this entry was created
+         */
+        public String deleteLog(){
+            // Pass the int value: %d does not accept an AtomicInteger argument.
+            return String.format("%s (%s, %d attempts, %d s)", name,
+                    IOUtils.humanReadableByteCount(size), deleteAttemptCount.get(), timeTaken());
+        }
+
+        /**
+         * @return one-line summary: name, bytes currently on disk, percentage
+         * copied, expected size and seconds elapsed since this entry was created
+         */
+        public String copyLog(){
+            return String.format("%s (%s, %1.1f%%, %s, %d s)", name,
+                    IOUtils.humanReadableByteCount(actualSize()),
+                    copyProgress(),
+                    IOUtils.humanReadableByteCount(size), timeTaken());
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            LocalIndexFile localIndexFile = (LocalIndexFile) o;
+
+            if (dir != null ? !dir.equals(localIndexFile.dir) : localIndexFile.dir != null)
+                return false;
+            return name.equals(localIndexFile.name);
+
+        }
+
+        @Override
+        public int hashCode() {
+            int result = dir != null ? dir.hashCode() : 0;
+            result = 31 * result + name.hashCode();
+            return result;
+        }
+
+        private long timeTaken(){
+            return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - creationTime);
+        }
+
+        private float copyProgress(){
+            // size may be 0 (empty file) or -1 (unknown); dividing by it would
+            // report a NaN, infinite or negative percentage.
+            if (size <= 0) {
+                return 0;
+            }
+            return actualSize() * 1.0f / size * 100;
+        }
+
+        private long actualSize(){
+            return dir != null ? new File(dir, name).length() : 0;
+        }
+    }
+
+    static File getFSDir(Directory dir) {
+        if (dir instanceof FilterDirectory){
+            dir = ((FilterDirectory) dir).getDelegate();
+        }
+
+        if (dir instanceof FSDirectory){
+            return ((FSDirectory) dir).getDirectory();
+        }
+
+        return null;
+    }
+
+    /**
+     * Get the file length in best effort basis.
+     * @return actual fileLength. -1 if cannot determine
+     */
+    private static long getFileLength(Directory dir, String fileName){
+        try{
+            return dir.fileLength(fileName);
+        } catch (Exception e){
+            return -1;
+        }
+    }
 
     //~------------------------------------------< CopyOnReadStatsMBean >
 
@@ -333,9 +553,10 @@ class IndexCopier implements CopyOnReadS
                     "Lucene Index Stats", IndexMappingData.TYPE, new 
String[]{"jcrPath"});
             tds = new TabularDataSupport(tt);
             for (Map.Entry<String, String> e : indexPathMapping.entrySet()){
+                String size = 
IOUtils.humanReadableByteCount(FileUtils.sizeOfDirectory(new 
File(e.getValue())));
                 tds.put(new CompositeDataSupport(IndexMappingData.TYPE,
                         IndexMappingData.FIELD_NAMES,
-                        new String[] {e.getKey(), e.getValue()}));
+                        new String[]{e.getKey(), e.getValue(), size}));
             }
         } catch (OpenDataException e){
             throw new IllegalStateException(e);
@@ -372,20 +593,89 @@ class IndexCopier implements CopyOnReadS
         return 
IOUtils.humanReadableByteCount(FileUtils.sizeOfDirectory(indexRootDir));
     }
 
+    @Override
+    public String[] getGarbageDetails() {
+        return toArray(transform(failedToDeleteFiles.values(),
+                new Function<LocalIndexFile, String>() {
+                    @Override
+                    public String apply(LocalIndexFile input) {
+                        return input.deleteLog();
+                    }
+                }), String.class);
+    }
+
+    /**
+     * @return human readable total size of the files currently recorded in
+     * {@code failedToDeleteFiles}
+     */
+    @Override
+    public String getGarbageSize() {
+        long garbageSize = 0;
+        for (LocalIndexFile failedToDeleteFile : failedToDeleteFiles.values()){
+            // size is -1 when the length could not be determined; such entries
+            // must not reduce the reported total
+            if (failedToDeleteFile.size > 0) {
+                garbageSize += failedToDeleteFile.size;
+            }
+        }
+        return IOUtils.humanReadableByteCount(garbageSize);
+    }
+
+    @Override
+    public int getScheduledForCopyCount() {
+        return scheduledForCopyCount.get();
+    }
+
+    @Override
+    public int getCopyInProgressCount() {
+        return copyInProgressCount.get();
+    }
+
+    @Override
+    public String getCopyInProgressSize() {
+        return IOUtils.humanReadableByteCount(copyInProgressSize.get());
+    }
+
+    @Override
+    public int getMaxCopyInProgressCount() {
+        return maxCopyInProgressCount.get();
+    }
+
+    @Override
+    public int getMaxScheduledForCopyCount() {
+        return maxScheduledForCopyCount.get();
+    }
+
+    @Override
+    public String[] getCopyInProgressDetails() {
+        return toArray(transform(copyInProgressFiles,
+                new Function<LocalIndexFile, String>() {
+                    @Override
+                    public String apply(LocalIndexFile input) {
+                        return input.copyLog();
+                    }
+                }), String.class);
+    }
+
+    @Override
+    public int getDeletedFilesCount() {
+        return deletedFileCount.get();
+    }
+
+    @Override
+    public String getGarbageCollectedSize() {
+        return IOUtils.humanReadableByteCount(garbageCollectedSize.get());
+    }
+
     private static class IndexMappingData {
         static final String[] FIELD_NAMES = new String[]{
                 "jcrPath",
                 "fsPath",
+                "size",
         };
 
         static final String[] FIELD_DESCRIPTIONS = new String[]{
                 "JCR Path",
                 "Filesystem Path",
+                "Size",
         };
 
         static final OpenType[] FIELD_TYPES = new OpenType[]{
                 SimpleType.STRING,
                 SimpleType.STRING,
+                SimpleType.STRING,
         };
 
         static final CompositeType TYPE = createCompositeType();

Modified: 
jackrabbit/oak/branches/1.0/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
URL: 
http://svn.apache.org/viewvc/jackrabbit/oak/branches/1.0/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java?rev=1672840&r1=1672839&r2=1672840&view=diff
==============================================================================
--- 
jackrabbit/oak/branches/1.0/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
 (original)
+++ 
jackrabbit/oak/branches/1.0/oak-lucene/src/test/java/org/apache/jackrabbit/oak/plugins/index/lucene/IndexCopierTest.java
 Sat Apr 11 06:57:36 2015
@@ -22,11 +22,24 @@ package org.apache.jackrabbit.oak.plugin
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
-import org.apache.commons.io.FileUtils;
+import javax.management.openmbean.TabularData;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.jackrabbit.oak.commons.IOUtils;
 import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
 import org.apache.jackrabbit.oak.spi.state.NodeState;
 import org.apache.lucene.store.Directory;
@@ -34,15 +47,19 @@ import org.apache.lucene.store.IOContext
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.RAMDirectory;
-import org.junit.After;
+import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import static com.google.common.collect.Lists.newArrayList;
 import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
+import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_COUNT;
 import static org.apache.jackrabbit.oak.plugins.nodetype.write.InitialContent.INITIAL_CONTENT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class IndexCopierTest {
     private Random rnd = new Random();
@@ -50,6 +67,9 @@ public class IndexCopierTest {
 
     private NodeState root = INITIAL_CONTENT;
 
+    @Rule
+    public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
     private NodeBuilder builder = root.builder();
 
     @Test
@@ -79,6 +99,73 @@ public class IndexCopierTest {
     }
 
     @Test
+    public void basicTestWithFS() throws Exception{
+        IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+        IndexCopier c1 = new IndexCopier(sameThreadExecutor(), getWorkDir());
+
+        Directory remote = new RAMDirectory();
+        Directory wrapped = c1.wrap("/foo" , defn, remote);
+
+        byte[] t1 = writeFile(remote, "t1");
+        byte[] t2 = writeFile(remote , "t2");
+
+        assertEquals(2, wrapped.listAll().length);
+
+        assertTrue(wrapped.fileExists("t1"));
+        assertTrue(wrapped.fileExists("t2"));
+
+        assertEquals(t1.length, wrapped.fileLength("t1"));
+        assertEquals(t2.length, wrapped.fileLength("t2"));
+
+        readAndAssert(wrapped, "t1", t1);
+
+        //t1 should now be added to testDir
+        File indexBaseDir = c1.getIndexDir("/foo");
+        File indexDir = new File(indexBaseDir, "0");
+        assertTrue(new File(indexDir, "t1").exists());
+
+        TabularData td = c1.getIndexPathMapping();
+        assertEquals(1, td.size());
+    }
+
+    @Ignore("OAK-2722") //FIXME test fails on windows
+    @Test
+    public void deleteOldPostReindex() throws Exception{
+        IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+        IndexCopier c1 = new IndexCopier(sameThreadExecutor(), getWorkDir());
+
+        Directory remote = new CloseSafeDir();
+        Directory w1 = c1.wrap("/foo" , defn, remote);
+
+        byte[] t1 = writeFile(remote , "t1");
+        byte[] t2 = writeFile(remote , "t2");
+
+        readAndAssert(w1, "t1", t1);
+        readAndAssert(w1, "t2", t2);
+
+        //t1 should now be added to testDir
+        File indexBaseDir = c1.getIndexDir("/foo");
+        File indexDir = new File(indexBaseDir, "0");
+        assertTrue(new File(indexDir, "t1").exists());
+
+        builder.setProperty(REINDEX_COUNT, 1);
+        defn = new IndexDefinition(root, builder.getNodeState());
+
+        //Close old version
+        w1.close();
+        //Get a new one with updated reindexCount
+        Directory w2 = c1.wrap("/foo" , defn, remote);
+
+        readAndAssert(w2, "t1", t1);
+
+        w2.close();
+        assertFalse("Old index directory should have been removed", indexDir.exists());
+
+        File indexDir2 = new File(indexBaseDir, "1");
+        assertTrue(new File(indexDir2, "t1").exists());
+    }
+
+    @Test
     public void concurrentRead() throws Exception{
         Directory baseDir = new RAMDirectory();
         IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
@@ -87,18 +174,20 @@ public class IndexCopierTest {
         IndexCopier c1 = new RAMIndexCopier(baseDir, executor, getWorkDir());
 
         TestRAMDirectory remote = new TestRAMDirectory();
-        Directory wrapped = c1.wrap("/foo" , defn, remote);
+        Directory wrapped = c1.wrap("/foo", defn, remote);
 
         byte[] t1 = writeFile(remote , "t1");
 
         //1. Trigger a read which should go to remote
         readAndAssert(wrapped, "t1", t1);
+        assertEquals(1, c1.getScheduledForCopyCount());
         assertEquals(1, remote.openedFiles.size());
         assertEquals(1, executor.commands.size());
 
         //2. Trigger another read and this should also be
         //served from remote
         readAndAssert(wrapped, "t1", t1);
+        assertEquals(1, c1.getScheduledForCopyCount());
         assertEquals(2, remote.openedFiles.size());
         //Second read should not add a new copy task
         assertEquals(1, executor.commands.size());
@@ -111,6 +200,74 @@ public class IndexCopierTest {
         readAndAssert(wrapped, "t1", t1);
         // Now read should be served from local and not from remote
         assertEquals(0, remote.openedFiles.size());
+        assertEquals(0, c1.getScheduledForCopyCount());
+    }
+
+    @Test
+    public void copyInProgressStats() throws Exception{
+        Directory baseDir = new RAMDirectory();
+        IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+
+        final List<ListenableFuture<?>> submittedTasks = Lists.newArrayList();
+        ExecutorService executor = new ForwardingListeningExecutorService() {
+            @Override
+            protected ListeningExecutorService delegate() {
+                return MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
+            }
+
+            @Override
+            public void execute(Runnable command) {
+                submittedTasks.add(super.submit(command));
+            }
+        };
+
+        IndexCopier c1 = new RAMIndexCopier(baseDir, executor, getWorkDir());
+
+        final CountDownLatch copyProceed = new CountDownLatch(1);
+        final CountDownLatch copyRequestArrived = new CountDownLatch(1);
+        TestRAMDirectory remote = new TestRAMDirectory(){
+            @Override
+            public void copy(Directory to, String src, String dest, IOContext context) throws IOException {
+                copyRequestArrived.countDown();
+                try {
+                    copyProceed.await();
+                } catch (InterruptedException e) {
+
+                }
+                super.copy(to, src, dest, context);
+            }
+        };
+        Directory wrapped = c1.wrap("/foo", defn, remote);
+
+        byte[] t1 = writeFile(remote , "t1");
+
+        //1. Trigger a read which should go to remote
+        readAndAssert(wrapped, "t1", t1);
+        copyRequestArrived.await();
+        assertEquals(1, c1.getCopyInProgressCount());
+        assertEquals(1, remote.openedFiles.size());
+
+        //2. Trigger another read and this should also be
+        //served from remote
+        readAndAssert(wrapped, "t1", t1);
+        assertEquals(1, c1.getCopyInProgressCount());
+        assertEquals(IOUtils.humanReadableByteCount(t1.length), c1.getCopyInProgressSize());
+        assertEquals(1, c1.getCopyInProgressDetails().length);
+        System.out.println(Arrays.toString(c1.getCopyInProgressDetails()));
+        assertEquals(2, remote.openedFiles.size());
+
+        //3. Perform copy
+        copyProceed.countDown();
+        Futures.allAsList(submittedTasks).get();
+        remote.reset();
+
+        //4. Now read again after copy is done
+        readAndAssert(wrapped, "t1", t1);
+        // Now read should be served from local and not from remote
+        assertEquals(0, remote.openedFiles.size());
+        assertEquals(0, c1.getCopyInProgressCount());
+
+        executor.shutdown();
     }
 
     /**
@@ -126,14 +283,14 @@ public class IndexCopierTest {
         TestRAMDirectory remote = new TestRAMDirectory();
         Directory wrapped = c1.wrap("/foo" , defn, remote);
 
-        byte[] t1 = writeFile(remote , "t1");
+        byte[] t1 = writeFile(remote, "t1");
 
         //1. Read for the first time should be served from remote
         readAndAssert(wrapped, "t1", t1);
         assertEquals(1, remote.openedFiles.size());
 
         //2. Reuse the testDir and read again
-        Directory wrapped2 = c1.wrap("/foo" , defn, remote);
+        Directory wrapped2 = c1.wrap("/foo", defn, remote);
         remote.reset();
 
         //3. Now read should be served from local
@@ -153,6 +310,34 @@ public class IndexCopierTest {
     }
 
     @Test
+    public void deleteCorruptedFile() throws Exception{
+        Directory baseDir = new RAMDirectory();
+        IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+        RAMIndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), getWorkDir());
+
+        Directory remote = new RAMDirectory(){
+            @Override
+            public IndexInput openInput(String name, IOContext context) throws IOException {
+                throw new IllegalStateException("boom");
+            }
+        };
+
+        String fileName = "failed.txt";
+        Directory wrapped = c1.wrap("/foo" , defn, remote);
+
+        byte[] t1 = writeFile(remote , fileName);
+
+        try {
+            readAndAssert(wrapped, fileName, t1);
+            fail("Read of file should have failed");
+        } catch (IllegalStateException ignore){
+
+        }
+
+        assertFalse(c1.baseDir.fileExists(fileName));
+    }
+
+    @Test
     public void deletesOnClose() throws Exception{
         //Use a close safe dir. In actual case the FSDir would
         //be opened on same file system hence it can retain memory
@@ -190,9 +375,65 @@ public class IndexCopierTest {
         assertTrue(baseDir.fileExists("t2"));
     }
 
-    @After
-    public void close() throws IOException {
-        FileUtils.deleteQuietly(getWorkDir());
+
+    @Test
+    public void failureInDelete() throws Exception{
+        final Set<String> testFiles = new HashSet<String>();
+        Directory baseDir = new CloseSafeDir() {
+            @Override
+            public void deleteFile(String name) throws IOException {
+                if (testFiles.contains(name)){
+                    throw new IOException("Not allowed to delete " + name);
+                }
+                super.deleteFile(name);
+            }
+        };
+
+        IndexDefinition defn = new IndexDefinition(root, builder.getNodeState());
+        IndexCopier c1 = new RAMIndexCopier(baseDir, sameThreadExecutor(), getWorkDir());
+
+        Directory r1 = new RAMDirectory();
+
+        byte[] t1 = writeFile(r1, "t1");
+        byte[] t2 = writeFile(r1 , "t2");
+
+        Directory w1 = c1.wrap("/foo" , defn, r1);
+        readAndAssert(w1, "t1", t1);
+        readAndAssert(w1, "t2", t2);
+
+        // t1 and t2 should now be present in local (base dir which back local)
+        assertTrue(baseDir.fileExists("t1"));
+        assertTrue(baseDir.fileExists("t2"));
+
+        Directory r2 = new CloseSafeDir();
+        copy(r1, r2);
+        r2.deleteFile("t1");
+
+        Directory w2 = c1.wrap("/foo" , defn, r2);
+
+        //Close would trigger removal of file which are not present in remote
+        testFiles.add("t1");
+        w2.close();
+
+        assertEquals(1, c1.getFailedToDeleteFiles().size());
+        IndexCopier.LocalIndexFile testFile = c1.getFailedToDeleteFiles().values().iterator().next();
+
+        assertEquals(1, testFile.getDeleteAttemptCount());
+        assertEquals(IOUtils.humanReadableByteCount(t1.length), c1.getGarbageSize());
+        assertEquals(1, c1.getGarbageDetails().length);
+
+        Directory w3 = c1.wrap("/foo" , defn, r2);
+        w3.close();
+        assertEquals(2, testFile.getDeleteAttemptCount());
+
+        //Now let the file to be deleted
+        testFiles.clear();
+
+        Directory w4 = c1.wrap("/foo" , defn, r2);
+        w4.close();
+
+        //No pending deletes left
+        assertEquals(0, c1.getFailedToDeleteFiles().size());
     }
 
     private byte[] writeFile(Directory dir, String name) throws IOException {
@@ -210,7 +451,7 @@ public class IndexCopierTest {
     }
 
     private File getWorkDir(){
-        return new File("target", "IndexClonerTest");
+        return temporaryFolder.getRoot();
     }
 
     private static void readAndAssert(Directory wrapped, String fileName, byte[] expectedData) throws IOException {


Reply via email to