[ 
https://issues.apache.org/jira/browse/S2GRAPH-252?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DOYUNG YOON updated S2GRAPH-252:
--------------------------------
    Description: 
S2GraphSource is responsible for translating an HBase 
snapshot (*TableSnapshotInputFormat*) into graph elements such as edges and vertices.

The code below creates an *RDD[(ImmutableBytesWritable, Result)]* from 
*TableSnapshotInputFormat*:

{noformat}
val rdd = ss.sparkContext.newAPIHadoopRDD(job.getConfiguration,
        classOf[TableSnapshotInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result])
{noformat}

The problem arises after the RDD is obtained. 

The current implementation uses *RDD.mapPartitions* because the S2Graph class is 
not serializable, mostly because it holds an Asynchbase client.

The problematic part is the following:

{noformat}
val elements = input.mapPartitions { iter =>
  // A new S2Graph instance (and meta storage connection) per partition.
  val s2 = S2GraphHelper.getS2Graph(config)

  iter.flatMap { line =>
    reader.read(s2)(line)
  }
}

val kvs = elements.mapPartitions { iter =>
  // Same per-partition setup again for the write side.
  val s2 = S2GraphHelper.getS2Graph(config)

  iter.map(writer.write(s2)(_))
}
{noformat}

On each RDD partition, an S2Graph instance connects to meta storage, such as 
MySQL, and uses a local cache to avoid heavy reads from meta storage.

Although this works for a dataset with a small number of partitions, the 
scalability of S2GraphSource is limited by the number of partitions, which must 
be increased when dealing with large data, and each partition opens its own 
connection to meta storage.

A possible improvement is to stop depending on meta storage when 
deserializing HBase's Result class into Edge/Vertex. 

We can achieve this by loading all necessary schemas from meta storage on the 
Spark driver, broadcasting them, and using the broadcast schemas to deserialize 
instead of connecting to meta storage on each partition.
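
The broadcast idea described above could look roughly like the sketch below. It is only an illustration under stated assumptions: *LabelSchema*, *SchemaSnapshot*, *RawRow*, *Element*, *loadSchemas* and *decode* are hypothetical, simplified stand-ins and not S2Graph classes, and *loadSchemas* returns hard-coded values where the real job would read meta storage once on the driver. Only *SparkContext.broadcast*, *parallelize* and *RDD.mapPartitions* are actual Spark APIs.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.broadcast.Broadcast

// Hypothetical, simplified stand-ins for S2Graph schema and element types.
// Case classes are Serializable, so the snapshot can be broadcast.
final case class LabelSchema(id: Int, name: String, columns: Vector[String])
final case class SchemaSnapshot(labels: Map[Int, LabelSchema])
final case class RawRow(labelId: Int, values: Vector[String])
final case class Element(label: String, props: Map[String, String])

object BroadcastSchemaSketch {
  // Assumption: stands in for the single meta storage read done on the driver.
  def loadSchemas(): SchemaSnapshot =
    SchemaSnapshot(Map(
      1 -> LabelSchema(1, "friends", Vector("since", "score")),
      2 -> LabelSchema(2, "likes", Vector("weight"))))

  // Decodes a row using only the in-memory snapshot; no meta storage access.
  // Rows whose label id is missing from the snapshot are dropped.
  def decode(schemas: SchemaSnapshot)(row: RawRow): Option[Element] =
    schemas.labels.get(row.labelId).map { schema =>
      Element(schema.name, schema.columns.zip(row.values).toMap)
    }

  def run(sc: SparkContext, rows: Seq[RawRow]): Array[Element] = {
    // Schemas are loaded once on the driver and shipped to the executors.
    val schemasBc: Broadcast[SchemaSnapshot] = sc.broadcast(loadSchemas())

    sc.parallelize(rows, numSlices = 4)
      .mapPartitions { iter =>
        // Each partition reads the broadcast value instead of connecting
        // to meta storage.
        val schemas = schemasBc.value
        iter.flatMap(row => decode(schemas)(row).iterator)
      }
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("broadcast-schema-sketch")
      .setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val rows = Seq(
        RawRow(1, Vector("2018", "0.9")),
        RawRow(2, Vector("3")),
        RawRow(9, Vector("x"))) // unknown label id, dropped by decode
      run(sc, rows).foreach(println)
    } finally sc.stop()
  }
}
```

With this shape, increasing the number of partitions would not increase the number of meta storage connections, since the only read happens on the driver. The broadcast value has to stay small enough to fit in executor memory, which seems reasonable for schema metadata but is not verified here.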






> Improve performance of S2GraphSource 
> -------------------------------------
>
>                 Key: S2GRAPH-252
>                 URL: https://issues.apache.org/jira/browse/S2GRAPH-252
>             Project: S2Graph
>          Issue Type: Improvement
>          Components: s2jobs
>            Reporter: DOYUNG YOON
>            Assignee: DOYUNG YOON
>            Priority: Major
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
