Hi,
I found that I can call sc.addFile("GeoIP2.mmdb") in Spark Streaming, so both Spark and Spark Streaming can use GeoIP2.mmdb, as @romain said.
Below is the way to addFile in Spark Streaming:

val conf = new SparkConf()
val sc = new SparkContext(conf)
sc.addFile("path/geoip.mmdb")
val ssc = new StreamingContext(sc, Milliseconds(interval.toInt)) // if in Spark Streaming
From there you can apply @Romain Sagean's solution below.
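To make the executor side concrete, here is a minimal sketch of reading the distributed file back with SparkFiles.get inside a streaming transformation. The DStream name ipStream and the lookup shown are my own illustration, not code from the thread:

import java.io.File
import java.net.InetAddress
import org.apache.spark.SparkFiles
import com.maxmind.geoip2.DatabaseReader

// ipStream is a hypothetical DStream[String] of raw client IPs.
val cityNames = ipStream.mapPartitions { ips =>
  // Each executor resolves its local copy of the file shipped by sc.addFile.
  val reader = new DatabaseReader.Builder(new File(SparkFiles.get("geoip.mmdb"))).build()
  ips.map(ip => reader.city(InetAddress.getByName(ip)).getCity.getName)
}

Building the reader once per partition keeps it out of the task closure (DatabaseReader is not serializable); Silvio's singleton object below is another way to pay that cost only once per executor.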
When you want to use GeoIP2.mmdb in Spark SQL, you can follow Silvio's solution below. So now we can use GeoIP2.mmdb in Spark, Spark Streaming, and Spark SQL.
Thanks to @Romain Sagean and @Silvio Fiorita.
I've used the code below with Spark SQL. I was using this with Spark 1.4, but it should still be good with 1.6. In this case I have a UDF to do the lookup, but for Streaming you'd just have a lambda to apply in a map function, so no UDF wrapper.

import org.apache.spark.sql.functions._
import java.io.File
import java.net.InetAddress
import com.maxmind.geoip2._

object GeoIPLookup {
  @transient lazy val reader = {
    val db = new File("/data/meetup/GeoLite2-City.mmdb")
    val reader = new DatabaseReader.Builder(db).build()
    reader
  }
}

case class Location(latitude: Double, longitude: Double)
case class Geo(city: String, country: String, loc: Location)

val iplookup = udf { (s: String) =>
  val ip = InetAddress.getByName(s)
  val record = GeoIPLookup.reader.city(ip)
  val city = record.getCity
  val country = record.getCountry
  val location = record.getLocation
  Geo(city.getName, country.getName, Location(location.getLatitude, location.getLongitude))
}

val withGeo = df.withColumn("geo", iplookup(column("ip")))
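A rough sketch of the streaming variant described above, i.e. a plain lambda in a map function with no UDF wrapper. This is my own illustration: ipStream is a hypothetical DStream[String] of IP addresses, and it reuses the GeoIPLookup object and case classes defined above.

import java.net.InetAddress

val geoStream = ipStream.map { s =>
  // The object's lazy reader is initialized once per executor JVM on first use.
  val record = GeoIPLookup.reader.city(InetAddress.getByName(s))
  Geo(record.getCity.getName,
      record.getCountry.getName,
      Location(record.getLocation.getLatitude, record.getLocation.getLongitude))
}

Because the reader lives in a @transient lazy val on an object, the .mmdb file just has to be present at that path on every worker (or be shipped with sc.addFile as in the first message).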
From: Zhun Shen
Sent: Monday, February 29, 2016 11:17 PM
To: romain sagean
Cc: user
Subject: Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming

Hi,

I checked the dependencies and fixed the bug. It works well on Spark but not on Spark Streaming, so I think I still need to find another way to do it.
Hi,
Thanks for your advice. I tried your method; I use Gradle to manage my Scala code, and 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0' was imported via Gradle.
Spark version: 1.6.0
Scala: 2.10.4
scala-maxmind-iplookups: 0.2.0
I ran my test and got the error below:

java.lang.NoClassDefFoundError: scala/collection/JavaConversions$JMapWrapperLike
    at com.snowplowanalytics.maxmind.iplookups.IpLookups$.apply(IpLookups.scala:53)
I realize I forgot the sbt part:

resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0",
  "com.snowplowanalytics" %% "scala-maxmind-iplookups" % "0.2.0"
)

Note the %%: sbt appends the Scala binary version to the artifact name, so with Gradle you likely need the suffixed artifact id (e.g. scala-maxmind-iplookups_2.10) yourself; a mismatch there would explain the NoClassDefFoundError above.

Otherwise, to process streaming logs I use Logstash with Kafka as input. You can set Kafka as output if you need to do some extra computation with Spark.

On 23/02/2016 15:07, Romain Sagean wrote:
Hi, I use MaxMind GeoIP with Spark (no streaming). To make it work you should use mapPartitions. I don't know if something similar exists for Spark Streaming. My code, for reference:

def parseIP(ip: String, ipLookups: IpLookups): List[String] = {
  val lookupResult = ipLookups.performLookups(ip)
  val countryName = lookupResult._1.map(_.countryName).getOrElse("")
  val city = lookupResult._1.map(_.city).getOrElse(None).getOrElse("")
  val latitude = lookupResult._1.map(_.latitude).getOrElse(None).toString
  val longitude = lookupResult._1.map(_.longitude).getOrElse(None).toString
  List(countryName, city, latitude, longitude)
}

sc.addFile("/home/your_user/GeoLiteCity.dat")

// load your data into the my_data RDD
my_data.mapPartitions { rows =>
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map { row => row ::: parseIP(row(3), ipLookups) }
}
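For what it's worth, DStream does expose mapPartitions, so roughly the same pattern should carry over to streaming. A minimal sketch, assuming a hypothetical DStream[List[String]] named rowStream and the parseIP helper above:

import org.apache.spark.SparkFiles
import com.snowplowanalytics.maxmind.iplookups.IpLookups

val enriched = rowStream.mapPartitions { rows =>
  // Same per-partition setup as in the batch version above.
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map(row => row ::: parseIP(row(3), ipLookups))
}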
On 23/02/2016 14:28, Zhun Shen wrote:

Hi all,
Currently I send nginx logs to Kafka, and I want to use Spark Streaming to parse the logs and enrich the IP info with the GeoIP libs from MaxMind.
Does anyone know a good way to process the IP info in Spark Streaming? Many thanks.
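For the Kafka side, a minimal sketch of wiring the topic into a DStream with the spark-streaming-kafka artifact. The broker address, topic name, and the assumption that the client IP is the first space-separated field of the nginx log line are all illustrative:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val logs = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("nginx-logs"))
// Each record is a (key, value) pair; pull the client IP out of the raw log line.
val ips = logs.map { case (_, line) => line.split(" ")(0) }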