Hi all,

I'm working on an application that enriches network connections with 
geolocations, using the GeoIP database, which is stored as a file in HDFS. To 
do so, I map every connection in the stream using this function:

def enrichIp(ip: String): Location = {
  // MaxMind's legacy Location exposes latitude/longitude as public fields
  val location = service.getLocation(ip)
  val latlon = (location.latitude, location.longitude)
  Location(location.countryName, Some(location.city), Some(latlon))
}

The service field is a LookupService, declared like this:


private lazy val service = new LookupService(Paths.get(path).toFile, 
LookupService.GEOIP_MEMORY_CACHE)


I can see several problems with this architecture. First, I need to manually 
copy the file from HDFS to the local filesystem before starting the streaming 
job. Something like a DistributedCache would solve this, but it doesn't seem 
to be available in the DataStream API, or at least I can't find it.
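As a stop-gap, the manual copy could at least be automated inside the job itself. A minimal sketch of the staging step (the names here are my own, and a plain local file copy stands in for the HDFS read; in the real job the source would be an HDFS input stream, but the copy-once logic is the same):

```scala
import java.nio.file.{Files, Path, StandardCopyOption}

object GeoIpStaging {
  // Cache the local copy so the file is staged only once per JVM,
  // even if several tasks ask for it.
  @volatile private var staged: Option[Path] = None

  def localCopy(source: Path): Path = synchronized {
    staged.getOrElse {
      // Copy the database to a local temp file and remember its path.
      val target = Files.createTempFile("geoip-", ".dat")
      Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING)
      target.toFile.deleteOnExit()
      staged = Some(target)
      target
    }
  }
}
```

The LookupService could then be built from `GeoIpStaging.localCopy(...)` at task startup instead of relying on a file copied by hand beforehand.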

Furthermore, every Flink task loads the full GeoIP database into memory. This 
might be fine in terms of performance, but the memory consumption is quite 
high. The other alternative I see is to load the GeoIP data into an external 
database (Redis, Postgres, etc.) and query it at runtime, which might also be 
a good solution... Is there any built-in mechanism to do this?
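On the memory point, one mitigation I'm considering: since subtasks running in the same TaskManager share a JVM, holding the service behind a lazy val in a companion object would load the database once per TaskManager rather than once per task. A sketch with a dummy lookup type standing in for LookupService (the counter is only there to make the once-per-JVM behaviour visible):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Stand-in for com.maxmind.geoip.LookupService; the real one loads the
// whole database file into memory when GEOIP_MEMORY_CACHE is used.
class DummyLookupService {
  DummyLookupService.constructions.incrementAndGet()
  def getLocation(ip: String): String = s"location-of-$ip"
}

object DummyLookupService {
  val constructions = new AtomicInteger(0)
  // One shared instance per JVM: every task in the same TaskManager
  // reuses it instead of loading its own copy of the database.
  lazy val shared: DummyLookupService = new DummyLookupService
}
```

In the real job, each task's map function would reference `DummyLookupService.shared` (well, the real equivalent) instead of constructing its own instance.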



Kind regards,

Diego


