Author: rande
Date: 2010-02-17 19:03:27 +0100 (Wed, 17 Feb 2010)
New Revision: 28094

Modified:
   plugins/sfSolrPlugin/branches/sf1.2/lib/indexer/sfLuceneDoctrineIndexerHandler.class.php
   plugins/sfSolrPlugin/branches/sf1.2/lib/sfLucene.class.php
   plugins/sfSolrPlugin/branches/sf1.2/lib/task/sfLuceneUpdateModelSystemTask.class.php
   plugins/sfSolrPlugin/branches/sf1.2/lib/task/sfLuceneUpdateModelTask.class.php
   plugins/sfSolrPlugin/branches/sf1.2/lib/util/sfLuceneService.class.php
Log:
[sfSolrPlugin] refactor indexing task

Modified: plugins/sfSolrPlugin/branches/sf1.2/lib/indexer/sfLuceneDoctrineIndexerHandler.class.php
===================================================================
--- plugins/sfSolrPlugin/branches/sf1.2/lib/indexer/sfLuceneDoctrineIndexerHandler.class.php    2010-02-17 16:42:53 UTC (rev 28093)
+++ plugins/sfSolrPlugin/branches/sf1.2/lib/indexer/sfLuceneDoctrineIndexerHandler.class.php    2010-02-17 18:03:27 UTC (rev 28094)
@@ -16,11 +16,13 @@
 
 class sfLuceneDoctrineIndexerHandler extends sfLuceneModelIndexerHandler
 {
-  public function rebuildModel($name, $offset = null, $limit = null)
+  public function rebuildModel($name, $start_page = null, $limit = null)
   {
+    
+    $options    = $this->getSearch()->getParameter('models')->get($name);
+    $start_page = $start_page === null ? 1 : $start_page;
+    $limit      = is_numeric($limit) ? $limit : $options->get('rebuild_limit');
 
-    $options = $this->getSearch()->getParameter('models')->get($name);
-
     if(!$options)
     {
       throw new LogicException('The model \''.$name.'\' does not have any configurations');
@@ -29,25 +31,29 @@
     $table = Doctrine :: getTable($name);
     $query = $this->getBaseQuery($name);
 
-    if(is_numeric($offset) && is_numeric($limit))
-    {
-      $this->_rebuild($query, $offset, $limit);
-      $query->free();
-      $query->from($table->getComponentName());
-    }
-    else
-    {
+    $count = $query->count();
+    
+    $totalPages = ceil($count / $limit);
 
-      $count = $query->count();
-      $per   = $options->get('rebuild_limit');
+    // try to reduce the limit usage on php 5.2
+    $memory_limit = sfLuceneService::convertBytes(ini_get('memory_limit'));
+    $internal_limit = 10485760 * 2  ; // 10Mo
 
-      $totalPages = ceil($count / $per);
-
-      for ($page = 0; $page < $totalPages; $page++)
-      {
-        $offset = $page * $per;
-        $this->_rebuild(clone $query, $offset, $per);
-      }
+    // fetch one object to load all relations
+    $consume_memory_query = clone $query;
+    $consume_memory_query->limit(1)->fetchOne();
+    
+    for ($page = $start_page; $page < $totalPages; $page++)
+    {
+      
+      $this->getSearch()->getEventDispatcher()->notifyUntil(new sfEvent($this, 'lucene.indexing_loop', array(
+        'model' => $name,
+        'page'  => $page,
+        'limit' => $limit
+      )));
+    
+      $offset = $page * $limit;
+      $this->batchRebuild(clone $query, $offset, $limit);
     }
   }
 
@@ -74,15 +80,15 @@
     return $query->count();
   }
 
-  protected function _rebuild($query, $offset, $limit)
-  {
-
+  public function batchRebuild($query, $offset, $limit)
+  {    
     $collection = $query->limit($limit)->offset($offset)->execute();
 
     $documents = array();
     $pks = array();
     foreach($collection as $record)
     {
+      
       $doc = $this->getFactory()->getModel($record)->getDocument();
 
       if(!$doc)
@@ -139,7 +145,6 @@
       );
     }
 
-    
     unset($collection);
   }
 }
\ No newline at end of file

Modified: plugins/sfSolrPlugin/branches/sf1.2/lib/sfLucene.class.php
===================================================================
--- plugins/sfSolrPlugin/branches/sf1.2/lib/sfLucene.class.php  2010-02-17 16:42:53 UTC (rev 28093)
+++ plugins/sfSolrPlugin/branches/sf1.2/lib/sfLucene.class.php  2010-02-17 18:03:27 UTC (rev 28094)
@@ -25,11 +25,6 @@
   const VERSION = '0.2-DEV';
 
   /**
-   * Holds the internal dispatcher for this Lucene instance.
-   */
-  protected $dispatcher = null;
-
-  /**
    * Holds the search service instance
    */
   protected $search_service = null;
@@ -85,8 +80,6 @@
     $this->setParameter('culture', $culture);
     $this->setParameter('index_location', $name.'_'.$culture);
 
-    $this->dispatcher = new sfEventDispatcher;
-
     $this->configuration = $configuration;
     
     $this->initialize();
@@ -404,7 +397,7 @@
   public function getEventDispatcher()
   {
     
-    return $this->dispatcher;
+    return $this->configuration->getEventDispatcher();
   }
 
   /**
@@ -450,10 +443,8 @@
   /**
   * Update only the index for one model
   *
-  * if $offset and $limit are numeric then only the portion between
-  * the offset and the limit are updated
   */
-  public function rebuildIndexModel($model, $offset = null, $limit = null)
+  public function rebuildIndexModel($model, $page = 1, $limit = null)
   {
     $this->setBatchMode();
 
@@ -470,7 +461,7 @@
         continue;
       }
 
-      $handler->rebuildModel($model, $offset, $limit);
+      $handler->rebuildModel($model, $page, $limit);
     }
 
     $this->getEventDispatcher()->notify(new sfEvent($this, 'lucene.log', array('Index rebuilt.')));
