Re: Call http request from within Spark
Can you explain what you mean by "the count never stops"?

On 15 Jul 2016 00:53, "Amit Dutta" wrote:
> Hi All,
>
> I have a requirement to call a rest service url for 300k customer ids.
>
> Things I have tried so far is
>
> custid_rdd = sc.textFile('file:Users/zzz/CustomerID_SC/Inactive User Hashed LCID List.csv')  #getting all the customer ids and building adds
> profile_rdd = custid_rdd.map(lambda r: getProfile(r.split(',')[0]))
> profile_rdd.count()
>
> #getprofile is the method to do the http call
> def getProfile(cust_id):
>     api_key = 'txt'
>     api_secret = 'yuyuy'
>     profile_uri = 'https://profile.localytics.com/x1/customers/{}'
>     customer_id = cust_id
>
>     if customer_id is not None:
>         data = requests.get(profile_uri.format(customer_id),
>                             auth=requests.auth.HTTPBasicAuth(api_key, api_secret))
>         # print json.dumps(data.json(), indent=4)
>         return data
>
> when I print the json dump of the data i see it returning results from the
> rest call. But the count never stops.
>
> Is there an efficient way of dealing this? Some post says we have to
> define a batch size etc but don't know how.
>
> Appreciate your help
>
> Regards,
> Amit
Re: Call http request from within Spark
I second what Pedro said in his second paragraph. Issuing an HTTP request per row will not scale.

On Thu, Jul 14, 2016 at 12:26 PM, Pedro Rodriguez wrote:
> Hi Amit,
>
> Have you tried running a subset of the IDs locally on a single thread? It
> would be useful to benchmark your getProfile function on a subset of the
> data, estimate how long the full data set would take, and then divide by the
> number of Spark executor cores. This should at least serve as a sanity
> check. If things are much slower than expected, is it possible that the
> service has a rate limit per IP address that you are hitting?
>
> If requests is more efficient at batching requests together (I don't know
> much about its internal implementation and connection pools), you could do
> that with mapPartitions. This is useful when the initialization time of the
> function in the map call is expensive (e.g. it uses a connection pool for a
> DB or web service), as it allows you to initialize that resource once per
> partition and then reuse it for all the elements in the partition.
>
> Pedro
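For reference, the per-partition idea discussed above could look roughly like the sketch below. It is only an illustration under assumptions, not code posted by anyone in the thread: `fetch_profiles` and `session_factory` are made-up names, the 10-second timeout is an arbitrary choice, and the key, secret, and URL are the placeholders from the original post. It opens one `requests.Session` per partition so connections are reused, and it sets a timeout because requests does not time out on its own.

```python
# Placeholders copied from the original post; real credentials should not be hard-coded.
API_KEY = 'txt'
API_SECRET = 'yuyuy'
PROFILE_URI = 'https://profile.localytics.com/x1/customers/{}'


def fetch_profiles(rows, session_factory=None):
    """Fetch profiles for one partition, reusing a single HTTP session."""
    if session_factory is None:
        # Imported lazily so the import happens where the function actually runs.
        import requests
        session_factory = requests.Session

    session = session_factory()
    session.auth = (API_KEY, API_SECRET)  # HTTP basic auth applied to every request
    try:
        for row in rows:
            cust_id = row.split(',')[0]
            # Without an explicit timeout, a stalled call can block indefinitely.
            resp = session.get(PROFILE_URI.format(cust_id), timeout=10)
            # Yield plain values instead of the whole Response object.
            yield cust_id, resp.status_code, resp.text
    finally:
        session.close()


# Usage inside the Spark job from the original post (not executed here):
#   profile_rdd = custid_rdd.mapPartitions(fetch_profiles)
#   profile_rdd.count()
```

The `session_factory` argument exists only so the function can be exercised without network access. In the Spark job it would be left at its default.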
Re: Call http request from within Spark
Hi Amit,

Have you tried running a subset of the IDs locally on a single thread? It would be useful to benchmark your getProfile function on a subset of the data, estimate how long the full data set would take, and then divide by the number of Spark executor cores. This should at least serve as a sanity check. If things are much slower than expected, is it possible that the service has a rate limit per IP address that you are hitting?

If requests is more efficient at batching requests together (I don't know much about its internal implementation and connection pools), you could do that with mapPartitions. This is useful when the initialization time of the function in the map call is expensive (e.g. it uses a connection pool for a DB or web service), as it allows you to initialize that resource once per partition and then reuse it for all the elements in the partition.

Pedro

On Thu, Jul 14, 2016 at 8:52 AM, Amit Dutta wrote:
> Hi All,
>
> I have a requirement to call a rest service url for 300k customer ids.
>
> Things I have tried so far is
>
> custid_rdd = sc.textFile('file:Users/zzz/CustomerID_SC/Inactive User Hashed LCID List.csv')  #getting all the customer ids and building adds
> profile_rdd = custid_rdd.map(lambda r: getProfile(r.split(',')[0]))
> profile_rdd.count()
>
> #getprofile is the method to do the http call
> def getProfile(cust_id):
>     api_key = 'txt'
>     api_secret = 'yuyuy'
>     profile_uri = 'https://profile.localytics.com/x1/customers/{}'
>     customer_id = cust_id
>
>     if customer_id is not None:
>         data = requests.get(profile_uri.format(customer_id),
>                             auth=requests.auth.HTTPBasicAuth(api_key, api_secret))
>         # print json.dumps(data.json(), indent=4)
>         return data
>
> when I print the json dump of the data i see it returning results from the
> rest call. But the count never stops.
>
> Is there an efficient way of dealing this? Some post says we have to
> define a batch size etc but don't know how.
>
> Appreciate your help
>
> Regards,
> Amit

--
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn: https://www.linkedin.com/in/pedrorodriguezscience
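The sanity check suggested above (time a small sample on one thread, extrapolate to the full set, divide by the core count) could be sketched as follows. This is a rough illustration assuming Python 3. `time_sample` and `estimate_total_seconds` are hypothetical helper names, and the numbers in the example are invented, not measurements from this thread.

```python
import time


def time_sample(sample_ids, fetch):
    """Call fetch once per ID on a single thread and return the elapsed seconds."""
    start = time.perf_counter()
    for cust_id in sample_ids:
        fetch(cust_id)
    return time.perf_counter() - start


def estimate_total_seconds(sample_seconds, sample_size, total_ids, cores):
    """Extrapolate a sample timing to the full data set, spread evenly over cores."""
    per_call = sample_seconds / sample_size
    return per_call * total_ids / cores


# Made-up example: 100 calls took 50 s; 300k IDs spread over 8 executor cores.
print(estimate_total_seconds(50.0, 100, 300000, 8))  # 18750.0 seconds, roughly 5.2 hours
```

In practice `fetch` would be the getProfile function from the original post, called on a few hundred IDs. If the real job runs far longer than this estimate, a rate limit on the service side becomes a plausible explanation.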