http://www.mediawiki.org/wiki/Special:Code/MediaWiki/70004

Revision: 70004
Author:   daniel
Date:     2010-07-27 11:43:13 +0000 (Tue, 27 Jul 2010)

Log Message:
-----------
progress indicator, periodic flushing, optional re-loading

Modified Paths:
--------------
    trunk/WikiWord/CatGraph/src/main/java/de/wikimedia/catgraph/CatGraph.java

Modified: trunk/WikiWord/CatGraph/src/main/java/de/wikimedia/catgraph/CatGraph.java
===================================================================
--- trunk/WikiWord/CatGraph/src/main/java/de/wikimedia/catgraph/CatGraph.java   2010-07-27 11:29:19 UTC (rev 70003)
+++ trunk/WikiWord/CatGraph/src/main/java/de/wikimedia/catgraph/CatGraph.java   2010-07-27 11:43:13 UTC (rev 70004)
@@ -9,7 +9,6 @@
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -33,6 +32,7 @@
 import de.brightbyte.data.cursor.DataCursor;
 import de.brightbyte.db.DatabaseAccess;
 import de.brightbyte.db.DatabaseUtil;
+import de.brightbyte.db.DatabaseDataSet.Cursor;
 import de.brightbyte.io.ChunkingCursor;
 import de.brightbyte.io.LineCursor;
 import de.brightbyte.io.Output;