@@ -509,9 +500,9 @@
   */
   public function setBatchMode()
   {
-    //$this->getLucene()->setMaxBufferedDocs(500);
-    //$this->getLucene()->setMaxMergeDocs(PHP_INT_MAX);
-    //$this->getLucene()->setMergeFactor(50);
+    //$this->getSearchService()->setMaxBufferedDocs(500);
+    //$this->getSearchService()->setMaxMergeDocs(PHP_INT_MAX);
+    //$this->getSearchService()->setMergeFactor(50);
 
     return $this;
   }
@@ -522,9 +513,9 @@
   */
   public function setInteractiveMode()
   {
-    //$this->getLucene()->setMaxBufferedDocs(10);
-    //$this->getLucene()->setMaxMergeDocs(PHP_INT_MAX);
-    //$this->getLucene()->setMergeFactor(10);
+    //$this->getSearchService()->setMaxBufferedDocs(10);
+    //$this->getSearchService()->setMaxMergeDocs(PHP_INT_MAX);
+    //$this->getSearchService()->setMergeFactor(10);
 
     return $this;
   }
@@ -540,7 +531,7 @@
 
     $this->getEventDispatcher()->notify(new sfEvent($this, 'lucene.log', array('Optimizing index...')));
 
-    $this->getLucene()->optimize();
+    $this->getSearchService()->optimize();
 
     $this->getEventDispatcher()->notify(new sfEvent($this, 'lucene.log', array('Index optimized.')));
 
@@ -587,7 +578,7 @@
 
     $this->getEventDispatcher()->notify(new sfEvent($this, 'lucene.log', array('Committing changes...')));
 
-    $this->getLucene()->commit();
+    $this->getSearchService()->commit();
 
     $this->getEventDispatcher()->notify(new sfEvent($this, 'lucene.log', array('Changes committed.')));
 

Modified: plugins/sfSolrPlugin/branches/sf1.2/lib/task/sfLuceneUpdateModelSystemTask.class.php
===================================================================
--- plugins/sfSolrPlugin/branches/sf1.2/lib/task/sfLuceneUpdateModelSystemTask.class.php        2010-02-17 16:42:53 UTC (rev 28093)
+++ plugins/sfSolrPlugin/branches/sf1.2/lib/task/sfLuceneUpdateModelSystemTask.class.php        2010-02-17 18:03:27 UTC (rev 28094)
@@ -87,28 +87,14 @@
 
     $models = $search->getParameter('models')->getAll();
 
-    $factory = new sfLuceneIndexerFactory($search);
-    $handler = null;
-    foreach($factory->getHandlers() as $handler)
-    {
-      if($handler instanceof sfLuceneModelIndexerHandler)
-      {
-        break;
-      }
-    }
 
-    if(!$handler instanceof sfLuceneModelIndexerHandler)
-    {
-      throw new LogicException('No sfLuceneModelIndexerHandler defined !');
-    }
-
     if($model)
     {
       if($delete)
       {
         $this->deleteModel($search, $model);
       }
-      $this->update($handler, $app, $index, $culture, $model, $limit);
+      $this->update($app, $index, $culture, $model, $limit);
     }
     else
     {
@@ -119,7 +105,7 @@
           $this->deleteModel($search, $model);
         }
         
