Hi Abel,

Pretty interesting.  May I ask how big your point CSV dataset is?

It seems you are finding the polygon that intersects each point by
scanning linearly through the FeatureCollection of polygons.  This is
going to be extremely slow, since each point may be tested against
every polygon.  I highly recommend using a SpatialIndex, such as one
of the several implementations in the JTS library itself (e.g. STRtree
or Quadtree), to speed things up.

Also, note that the geoscript library is not really maintained
anymore.  I forked it with the intention of maintaining it some more,
but I've decided this is not really a good direction.

On Thu, Sep 18, 2014 at 7:02 PM, Abel Coronado Iruegas
<acoronadoirue...@gmail.com> wrote:
> Now I have a better version, but the problem now is that saveAsTextFile
> does not finish the job; in the HDFS directory only a partial temporary
> file exists. Can someone tell me what is wrong?
>
> Thanks !!
>
> object SimpleApp {
>
>         def main(args: Array[String]){
>
>                 val conf = new SparkConf().setAppName("Csv Clipper")
>
>                 val sc = new SparkContext(conf)
>
>                 val csvPath =
> "hdfs://m01/user/acoronado/mov/movilidad_64mb.csv"
>
>                 val csv = sc.textFile(csvPath)
>
>                 csv.cache()
>
>                 val clipPoints = csv.map({line: String =>
>
>                                                val Array(usuario, lat, lon,
> date) = line.split(",").map(_.trim)
>
>                                                val punto =
> Point(lon.toDouble,lat.toDouble)
>
>                                                val internal =
> geoDataExternal.get.find(f => f.geometry intersects punto)
>
>                                                val (cve_est, cve_mun) =
> internal match {
>
>                                        case
> Some(f:org.geoscript.feature.Feature) => {
>
>                                                             val index =
> f.getAttribute(1).toString()
>
>                                                             val existe =
> geoDataMun.get(index).find(f => f.geometry intersects punto)
>
>                                                             existe match {
>
>
> case Some(f) => (f.getAttribute(1).toString, f.getAttribute(2).toString)
>
>
> case None => ("0", "0")
>
>                                                                           }
>
>                                                           }
>
>                                           case None => ("0", "0")
>
>                                         }
>
>                                         val time = try {(new
> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$",
> "+0000")).getTime().toString()} catch {case e: Exception => "0"}
>
>
> line+","+time+","+cve_est+","+cve_mun
>
>                                 })
>
>
> clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_60.csv")
>
>                 println("Spark Clip Exito!!!")
>
>         }
>
>         object geoDataMun {
>
>           private val shp = Shapefile("/geoData/MunicipiosLatLon.shp")
>
>           val features = shp.getFeatures.toIterable
>
>       val result = scala.io.Source.fromFile("/geoData/indice_espacial.csv")
>
>         .getLines()
>
>         .toList map { line: String =>
>
>                                        val campos =
> line.split(",").map(_.trim)
>
>                                        val cve_edo = campos(0)
>
>                                        val cve_mun = campos(1)
>
>                                        val index = campos(2)
>
>
> scala.collection.immutable.List(index.toInt , (cve_edo,cve_mun))
>
>                                     }
>
>       val mapaIx = result.groupBy(x=>x(0)).mapValues(cves => cves.map(x =>
> x(1)))
>
>       def get(index:String) = {
>
>         features.filter(f =>
> mapaIx(index.toInt).contains((f.getAttribute(1).toString,f.getAttribute(2).toString)))
>
>       }
>
>         }
>
>     object geoDataExternal{
>
>       private val shp = Shapefile("/geoData/IndiceRecortado.shp")
>
>       val features = shp.getFeatures
>
>       def get:
> FeatureCollection[org.geoscript.feature.Schema,org.geoscript.feature.Feature]
> = features
>
>     }
>
> }
>
>
> the log of the driver is:
>
> 14/09/18 19:27:55 ERROR EndpointWriter: AssociationError
> [akka.tcp://sparkwor...@axaxaxa-cloudera-s05.xxxnetworks.com:44895] ->
> [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error
> [Association failed with
> [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [
>
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]
>
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942
>
> ]
>
> 14/09/18 19:27:55 ERROR EndpointWriter: AssociationError
> [akka.tcp://sparkwor...@axaxaxa-cloudera-s05.xxxnetworks.com:44895] ->
> [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error
> [Association failed with
> [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [
>
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]
>
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942
>
> ]
>
> 14/09/18 19:27:55 ERROR EndpointWriter: AssociationError
> [akka.tcp://sparkwor...@axaxaxa-cloudera-s05.xxxnetworks.com:44895] ->
> [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error
> [Association failed with
> [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [
>
> akka.remote.EndpointAssociationException: Association failed with
> [akka.tcp://sparkexecu...@axaxaxa-cloudera-s05.xxxnetworks.com:43942]
>
> Caused by:
> akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2:
> Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942
>
>
>
>
> On Mon, Sep 15, 2014 at 1:30 PM, Abel Coronado Iruegas
> <acoronadoirue...@gmail.com> wrote:
>>
>> Here an example of a working code that takes a csv with lat lon points and
>> intersects with polygons of municipalities of Mexico, generating a new
>> version of the file with new attributes.
>>
>> Do you think that could be improved?
>>
>> Thanks.
>>
>> The Code:
>>
>> import org.apache.spark.SparkContext
>> import org.apache.spark.SparkContext._
>> import org.apache.spark.SparkConf
>> import org.geoscript.feature._
>> import org.geoscript.geometry._
>> import org.geoscript.geometry.builder._
>> import com.vividsolutions.jts._
>> import org.geoscript.layer.Shapefile
>> import org.geotools.feature.FeatureCollection
>> import java.text._
>> import java.util._
>>
>> object SimpleApp {
>>         def main(args: Array[String]){
>>                 val conf = new SparkConf().setAppName("Csv Clipper")
>>                 val sc = new SparkContext(conf)
>>                 val csvPath =
>> "hdfs://x01/user/acoronado/mov/movilidad.csv" //70 Millions of rows
>>                 val csv = sc.textFile(csvPath)
>>                 val clipPoints = csv.map({line: String =>
>>                                                val Array(usuario, lat,
>> lon, date) = line.split(",").map(_.trim)
>>                                                val punto =
>> Point(lon.toDouble,lat.toDouble)
>>                                                val existe =
>> geoData.get.filter(f => f.geometry intersects punto) // Geospatial operation
>>                                                var cve_est = "0"
>>                                                var cve_mun = "0"
>>                                                var time = "0"
>>                                                if(!existe.isEmpty){
>>                                                   val f = existe.take(1)
>>                                                   val ff = f.toList(0)
>>                                                   cve_est =
>> ff.getAttribute(1).toString //State Code
>>                                                   cve_mun =
>> ff.getAttribute(2).toString  // Municipality Code
>>                                                   time = (new
>> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$",
>> "+0000")).getTime().toString()
>>                                                }
>>
>> line+","+time+","+cve_est+","+cve_mun
>>                                            })
>>
>> clipPoints.coalesce(1,true).saveAsTextFile("hdfs://m01/user/acoronado/mov/mov_all.csv")
>>                 println("Spark Clip Exito!!!")
>>         }
>>         object geoData {
>>             private val estatal =
>> Shapefile("/geoData/MunicipiosLatLon.shp") //This directory exist in all the
>> nodes.
>>             private val estatalColl = estatal.getFeatures
>>             def
>> get:FeatureCollection[org.geoscript.feature.Schema,org.geoscript.feature.Feature]
>> = estatalColl
>>         }
>> }
>
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Reply via email to