@@ -43,6 +43,70 @@
 import de.brightbyte.util.SystemUtils;
 
 public class CatGraph extends ConsoleApp {
+       public class ListElementPairCursor implements
+                       DataCursor<Pair<Integer, Integer>> {
+
+               private DataCursor<? extends List<?>> cursor;
+               private int aField;
+               private int bField;
+
+               public ListElementPairCursor(DataCursor<? extends List<?>> cursor, int aField, int bField) {
+                       this.cursor = cursor;
+                       this.aField = aField;
+                       this.bField = bField;
+               }
+
+               public void close() {
+                       cursor.close();
+               }
+
+               public Pair<Integer, Integer> next() throws PersistenceException {
+                       List<?> row = cursor.next();
+                       if ( row == null ) return null;
+                       
+                       int a =  DatabaseUtil.asInt( row.get(aField) );
+                       int b =  DatabaseUtil.asInt( row.get(bField) );
+                       
+                       return new Pair<Integer, Integer>(a, b);
+               }
+
+       }
+
+       public class ResultSetPairCursor implements
+                       DataCursor<Pair<Integer, Integer>> {
+
+               private ResultSet cursor;
+               private int aField;
+               private int bField;
+
+               public ResultSetPairCursor(ResultSet cursor, int aField, int bField) {
+                       this.cursor = cursor;
+                       this.aField = aField;
+                       this.bField = bField;
+               }
+
+               public void close() {
+                       try {
+                               cursor.close();
+                       } catch (SQLException e) {
+                               //ignore silently
+                       }
+               }
+
+               public Pair<Integer, Integer> next() throws PersistenceException {
+                       try {
+                               if (!cursor.next()) return null;
+                               
+                               int a = DatabaseUtil.asInt( cursor.getObject(aField) );
+                               int b = DatabaseUtil.asInt( cursor.getObject(bField) );
+
+                               return new Pair<Integer, Integer>(  a, b );
+                       } catch (SQLException e) {
+                               throw new PersistenceException();
+                       }
+               }
+       }
+
        protected class Descendants implements Command {
 
                private int start;
@@ -63,56 +127,71 @@
 
        private GraphDatabaseService graphDb;
        private IndexService indexer;
+       private long chunkSize = 100000;
        
        public CatGraph(GraphDatabaseService graphDb, IndexService indexer) {
                this.graphDb = graphDb;
                this.indexer = indexer;
        }
 
-       public void loadArcs(DatabaseAccess db, String sql, int fromCol, int toCol) throws SQLException {
-               ResultSet rs = db.executeQuery("load graph", sql);
-               while (rs.next()) {
-                       int from = rs.getInt(fromCol);
-                       int to = rs.getInt(toCol);
-                       
-                       putArc(from ,to);
+       public void loadArcs(DatabaseAccess db, String sql, int fromCol, int toCol) throws PersistenceException {
+               try {
+                       ResultSet rs = db.executeQuery("load graph", sql);
+                       loadArcs( new ResultSetPairCursor(rs, fromCol, toCol) );
+               } catch (SQLException e) {
+                       throw new PersistenceException(e);
                }
        }
 
        public void loadArcs(DataCursor<? extends List<?>> args, int fromCol, int toCol) throws PersistenceException {
-               ChunkedProgressRateTracker progressTracker = new ChunkedProgressRateTracker("arcs");
-               
-               List<?> row ;
-               while ((row = args.next()) != null) {
-                       int from = DatabaseUtil.asInt( row.get(fromCol) );
-                       int to = DatabaseUtil.asInt( row.get(toCol) );
-                       
-                       putArc(from ,to);
-                       
-                       progressTracker.step();
-                       if ( progressTracker.chunkIf(10000, 10) ) {
-                               out.println(progressTracker);
-                       }
-               }
+               loadArcs( new ListElementPairCursor(args, fromCol, toCol) );
        }
 
        public void loadArcs(DataCursor<Pair<Integer, Integer>> args) throws PersistenceException {
                ChunkedProgressRateTracker progressTracker = new ChunkedProgressRateTracker("arcs");
                
-               Pair<Integer, Integer> row ;
-               while ((row = args.next()) != null) {
-                       int from = row.getA();
-                       int to = row.getB();
+               Transaction tx = null;
+               boolean done = false;
+               try {
+                       Pair<Integer, Integer> row ;
+                       while ((row = args.next()) != null) {
+                               int from = row.getA();
+                               int to = row.getB();
+                               
+                               if (tx==null) tx = graphDb.beginTx();
+                               
+                               putArc(from ,to);
+                               
+                               progressTracker.step();
+                               //out.println("adding "+from+" -> "+to+" (#"+progressTracker.getCurrentChunkSize()+")");
+                               
+                               if ( progressTracker.chunkIf(chunkSize , 10) ) {
+                                       if (tx!=null) {
+                                               long t = System.currentTimeMillis();
+                                               out.println("committing...");
+                                               tx.success();
+                                               tx.finish();
+                                               tx = null;
+                                               out.println("commit took "+(System.currentTimeMillis() - t)+"ms.");
+                                       }
+                                       
+                                       out.println(progressTracker);
+                               }
+                       }
                        
-                       putArc(from ,to);
-                       
-                       progressTracker.step();
-                       if ( progressTracker.chunkIf(10000, 10) ) {
-                               out.println(progressTracker);
+                       done = true;
+               } finally {
+                       if ( tx != null) {
+                               if (done) tx.success();
+                               else tx.failure();
+                               
+                               tx.finish();
                        }
                }
+               
        }
 
+       /*
        public void loadRoots(DatabaseAccess db, String sql) throws SQLException {
                ResultSet rs = db.executeQuery("load graph", sql);
                while (rs.next()) {
@@ -137,6 +216,19 @@
                else return n;
        }
 
+       public Relationship putRoot(int root) {
+               return putRoot( aquireNodeByPageId(root) );
+       }
+
+       public Relationship putRoot(Node root) {
+               Node ref = graphDb.getReferenceNode();
+               if (ref.getId() == root.getId()) return null;
+               
+               Relationship relationship = ref.createRelationshipTo( root, CategoryRelationships.CONTAINS );
+               return relationship;
+       }
+*/
+       
        public Node getNodeByPageId(int pageId) {
                return indexer.getSingleNode("page_id", pageId);
        }
@@ -158,24 +250,12 @@
                return putArc( aquireNodeByPageId(from), aquireNodeByPageId(cat) );
        }
 
-       public Relationship putRoot(int root) {
-               return putRoot( aquireNodeByPageId(root) );
-       }
-
        public Relationship putArc(Node from, Node cat) {
                if ( from.getId() == cat.getId() ) return null;
                Relationship relationship = cat.createRelationshipTo( from, CategoryRelationships.CONTAINS );
                return relationship;
        }
 
-       public Relationship putRoot(Node root) {
-               Node ref = graphDb.getReferenceNode();
-               if (ref.getId() == root.getId()) return null;
-               
-               Relationship relationship = ref.createRelationshipTo( root, CategoryRelationships.CONTAINS );
-               return relationship;
-       }
-
        public Collection<Integer> getDescendants(int start) {
                Node n = getNodeByPageId(start);
                if ( n == null ) throw new IllegalArgumentException("page_id "+start+" not found");
@@ -244,40 +324,38 @@
                        configuration = CollectionUtils.asMap( SystemUtils.loadProperties(u, null) );
                }
                
-               GraphDatabaseService graphDb = new EmbeddedGraphDatabase( args.getParameter(0), configuration );
-               File tsv = new File(args.getParameter(1));
+               DataCursor<List<String>> cursor = null; 
+               GraphDatabaseService graphDb = null;
+               IndexService indexer = null;
 
-               IndexService indexer = new LuceneIndexService(graphDb); 
-
-               /*
-               DatabaseAccess db = new DatabaseSchema(null, dbInfo, null);
-               db.open();
-               
-               db.executeUpdate("", "use "+database+";");
-               */
-               CatGraph graph = new CatGraph(graphDb, indexer);
-               
-               InputStreamReader rd = new InputStreamReader(new FileInputStream(tsv));
-               ChunkingCursor cursor =  new ChunkingCursor(new LineCursor(rd), CsvLineChunker.tsv);
-               
-               cursor.next(); //skip header in first line
-               
-               Transaction tx = graphDb.beginTx();
-               try
-               {
-                       System.out.println("loading arcs....");
-                       long t = System.currentTimeMillis();
-                       graph.loadArcs(cursor, 0, 1);
-                       System.out.println("loading arcs took "+(System.currentTimeMillis() - t)+"ms.");
-               
+               try {
+                       graphDb = new EmbeddedGraphDatabase( args.getParameter(0), configuration );
+                       indexer = new LuceneIndexService(graphDb); 
+       
+                       CatGraph graph = new CatGraph(graphDb, indexer);
+                       
+                       if (args.getParameterCount()>1) {
+                               File tsv = new File(args.getParameter(1));
+                               InputStreamReader rd = new InputStreamReader(new FileInputStream(tsv));
+                               cursor =  new ChunkingCursor(new LineCursor(rd), CsvLineChunker.tsv);
+                       
+                               cursor.next(); //skip header in first line
+       
+                               System.out.println("loading arcs....");
+                               long t = System.currentTimeMillis();
+                               graph.loadArcs(cursor, 0, 1);
+                               System.out.println("loading arcs took "+(System.currentTimeMillis() - t)+"ms.");
+                       }
+                       
                        graph.run();
+                       
+                       System.out.println( "done" ); 
                }
                finally
                {
-                  tx.finish();
-                  graphDb.shutdown();
+                       if (indexer!=null) indexer.shutdown();
+                  if (graphDb!=null) graphDb.shutdown();
+                  System.exit(0);
                }
-               
-               System.out.println( "done" ); 
        }
 }



_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs

Reply via email to