Hi,
I use MaxMind GeoIP with Spark (batch, not streaming). To make it work you should use mapPartitions, so the non-serializable lookup object is created once per partition on each executor instead of being shipped from the driver. I don't know if something similar exists for Spark Streaming.
My code, for reference:
def parseIP(ip: String, ipLookups: IpLookups): List[String] = {
  // performLookups returns a tuple; _1 is an Option[IpLocation]
  val location = ipLookups.performLookups(ip)._1
  val countryName = location.map(_.countryName).getOrElse("")
  val city = location.flatMap(_.city).getOrElse("")
  val latitude = location.map(_.latitude.toString).getOrElse("")
  val longitude = location.map(_.longitude.toString).getOrElse("")
  List(countryName, city, latitude, longitude)
}
sc.addFile("/home/your_user/GeoLiteCity.dat")

// load your data into the my_data RDD
my_data.mapPartitions { rows =>
  val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
  rows.map { row => row ::: parseIP(row(3), ipLookups) }
}
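For the streaming case, DStream exposes mapPartitions as well, so the same pattern should carry over: instantiate IpLookups inside the partition closure so it never has to be serialized. A minimal sketch, untested on my side, assuming a DStream[List[String]] named `lines`; `enrichPartition` is a hypothetical helper I pull out so the row logic can be exercised without a cluster:

```scala
// Pure helper: append lookup results to each row. `lookup` stands in for
// parseIP bound to an executor-local IpLookups instance; column 3 is
// assumed to hold the IP, as in the batch example above.
def enrichPartition(
    rows: Iterator[List[String]],
    lookup: String => List[String]): Iterator[List[String]] =
  rows.map(row => row ::: lookup(row(3)))

// On the real DStream (hypothetical variable name `lines`):
// lines.mapPartitions { rows =>
//   val ipLookups = IpLookups(geoFile = Some(SparkFiles.get("GeoLiteCity.dat")))
//   enrichPartition(rows, ip => parseIP(ip, ipLookups))
// }
```

Because the IpLookups instance is built inside the closure, only the closure itself (which is serializable) crosses the wire, which should avoid the NotSerializableException.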
On 23/02/2016 at 14:28, Zhun Shen wrote:
Hi all,
Currently I send nginx logs to Kafka, and I want to use Spark Streaming to parse the logs and enrich the IP info with the GeoIP libs from MaxMind. I found this one: https://github.com/Sanoma-CDA/maxmind-geoip2-scala.git, but Spark Streaming threw an error saying the lib was not Serializable.
Does anyone know a way to process the IP info in Spark Streaming? Many thanks.