
I found that I can also call sc.addFile("GeoIP2.mmdb") in Spark Streaming, so both Spark and Spark Streaming can use GeoIP2.mmdb as @romain said.

Below is how to call addFile in Spark Streaming:
 val conf = new SparkConf()
 val sc = new SparkContext(conf)
 sc.addFile("GeoIP2.mmdb")   // ship the database file to every executor
 val ssc = new StreamingContext(sc, Milliseconds(interval.toInt))   // only needed for Spark Streaming

After that you can follow @romain sagean’s solution, quoted further down in this thread.
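To make the combination concrete, here is a minimal sketch of how addFile, SparkFiles.get and mapPartitions might fit together in a streaming job. It is not code from this thread: the object name GeoStreamingSketch, the enrich helper, the socket source on localhost:9999, the one-second batch interval and the use of the com.maxmind.geoip2 DatabaseReader with a City-type database are all assumptions made for illustration.

```scala
import java.io.File
import java.net.InetAddress

import com.maxmind.geoip2.DatabaseReader
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object GeoStreamingSketch {

  // Hypothetical helper: turns a stream of IP strings into (ip, country name) pairs.
  def enrich(ips: DStream[String]): DStream[(String, String)] =
    ips.mapPartitions { iter =>
      // Runs on the executor: resolve the local copy shipped by sc.addFile and
      // open it once per partition, so the reader itself is never serialized.
      val db = new File(SparkFiles.get("GeoIP2.mmdb"))
      val reader = new DatabaseReader.Builder(db).build()
      iter.map { ip =>
        // city() assumes a City-type database; it throws for addresses not in the database.
        val record = reader.city(InetAddress.getByName(ip))
        (ip, Option(record.getCountry.getName).getOrElse(""))
      }
    }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("geoip-streaming-sketch")
    val sc = new SparkContext(conf)
    sc.addFile("GeoIP2.mmdb") // assumption: the file is reachable at this path from the driver
    val ssc = new StreamingContext(sc, Milliseconds(1000))

    // Assumption: IP addresses arrive one per line on a local socket.
    val ips = ssc.socketTextStream("localhost", 9999)
    enrich(ips).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The sketch never closes the reader and does not handle lookup failures; a real job would probably want to deal with both.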

If you want to use GeoIP2.mmdb in Spark SQL, you can follow Silvio’s solution.
So now we can use GeoIP2.mmdb in Spark, Spark Streaming and Spark SQL.

Thanks to @Romain Sagean and @Silvio Fiorita.

On Mar 1, 2016, at 1:05 PM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:


I’ve used the code below with Spark SQL. I was using this with Spark 1.4, but it should still work with 1.6. In this case I have a UDF to do the lookup, but for Streaming you’d just have a lambda to apply in a map function, so no UDF wrapper is needed.


import org.apache.spark.sql.functions._
import java.io.File
import java.net.InetAddress
import com.maxmind.geoip2._


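object GeoIPLookup {
    // Built lazily, once per JVM, so the reader is created on each executor
    // instead of being serialized from the driver.
    @transient lazy val reader = {
        val db = new File("/data/meetup/GeoLite2-City.mmdb")
        val reader = new DatabaseReader.Builder(db).build()
        reader
    }
}
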
case class Location(latitude: Double, longitude: Double)
case class Geo(city: String, country: String, loc: Location)


val iplookup = udf { (s: String) => {
   val ip = InetAddress.getByName(s)
   val record = GeoIPLookup.reader.city(ip)

   val city = record.getCity
   val country = record.getCountry
   val location = record.getLocation

   Geo(city.getName, country.getName, Location(location.getLatitude, location.getLongitude))
} }


val withGeo = df.withColumn("geo", iplookup(column("ip")))
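For the Streaming case mentioned above (a plain lambda in map, without the UDF wrapper), a rough sketch could look like the following. The names GeoReaderHolder, StreamingLookupSketch and withGeo are hypothetical, and the sketch assumes the database file exists at the same path on every executor node.

```scala
import java.io.File
import java.net.InetAddress

import com.maxmind.geoip2.DatabaseReader
import org.apache.spark.streaming.dstream.DStream

// Hypothetical holder following the same lazy-reader pattern: a Scala object is
// initialised per JVM, so each executor builds its own reader on first use.
object GeoReaderHolder {
  lazy val reader: DatabaseReader =
    new DatabaseReader.Builder(new File("/data/meetup/GeoLite2-City.mmdb")).build()
}

object StreamingLookupSketch {
  // Plain lambda inside map, no udf wrapper; yields (ip, city name, country name).
  def withGeo(ips: DStream[String]): DStream[(String, String, String)] =
    ips.map { s =>
      val record = GeoReaderHolder.reader.city(InetAddress.getByName(s))
      // getName can return null when the database has no name for the record.
      val city = Option(record.getCity.getName).getOrElse("")
      val country = Option(record.getCountry.getName).getOrElse("")
      (s, city, country)
    }
}
```

Because the closure only refers to the object and not to a reader instance, nothing non-serializable is captured when Spark ships the function to the executors.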



From: Zhun Shen
Sent: Monday, February 29, 2016 11:17 PM
To: romain sagean
Cc: user
Subject: Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming 



I checked the dependencies and fixed the bug. It works well on Spark but not on Spark Streaming, so I think I still need to find another way to do it.

On Feb 26, 2016, at 2:47 PM, Zhun Shen <shenzhunal...@gmail.com> wrote:


Thanks for your advice. I tried your method. I use Gradle to manage my Scala code, and I added 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0' as a Gradle dependency.

spark version: 1.6.0
scala: 2.10.4
scala-maxmind-iplookups: 0.2.0

When I ran my test, I got the following error:
java.lang.NoClassDefFoundError: scala/collection/JavaConversions$JMapWrapperLike
at com.snowplowanalytics.maxmind.iplookups.IpLookups$.apply(IpLookups.scala:53)

On Feb 24, 2016, at 1:10 AM, romain sagean <romain.sag...@hupi.fr> wrote:

I realize I forgot the sbt part:

resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0",
  "com.snowplowanalytics"  %% "scala-maxmind-iplookups"  % "0.2.0"
)

Otherwise, to process streaming logs I use Logstash with Kafka as input. You can set Kafka as the output if you need to do some extra calculation with Spark.

On 23/02/2016 15:07, Romain Sagean wrote:
I use MaxMind GeoIP with Spark (no streaming). To make it work you should use mapPartitions. I don't know if something similar exists for Spark Streaming.

my code for reference:

  import com.snowplowanalytics.maxmind.iplookups.IpLookups
  import org.apache.spark.SparkFiles

  // Look up one IP and return country, city, latitude and longitude as strings.
  def parseIP(ip: String, ipLookups: IpLookups): List[String] = {
    val lookupResult = ipLookups.performLookups(ip)
    val countryName = (lookupResult._1).map(_.countryName).getOrElse("")
    val city = (lookupResult._1).map(_.city).getOrElse(None).getOrElse("")
    val latitude = (lookupResult._1).map(_.latitude).getOrElse(None).toString
    val longitude = (lookupResult._1).map(_.longitude).getOrElse(None).toString
    List(countryName, city, latitude, longitude)
  }

//load your data in my_data rdd

my_data.mapPartitions { rows =>
        // Create the lookup object once per partition, on the executor.
        val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
        rows.map { row => row ::: parseIP(row(3),ipLookups) }
}

On 23/02/2016 14:28, Zhun Shen wrote:
Hi all,

Currently, I send nginx logs to Kafka, and I want to use Spark Streaming to parse the logs and enrich the IP info with the GeoIP libs from MaxMind.

I found this one, https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but Spark Streaming threw an error saying that the lib was not Serializable.

Does anyone know a way to process the IP info in Spark Streaming? Many thanks.