-        $this->update($handler, $app, $index, $culture, $model, $limit);
+        $this->update($app, $index, $culture, $model, $limit);
       }
     }
     
@@ -138,20 +124,28 @@
     $lucene->getLucene()->commit();
   }
   
-  public function update($handler, $app, $index, $culture, $model, $limit)
+  public function getFilestatePath($model)
   {
-    $page      = 0;
-    $count     = $handler->getCount($model);
-    $num_pages = ceil($count / $limit);
-
+    
+    return sprintf(sfConfig::get('sf_data_dir').'/solr_index/update_%s.state', sfInflector::underscore($model));
+  }
+  
+  public function update($app, $index, $culture, $model, $limit)
+  {
+    
+    $file = $this->getFilestatePath($model);
+    if(is_file($file))
+    {
+      $this->getFilesystem()->remove($file);
+    }
+        
     do
     {
-      $offset = $page * $limit;
-      $final = $this->formatter->format('Updating model='.$model.', page='.$page.'/'.$num_pages, array('fg' => 'green', 'bold' => true));
+      $final = $this->formatter->format('Updating model='.$model, array('fg' => 'green', 'bold' => true));
       
       $this->dispatcher->notify(new sfEvent($this, 'command.log', array('', $final)));
       
-      $command = sprintf('%s/symfony lucene:update-model %s %s %s %s --limit=%s --offset=%s',
+      $command = sprintf('php -d memory_limit=64M %s/symfony lucene:update-model %s %s %s %s --state=true',
         $this->configuration->getRootDir(),
         $app,
         $index,
@@ -161,8 +155,27 @@
         $offset
       );
 
-      $this->getFilesystem()->sh($command);
+      try
+      {
+        $return_code = $this->getFilesystem()->sh($command);
+        $this->logSection('lucene', 'end indexing model : '.$model);
 
-    } while((++$page < $num_pages ?  true : false));
+        return 0;
+      } 
+      catch(sfException $e)
+      {
+        if(preg_match("/Allowed memory size of ([0-9]*) bytes/", $e->getMessage()))
+        {
+          $this->logSection('lucene', '  memory limit reach, starting new subprocess');
+
+          continue;
+        }
+        else
+        {
+          throw $e;
+        }
+      }
+
+    } while(1);
   }
 }
\ No newline at end of file

Modified: plugins/sfSolrPlugin/branches/sf1.2/lib/task/sfLuceneUpdateModelTask.class.php
===================================================================
--- plugins/sfSolrPlugin/branches/sf1.2/lib/task/sfLuceneUpdateModelTask.class.php      2010-02-17 16:42:53 UTC (rev 28093)
+++ plugins/sfSolrPlugin/branches/sf1.2/lib/task/sfLuceneUpdateModelTask.class.php      2010-02-17 18:03:27 UTC (rev 28094)
@@ -20,7 +20,7 @@
 */
 
 class sfLuceneUpdateModelTask extends sfLuceneBaseTask
