I'm not familiar with the Solr API, but provided that `SolrIndexerDriver` is a singleton, I suspect that what's going on when running on a cluster is that the call to

  SolrIndexerDriver.solrInputDocumentList.add(elem)

happens on different singleton instances of SolrIndexerDriver in different JVMs (the executors), while SolrIndexerDriver.solrServer.commit happens on the driver. In practical terms: the lists on the executors are being filled in but never committed, and on the driver the opposite is happening.

-kr, Gerard

On Mon, May 4, 2015 at 3:34 PM, Emre Sevinc <emre.sev...@gmail.com> wrote:

> I'm trying to deal with some code that runs differently in Spark
> stand-alone mode and on a Spark cluster. Basically, for each item in an
> RDD, I'm trying to add it to a list, and once this is done, I want to
> send this list to Solr.
>
> This works perfectly fine when I run the following code in Spark's
> stand-alone mode, but it does not work when the same code is run on a
> cluster. On a cluster, it is as if the "send to Solr" part of the code
> were executed before the list to be sent to Solr is filled with items.
> I tried to force execution with solrInputDocumentJavaRDD.collect();
> after foreach, but it seems to have no effect.
>
> // For each RDD
> solrInputDocumentJavaDStream.foreachRDD(
>     new Function<JavaRDD<SolrInputDocument>, Void>() {
>       @Override
>       public Void call(JavaRDD<SolrInputDocument> solrInputDocumentJavaRDD)
>           throws Exception {
>
>         // For each item in a single RDD
>         solrInputDocumentJavaRDD.foreach(
>             new VoidFunction<SolrInputDocument>() {
>               @Override
>               public void call(SolrInputDocument solrInputDocument) {
>                 // Add the solrInputDocument to the list of SolrInputDocuments
>                 SolrIndexerDriver.solrInputDocumentList.add(solrInputDocument);
>               }
>             });
>
>         // Try to force execution
>         solrInputDocumentJavaRDD.collect();
>
>         // After having finished adding every SolrInputDocument to the list,
>         // add it to the solrServer, and commit, waiting for the commit to
>         // be flushed
>         try {
>           // Seems like when run in cluster mode, the list size is zero,
>           // therefore the following part is never executed
>           if (SolrIndexerDriver.solrInputDocumentList != null
>               && SolrIndexerDriver.solrInputDocumentList.size() > 0) {
>             SolrIndexerDriver.solrServer.add(SolrIndexerDriver.solrInputDocumentList);
>             SolrIndexerDriver.solrServer.commit(true, true);
>             SolrIndexerDriver.solrInputDocumentList.clear();
>           }
>         } catch (SolrServerException | IOException e) {
>           e.printStackTrace();
>         }
>
>         return null;
>       }
>     });
>
> What should I do, so that the sending-to-Solr part executes after the
> SolrInputDocuments are added to solrInputDocumentList (and also works in
> cluster mode)?
>
> --
> Emre Sevinç
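
Following Gerard's diagnosis, the usual fix is to stop mutating a static list from inside foreach and instead bring the per-partition results back to the driver (JavaRDD.collect()) before talking to Solr, or alternatively open a Solr client inside each partition and commit from the executors. Below is a minimal, self-contained sketch of the driver-side pattern in plain Java, with no Spark or SolrJ on the classpath; the partition lists and the "doc-N" strings are hypothetical stand-ins for RDD partitions and SolrInputDocuments.

```java
import java.util.ArrayList;
import java.util.List;

public class DriverSideCommit {

    public static void main(String[] args) {
        // Simulated partitions, standing in for data living on separate
        // executor JVMs (names and contents are illustrative only).
        List<List<String>> partitions = List.of(
                List.of("doc-1", "doc-2"),
                List.of("doc-3"));

        // Anti-pattern from the thread: each executor JVM appends to its own
        // copy of a static list, so the driver's copy stays empty and the
        // commit on the driver sees size() == 0.

        // Instead, gather the per-partition results on the driver -- the
        // moral equivalent of JavaRDD.collect() -- and only then talk to Solr.
        List<String> collected = new ArrayList<>();
        for (List<String> partition : partitions) {
            collected.addAll(partition);
        }

        // At this point, on the driver, one would call
        // solrServer.add(collected) followed by solrServer.commit(...).
        System.out.println(collected.size());
    }
}
```

Note that collect() pulls every document into driver memory, so for large batches the executor-side variant (create the Solr client inside foreachPartition and add/commit per partition) scales better.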