http://www.mediawiki.org/wiki/Special:Code/MediaWiki/70004
Revision: 70004
Author: daniel
Date: 2010-07-27 11:43:13 +0000 (Tue, 27 Jul 2010)
Log Message:
-----------
progress indicator, periodic flushing, optional re-loading
Modified Paths:
--------------
trunk/WikiWord/CatGraph/src/main/java/de/wikimedia/catgraph/CatGraph.java
Modified:
trunk/WikiWord/CatGraph/src/main/java/de/wikimedia/catgraph/CatGraph.java
===================================================================
--- trunk/WikiWord/CatGraph/src/main/java/de/wikimedia/catgraph/CatGraph.java
2010-07-27 11:29:19 UTC (rev 70003)
+++ trunk/WikiWord/CatGraph/src/main/java/de/wikimedia/catgraph/CatGraph.java
2010-07-27 11:43:13 UTC (rev 70004)
@@ -9,7 +9,6 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -33,6 +32,7 @@
import de.brightbyte.data.cursor.DataCursor;
import de.brightbyte.db.DatabaseAccess;
import de.brightbyte.db.DatabaseUtil;
+import de.brightbyte.db.DatabaseDataSet.Cursor;
import de.brightbyte.io.ChunkingCursor;
import de.brightbyte.io.LineCursor;
import de.brightbyte.io.Output;
@@ -43,6 +43,70 @@
import de.brightbyte.util.SystemUtils;
public class CatGraph extends ConsoleApp {
+ /**
+  * Adapts a cursor over list-shaped rows to a cursor over integer pairs.
+  * The two values of each pair are taken from fixed positions of the row
+  * and converted with DatabaseUtil.asInt.
+  */
+ public class ListElementPairCursor implements DataCursor<Pair<Integer, Integer>> {
+
+     private DataCursor<? extends List<?>> cursor;
+     private int aField;
+     private int bField;
+
+     /**
+      * @param cursor the row cursor to read from
+      * @param aField position within each row of the pair's first value
+      * @param bField position within each row of the pair's second value
+      */
+     public ListElementPairCursor(DataCursor<? extends List<?>> cursor, int aField, int bField) {
+         this.aField = aField;
+         this.bField = bField;
+         this.cursor = cursor;
+     }
+
+     /**
+      * Reads the next row from the wrapped cursor and converts it to a pair.
+      *
+      * @return the next pair, or null once the wrapped cursor returns null
+      */
+     public Pair<Integer, Integer> next() throws PersistenceException {
+         List<?> fields = cursor.next();
+
+         if ( fields != null ) {
+             int first = DatabaseUtil.asInt( fields.get(aField) );
+             int second = DatabaseUtil.asInt( fields.get(bField) );
+             return new Pair<Integer, Integer>(first, second);
+         }
+
+         return null;
+     }
+
+     /** Closes the wrapped cursor. */
+     public void close() {
+         cursor.close();
+     }
+
+ }
+
+ /**
+  * Adapts a JDBC ResultSet to a cursor over integer pairs. The two values
+  * of each pair are read from fixed columns of the current result row and
+  * converted with DatabaseUtil.asInt.
+  */
+ public class ResultSetPairCursor implements DataCursor<Pair<Integer, Integer>> {
+
+     private ResultSet cursor;
+     private int aField;
+     private int bField;
+
+     /**
+      * @param cursor the result set to read rows from
+      * @param aField column index handed to ResultSet.getObject(int) for the pair's first value
+      * @param bField column index handed to ResultSet.getObject(int) for the pair's second value
+      */
+     public ResultSetPairCursor(ResultSet cursor, int aField, int bField) {
+         this.cursor = cursor;
+         this.aField = aField;
+         this.bField = bField;
+     }
+
+     /**
+      * Closes the underlying result set. An SQLException raised while
+      * closing is discarded.
+      */
+     public void close() {
+         try {
+             cursor.close();
+         } catch (SQLException ignored) {
+             // deliberately ignored: a failure to close is not reported to the caller
+         }
+     }
+
+     /**
+      * Advances the result set by one row and returns the pair read from it.
+      *
+      * @return the next pair, or null when the result set has no further rows
+      * @throws PersistenceException wrapping any SQLException raised by the result set
+      */
+     public Pair<Integer, Integer> next() throws PersistenceException {
+         try {
+             if (!cursor.next()) return null;
+
+             int a = DatabaseUtil.asInt( cursor.getObject(aField) );
+             int b = DatabaseUtil.asInt( cursor.getObject(bField) );
+
+             return new Pair<Integer, Integer>( a, b );
+         } catch (SQLException e) {
+             // keep the SQLException as the cause so the underlying database error is not lost
+             throw new PersistenceException(e);
+         }
+     }
+ }
+
protected class Descendants implements Command {
private int start;
@@ -63,56 +127,71 @@
private GraphDatabaseService graphDb;
private IndexService indexer;
+ private long chunkSize = 100000;
public CatGraph(GraphDatabaseService graphDb, IndexService indexer) {
this.graphDb = graphDb;
this.indexer = indexer;
}
- public void loadArcs(DatabaseAccess db, String sql, int fromCol, int
toCol) throws SQLException {
- ResultSet rs = db.executeQuery("load graph", sql);
- while (rs.next()) {
- int from = rs.getInt(fromCol);
- int to = rs.getInt(toCol);
-
- putArc(from ,to);
+ public void loadArcs(DatabaseAccess db, String sql, int fromCol, int
toCol) throws PersistenceException {
+ try {
+ ResultSet rs = db.executeQuery("load graph", sql);
+ loadArcs( new ResultSetPairCursor(rs, fromCol, toCol) );
+ } catch (SQLException e) {
+ throw new PersistenceException(e);
}
}
public void loadArcs(DataCursor<? extends List<?>> args, int fromCol,
int toCol) throws PersistenceException {
- ChunkedProgressRateTracker progressTracker = new
ChunkedProgressRateTracker("arcs");
-
- List<?> row ;
- while ((row = args.next()) != null) {
- int from = DatabaseUtil.asInt( row.get(fromCol) );
- int to = DatabaseUtil.asInt( row.get(toCol) );
-
- putArc(from ,to);
-
- progressTracker.step();
- if ( progressTracker.chunkIf(10000, 10) ) {
- out.println(progressTracker);
- }
- }
+ loadArcs( new ListElementPairCursor(args, fromCol, toCol) );
}
public void loadArcs(DataCursor<Pair<Integer, Integer>> args) throws
PersistenceException {
ChunkedProgressRateTracker progressTracker = new
ChunkedProgressRateTracker("arcs");
- Pair<Integer, Integer> row ;
- while ((row = args.next()) != null) {
- int from = row.getA();
- int to = row.getB();
+ Transaction tx = null;
+ boolean done = false;
+ try {
+ Pair<Integer, Integer> row ;
+ while ((row = args.next()) != null) {
+ int from = row.getA();
+ int to = row.getB();
+
+ if (tx==null) tx = graphDb.beginTx();
+
+ putArc(from ,to);
+
+ progressTracker.step();
+ //out.println("adding "+from+" -> "+to+"
(#"+progressTracker.getCurrentChunkSize()+")");
+
+ if ( progressTracker.chunkIf(chunkSize , 10) ) {
+ if (tx!=null) {
+ long t =
System.currentTimeMillis();
+ out.println("committing...");
+ tx.success();
+ tx.finish();
+ tx = null;
+ out.println("commit took
"+(System.currentTimeMillis() - t)+"ms.");
+ }
+
+ out.println(progressTracker);
+ }
+ }
- putArc(from ,to);
-
- progressTracker.step();
- if ( progressTracker.chunkIf(10000, 10) ) {
- out.println(progressTracker);
+ done = true;
+ } finally {
+ if ( tx != null) {
+ if (done) tx.success();
+ else tx.failure();
+
+ tx.finish();
}
}
+
}
+ /*
public void loadRoots(DatabaseAccess db, String sql) throws
SQLException {
ResultSet rs = db.executeQuery("load graph", sql);
while (rs.next()) {
@@ -137,6 +216,19 @@
else return n;
}
+ public Relationship putRoot(int root) {
+ return putRoot( aquireNodeByPageId(root) );
+ }
+
+ public Relationship putRoot(Node root) {
+ Node ref = graphDb.getReferenceNode();
+ if (ref.getId() == root.getId()) return null;
+
+ Relationship relationship = ref.createRelationshipTo( root,
CategoryRelationships.CONTAINS );
+ return relationship;
+ }
+*/
+
public Node getNodeByPageId(int pageId) {
return indexer.getSingleNode("page_id", pageId);
}
@@ -158,24 +250,12 @@
return putArc( aquireNodeByPageId(from),
aquireNodeByPageId(cat) );
}
- public Relationship putRoot(int root) {
- return putRoot( aquireNodeByPageId(root) );
- }
-
public Relationship putArc(Node from, Node cat) {
if ( from.getId() == cat.getId() ) return null;
Relationship relationship = cat.createRelationshipTo( from,
CategoryRelationships.CONTAINS );
return relationship;
}
- public Relationship putRoot(Node root) {
- Node ref = graphDb.getReferenceNode();
- if (ref.getId() == root.getId()) return null;
-
- Relationship relationship = ref.createRelationshipTo( root,
CategoryRelationships.CONTAINS );
- return relationship;
- }
-
public Collection<Integer> getDescendants(int start) {
Node n = getNodeByPageId(start);
if ( n == null ) throw new IllegalArgumentException("page_id
"+start+" not found");
@@ -244,40 +324,38 @@
configuration = CollectionUtils.asMap(
SystemUtils.loadProperties(u, null) );
}
- GraphDatabaseService graphDb = new EmbeddedGraphDatabase(
args.getParameter(0), configuration );
- File tsv = new File(args.getParameter(1));
+ DataCursor<List<String>> cursor = null;
+ GraphDatabaseService graphDb = null;
+ IndexService indexer = null;
- IndexService indexer = new LuceneIndexService(graphDb);
-
- /*
- DatabaseAccess db = new DatabaseSchema(null, dbInfo, null);
- db.open();
-
- db.executeUpdate("", "use "+database+";");
- */
- CatGraph graph = new CatGraph(graphDb, indexer);
-
- InputStreamReader rd = new InputStreamReader(new
FileInputStream(tsv));
- ChunkingCursor cursor = new ChunkingCursor(new LineCursor(rd),
CsvLineChunker.tsv);
-
- cursor.next(); //skip header in first line
-
- Transaction tx = graphDb.beginTx();
- try
- {
- System.out.println("loading arcs....");
- long t = System.currentTimeMillis();
- graph.loadArcs(cursor, 0, 1);
- System.out.println("loading arcs took
"+(System.currentTimeMillis() - t)+"ms.");
-
+ try {
+ graphDb = new EmbeddedGraphDatabase(
args.getParameter(0), configuration );
+ indexer = new LuceneIndexService(graphDb);
+
+ CatGraph graph = new CatGraph(graphDb, indexer);
+
+ if (args.getParameterCount()>1) {
+ File tsv = new File(args.getParameter(1));
+ InputStreamReader rd = new
InputStreamReader(new FileInputStream(tsv));
+ cursor = new ChunkingCursor(new
LineCursor(rd), CsvLineChunker.tsv);
+
+ cursor.next(); //skip header in first line
+
+ System.out.println("loading arcs....");
+ long t = System.currentTimeMillis();
+ graph.loadArcs(cursor, 0, 1);
+ System.out.println("loading arcs took
"+(System.currentTimeMillis() - t)+"ms.");
+ }
+
graph.run();
+
+ System.out.println( "done" );
}
finally
{
- tx.finish();
- graphDb.shutdown();
+ if (indexer!=null) indexer.shutdown();
+ if (graphDb!=null) graphDb.shutdown();
+ System.exit(0);
}
-
- System.out.println( "done" );
}
}
_______________________________________________
MediaWiki-CVS mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-cvs