-{
+{    
   protected function configure()
   {
     $this->addArguments(array(
@@ -31,10 +31,11 @@
     ));
 
     $this->addOptions(array(
-      new sfCommandOption('env', null, sfCommandOption::PARAMETER_REQUIRED, 'The environment', 'search'),
-      new sfCommandOption('offset', null, sfCommandOption::PARAMETER_REQUIRED, 'The offset were the index should start', null),
-      new sfCommandOption('limit', null, sfCommandOption::PARAMETER_REQUIRED, 'The number number max of record to index from the offset', null),
-      new sfCommandOption('delete', null, sfCommandOption::PARAMETER_OPTIONAL, 'set to true to delete all related index', false),
+      new sfCommandOption('env', null, sfCommandOption::PARAMETER_OPTIONAL, 'The environment', 'search'),
+      new sfCommandOption('state', null, sfCommandOption::PARAMETER_OPTIONAL, 'If state is set to true then the task will save the state on memory limit exception', false),
+      new sfCommandOption('page', null, sfCommandOption::PARAMETER_OPTIONAL, 'The page where the index should start', 1),
+      new sfCommandOption('limit', null, sfCommandOption::PARAMETER_OPTIONAL, 'The number number max of record to index from the page', null),
+      new sfCommandOption('delete', null, sfCommandOption::PARAMETER_OPTIONAL, 'set to true to delete all related index - page should', false),
     ));
 
     $this->aliases = array('lucene-update-model');
@@ -67,11 +68,11 @@
     $culture = $arguments['culture'];
     $model   = $arguments['model'];
     
+    $state   = $options['state'];
     $offset  = $options['offset'];
     $limit   = $options['limit'];
     $delete  = $options['delete'];
     
-
     $this->checkAppExists($app);
     $this->standardBootstrap($app, $options['env']);
 
@@ -81,8 +82,21 @@
       throw new LogicException('This feature is only implemented for Doctrine ORM');
     }
     
+    
+    if($state)
+    {
+      // use state file
+      // the state file only contains the last page used and the limit
+      $state = $this->getState($model);
+      $page  = $state['page'];
+      $limit = $state['limit'];
+      $this->logSection('lucene', sprintf('Loading state page:%s, limit:%s', $page, $limit));
+    }
+    
+    $this->configuration->getEventDispatcher()->connect('lucene.indexing_loop', array($this, 'handleMemoryLimitEvent'));
+
     $instance = sfLucene::getInstance($index, $culture, $this->configuration);
-    
+        
     $this->setupEventDispatcher($instance);
     
     if($delete)
@@ -92,9 +106,63 @@
       $instance->getLucene()->commit();
     }
     
-    $this->rebuild($instance, $model, $offset, $limit);
+    $this->rebuild($instance, $model, $page, $limit);
+    
+    if($state)
+    {
+      $file = $this->getFilestatePath($model);
+      $this->getFilesystem()->remove($file);
+    }
+  }
+  
+  public function handleMemoryLimitEvent(sfEvent $event)
+  {
+    
+    // store the current state
+    $this->saveState($event['model'], array(
+      'limit' => $event['limit'],
+      'page'  => $event['page']
+    ));
+    
+    $event->setProcessed(true);
+  }
+  
+  public function getFilestatePath($model)
+  {
+    
+    return sprintf(sfConfig::get('sf_data_dir').'/solr_index/update_%s.state', sfInflector::underscore($model));
+  }
+  
+  public function getState($model)
+  {
+    
+    $file = $this->getFilestatePath($model);
+    
+    $state = false;
+    
+    if(is_file($file))
+    {
+      $state = unserialize(@file_get_contents($file));
 
+    }
+    
+    if(!is_array($state))
+    {
+      $state = array(
+        'page' => 1,
+        'limit' => null,
+      );
+    }
+    
+    return $state;
   }
+  
+  public function saveState($model, $state)
+  {
+    
+    $file = sprintf(sfConfig::get('sf_data_dir').'/solr_index/update_%s.state', $model);
+    file_put_contents($file, serialize($state));
+  }
 
   protected function rebuild($search, $model, $offset, $limit)
   {
@@ -103,8 +171,9 @@
     $this->dispatcher->notify(new sfEvent($this, 'command.log', array($this->formatter->format(sprintf('Processing "%s/%s" now...', $search->getParameter('name'), $search->getParameter('culture')), array('fg' => 'red', 'bold' => true)))));
 
     $search->rebuildIndexModel($model, $offset, $limit);
+
+    $search->commit();
     $search->optimize();
-    $search->commit();
 
     $time = microtime(true) - $start;
 

Modified: plugins/sfSolrPlugin/branches/sf1.2/lib/util/sfLuceneService.class.php
===================================================================
--- plugins/sfSolrPlugin/branches/sf1.2/lib/util/sfLuceneService.class.php      2010-02-17 16:42:53 UTC (rev 28093)
+++ plugins/sfSolrPlugin/branches/sf1.2/lib/util/sfLuceneService.class.php      2010-02-17 18:03:27 UTC (rev 28094)
@@ -37,6 +37,41 @@
   }
   
   /**
+   *
+   * compute shorthand memory notation into int
+   *  from http://www.php.net/manual/en/faq.using.php#78405
+   * 
+   * @return int bytes value of the shorthand notation
+   */
+  public static function convertBytes($value) 
+  {
+    
+    if (is_numeric($value)) 
+    {
+      
+      return $value;
+    }
+
+    $value_length = strlen($value);
+    $qty  = substr( $value, 0, $value_length - 1 );
+    $unit = strtolower( substr( $value, $value_length - 1 ) );
+    
+    switch ( $unit ) {
+      case 'k':
+        $qty *= 1024;
+        break;
+      case 'm':
+        $qty *= 1048576;
+        break;
+      case 'g':
+        $qty *= 1073741824;
+        break;
+    }
+    
+    return $qty;
+
+  }
+  /**
    * Simple Search interface
    *
    * @param string $query The raw query string

-- 
You received this message because you are subscribed to the Google Groups 
"symfony SVN" group.
To post to this group, send email to symfony-svn@googlegroups.com.
To unsubscribe from this group, send email to 
symfony-svn+unsubscribe@googlegroups.com.
For more options, visit this group at 
http://groups.google.com/group/symfony-svn?hl=en.

Reply via email to