I've used the code below with Spark SQL. I wrote it against Spark 1.4, but it
should still work with 1.6. Here a UDF wraps the lookup; for Streaming you'd
just apply the same lambda in a map function, with no UDF wrapper.
import org.apache.spark.sql.functions._
import java.io.File
import java.net.InetAddress
import com.maxmind.geoip2._

object GeoIPLookup {
  @transient lazy val reader = {
    val db = new File("/data/meetup/GeoLite2-City.mmdb")
    val reader = new DatabaseReader.Builder(db).build()
    reader
  }
}

case class Location(latitude: Double, longitude: Double)
case class Geo(city: String, country: String, loc: Location)

val iplookup = udf { (s: String) => {
  val ip = InetAddress.getByName(s)
  val record = GeoIPLookup.reader.city(ip)
  val city = record.getCity
  val country = record.getCountry
  val location = record.getLocation
  Geo(city.getName, country.getName,
    Location(location.getLatitude, location.getLongitude))
} }

val withGeo = df.withColumn("geo", iplookup(column("ip")))
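For the Streaming case, a minimal sketch might look like the following. It
assumes a hypothetical DStream[String] named ips holding one IP address per
record (not part of the code above), and it reuses GeoIPLookup, Geo and
Location as defined above. Because the reader sits behind a lazy val in an
object, it should be created on first use in each executor JVM rather than
being serialized with the closure.

```scala
import java.net.InetAddress
import org.apache.spark.streaming.dstream.DStream

object GeoStream {
  // Assumption: `ips` carries one IP address string per record, already
  // extracted from the incoming log lines. The name is illustrative only.
  def withGeo(ips: DStream[String]): DStream[(String, Geo)] =
    ips.map { s =>
      // Same lookup as the UDF body, applied directly as a lambda.
      val record = GeoIPLookup.reader.city(InetAddress.getByName(s))
      val location = record.getLocation
      (s, Geo(record.getCity.getName, record.getCountry.getName,
        Location(location.getLatitude, location.getLongitude)))
    }
}
```

The reader throws when an address is not in the database (private ranges,
for example), so a real job would probably wrap the lookup in a Try.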
From: Zhun Shen<mailto:[email protected]>
Sent: Monday, February 29, 2016 11:17 PM
To: romain sagean<mailto:[email protected]>
Cc: user<mailto:[email protected]>
Subject: Re: Use maxmind geoip lib to process ip on Spark/Spark Streaming
Hi,
I checked the dependencies and fixed the bug. It works well on Spark but not on
Spark Streaming, so I think I still need to find another way to do it.
On Feb 26, 2016, at 2:47 PM, Zhun Shen <[email protected]> wrote:
Hi,
Thanks for your advice. I tried your method. I use Gradle to manage my Scala
code, and imported 'com.snowplowanalytics:scala-maxmind-iplookups:0.2.0' in
Gradle.
spark version: 1.6.0
scala: 2.10.4
scala-maxmind-iplookups: 0.2.0
I ran my test and got the error below:
java.lang.NoClassDefFoundError: scala/collection/JavaConversions$JMapWrapperLike
at com.snowplowanalytics.maxmind.iplookups.IpLookups$.apply(IpLookups.scala:53)
On Feb 24, 2016, at 1:10 AM, romain sagean <[email protected]> wrote:
I realize I forgot the sbt part
resolvers += "SnowPlow Repo" at "http://maven.snplow.com/releases/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.0",
  "com.snowplowanalytics" %% "scala-maxmind-iplookups" % "0.2.0"
)
Otherwise, to process streaming logs I use Logstash with Kafka as input. You can
set Kafka as the output if you need to do some extra calculation with Spark.
On 23/02/2016 15:07, Romain Sagean wrote:
Hi,
I use MaxMind GeoIP with Spark (no streaming). To make it work you should use
mapPartitions. I don't know if something similar exists for Spark Streaming.
My code for reference:
import org.apache.spark.SparkFiles
import com.snowplowanalytics.maxmind.iplookups.IpLookups

def parseIP(ip: String, ipLookups: IpLookups): List[String] = {
  val lookupResult = ipLookups.performLookups(ip)
  val countryName = (lookupResult._1).map(_.countryName).getOrElse("")
  val city = (lookupResult._1).map(_.city).getOrElse(None).getOrElse("")
  val latitude = (lookupResult._1).map(_.latitude).getOrElse(None).toString
  val longitude = (lookupResult._1).map(_.longitude).getOrElse(None).toString
  List(countryName, city, latitude, longitude)
}

// ship the database file to every executor
sc.addFile("/home/your_user/GeoLiteCity.dat")

// load your data in my_data rdd
my_data.mapPartitions { rows =>
  // build one IpLookups per partition instead of shipping it in the closure
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map { row => row ::: parseIP(row(3), ipLookups) }
}
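For what it's worth, DStream also exposes mapPartitions, so the same
per-partition pattern might carry over to Spark Streaming roughly as below.
This is only a sketch: it assumes a hypothetical DStream[List[String]] named
rows with the IP in column 3 (mirroring my_data above), that
sc.addFile(...) has already been called as above, and that parseIP is the
function defined above.

```scala
import org.apache.spark.SparkFiles
import org.apache.spark.streaming.dstream.DStream
import com.snowplowanalytics.maxmind.iplookups.IpLookups

object StreamingGeo {
  // Assumption: `rows` is an already-parsed stream of log fields; the name
  // and the column index are illustrative, not from a real job.
  def enrich(rows: DStream[List[String]]): DStream[List[String]] =
    rows.mapPartitions { part =>
      // One IpLookups per partition of each batch, built on the executor.
      val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
      part.map { row => row ::: parseIP(row(3), ipLookups) }
    }
}
```

This rebuilds the lookup object for every partition of every batch; if that
turns out to be too costly, holding it in a lazy val inside an object is one
possible alternative.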
On 23/02/2016 14:28, Zhun Shen wrote:
Hi all,
Currently I send nginx logs to Kafka, and I want to use Spark Streaming to
parse the logs and enrich the IP info with the GeoIP libs from MaxMind.
I found this one, https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but
Spark Streaming throws an error saying that the lib is not Serializable.
Does anyone know a way to process the IP info in Spark Streaming? Many thanks.