Re: rya performance Problems

2018-02-13 Thread Jorge Machado
Thanks! Will try that out.
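
For reference, a minimal sketch of what turning that flush off could look like (setFlush(...) is assumed to be the toggle on AccumuloRdfConfiguration described below; the exact setter name may differ, so verify it against the class before relying on it):

    import org.apache.rya.accumulo.AccumuloRdfConfiguration

    val conf = new AccumuloRdfConfiguration()
    conf.setTablePrefix("rya_")
    // Assumed toggle for the "flush after every insert" behaviour described
    // below; check AccumuloRdfConfiguration for the exact setter/property name.
    conf.setFlush(false)
    // The same conf is then handed to dao.setConf(...) before the repository
    // is initialized, exactly as in the code further down.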



> On 13 Feb 2018, at 15:10, Puja Valiyil  wrote:
> 
> Yes.  There is a config parameter on the AccumuloRdfConfiguration that
> specifies whether or not to flush after every insert.  If you set this to
> "false", then the ingest times should improve.  If it is set to true, the flush
> method on the Accumulo MultiTableBatchWriter is called after each
> triple is inserted.
> 
> 
> On Tue, Feb 13, 2018 at 9:04 AM, Jorge Machado  wrote:
> 
>> From the Accumulo BatchWriters?
>> 
>> Jorge Machado
>> www.jmachado.me
>> 
>> 
>> 
>> 
>> 
>>> On 13 Feb 2018, at 13:13, Puja Valiyil  wrote:
>>> 
>>> Hey Jorge,
>>> There is a config value for flushing on insert; make sure it is set to
>>> false.  If it is set to true, the writer to Accumulo will flush after every
>>> triple, which slows performance down.
>>> Hope this helps!
>>> Thanks,
>>> Puja
>>> 
>>> Sent from my iPhone
>>> 
 On Feb 13, 2018, at 3:43 AM, Jorge Machado  wrote:
 
 
 
Hi guys,

I just gave Rya a test drive with a Spark job on AWS against 5 Accumulo instances.
The performance is really slow: I am only getting about 2000 records per second,
and each commit to Accumulo takes roughly 20 ms.

Is there any trick here, or did I miss something?

Here is my code (I changed the code to run on AWS, of course):
package template.spark

import java.io.File

import org.apache.accumulo.core.client.{ClientConfiguration, Connector, ZooKeeperInstance}
import org.apache.accumulo.core.client.security.tokens.PasswordToken
import org.apache.rya.accumulo.{AccumuloRdfConfiguration, AccumuloRyaDAO}
import org.apache.rya.rdftriplestore.{RdfCloudTripleStore, RyaSailRepository}
import org.openrdf.model.Resource
import org.openrdf.model.impl.ValueFactoryImpl
import org.openrdf.repository.sail.SailRepositoryConnection

final case class Person(firstName: String, lastName: String,
                        country: String, age: Int)

object Main extends InitSpark {

  def main(args: Array[String]) = {
    import spark.implicits._

    val version = spark.version
    val dataset = spark.read.parquet("/Users/jorge/Downloads/test-d4852e42712.gz.parquet")

    dataset.foreachPartition(itr => {
      System.out.println(System.currentTimeMillis() + " Starting to get connector")
      val conn = ryaConnection.getConnection

      // Turn every field of every row into one statement: fixed subject,
      // predicate derived from the field name, object from the field value.
      val a = itr.flatMap(row => {
        row.schema.map(field => {
          val any = row.get(row.schema.fieldIndex(field.name))
          val subject = ryaConnection.vf.createURI("urn:fdc:gfk.com:19980923:mySubject")
          val predicate = ryaConnection.vf.createURI("urn:fdc:gfk.com:" + field.name)
          val obj = ryaConnection.vf.createLiteral(any.toString)
          ryaConnection.vf.createStatement(subject, predicate, obj)
        })
      })

      System.out.println(System.currentTimeMillis() + " Start writing data")
      // Statements are added to the repository one at a time.
      a.foreach(conn.add(_))
      System.out.println("Finished Partition")
      conn.close()
    })
  }

  // One Rya/Accumulo connection setup per executor JVM.
  object ryaConnection {
    val vf = new ValueFactoryImpl()
    val store = new RdfCloudTripleStore()
    val conf = new AccumuloRdfConfiguration()
    conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/hdfs-site.xml").toURI.toURL)
    conf.addResource(new File("/Users/jorge/Downloads/hdp/HDFS_CLIENT-configs/core-site.xml").toURI.toURL)
    conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/accumulo-site.xml").toURI.toURL)
    //conf.addResource(new File("/Users/jorge/Downloads/hdp/ACCUMULO_CLIENT-configs/client.conf").toURI.toURL)
    val dao = new AccumuloRyaDAO()
    val pass = new PasswordToken("accumulo")
    val connector: Connector = new ZooKeeperInstance("hdp-accumulo-instance", "sandbox-hdp.hortonworks.com:2181")
      .getConnector("root", pass)
    System.out.println("got Connector")
    dao.setConnector(connector)
    conf.setTablePrefix("rya_")
    dao.setConf(conf)
    store.setRyaDAO(dao)
    val myRepository = new RyaSailRepository(store)
    myRepository.initialize()

    def getConnection: SailRepositoryConnection = {
      myRepository.getConnection
    }
  }
}
Jorge
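
A hedged variation of the write loop above, for comparison: it groups the statements and adds each group in a single call, so the repository connection commits once per batch instead of once per statement (Sesame's RepositoryConnection treats an Iterable add as one unit). Whether that, on top of disabling the per-insert flush, actually cuts the flush calls in Rya's DAO is an assumption here, and the batch size of 10000 is arbitrary:

    import scala.collection.JavaConverters._
    import org.openrdf.model.Statement

    dataset.foreachPartition(itr => {
      val conn = ryaConnection.getConnection
      try {
        val statements: Iterator[Statement] = itr.flatMap(row =>
          row.schema.map { field =>
            val value = row.get(row.schema.fieldIndex(field.name))
            ryaConnection.vf.createStatement(
              ryaConnection.vf.createURI("urn:fdc:gfk.com:19980923:mySubject"),
              ryaConnection.vf.createURI("urn:fdc:gfk.com:" + field.name),
              ryaConnection.vf.createLiteral(value.toString))
          })
        // Add statements in batches so the connection commits once per batch
        // rather than once per statement.  10000 is an arbitrary batch size.
        statements.grouped(10000).foreach(batch => conn.add(batch.asJava))
      } finally {
        conn.close()
      }
    })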


