I have a dataframe in which each row contains two locations: a start point (fromLat, fromLong) and a destination (toLat, toLong). For each pair I want to calculate the driving distance and the duration, using the routing API from project-osrm. Unfortunately, my current approach is very slow: the dataframe has about 2 million rows, and I make one request per row. I have found the following code snippet (see below), but I don't know how to integrate my dataframe so that it passes the values.
How can I call an API 2 million times quickly and write the returned values into my dataframe?
- If I don't get status code 200 back, I should just write None for distance and duration.
Dataframe
   customerId    fromLat     fromLong      toLat       toLong
0           1  48.103190    11.601773  53.446762    10.061031
1           2  48.541160     2.628249  41.374426     2.037211
2           2  25.907100  -100.338113  23.994722  -104.754147
3           3  41.948767    12.742488  49.316171   -33.968543
4           3       None         None  41.948767    12.742488
Minimal reproducible example
import pandas as pd

d = {
    "customerId": [1, 2, 2, 3, 3],
    "fromLat": ["48.103190", "48.541160", "25.907100", "41.948767", None],
    "fromLong": ["11.601773", "2.628249", "-100.338113", "12.742488", None],
    "toLat": ["53.446762", "41.374426", "23.994722", "49.316171", "41.948767"],
    "toLong": ["10.061031", "2.037211", "-104.754147", "-33.968543", "12.742488"],
}
df = pd.DataFrame(data=d)
print(df)

import requests
import json

def f(x):
    # OSRM expects each coordinate as longitude,latitude
    url = f"http://router.project-osrm.org/route/v1/driving/{x['fromLong']},{x['fromLat']};{x['toLong']},{x['toLat']}?overview=false"
    r = requests.get(url)
    print(r.status_code, r.reason)
    if r.status_code == 200:
        data = json.loads(r.text)
        x['distance'] = data['routes'][0]['distance']
        x['duration'] = data['routes'][0]['duration']
    else:
        x['distance'] = None
        x['duration'] = None
    return x

df = df.apply(lambda x: f(x), axis=1)
What I want (note: duration and distance are only example values)
   customerId    fromLat     fromLong      toLat       toLong  Distance  Duration
0           1  48.103190    11.601773  53.446762    10.061031       500       785
1           2  48.541160     2.628249  41.374426     2.037211      4784       474
2           2  25.907100  -100.338113  23.994722  -104.754147       147       987
3           3  41.948767    12.742488  49.316171   -33.968543      None      None
4           3       None         None  41.948767    12.742488      None      None
Code found on GitHub
# modified fetch function with semaphore
import random
import asyncio
from aiohttp import ClientSession

async def fetch(url, session):
    async with session.get(url) as response:
        delay = response.headers.get("DELAY")
        date = response.headers.get("DATE")
        print("{}:{} with delay {}".format(date, response.url, delay))
        return await response.read()

async def bound_fetch(sem, url, session):
    # Getter function with semaphore.
    async with sem:
        await fetch(url, session)

async def run(r):
    url = f"http://router.project-osrm.org/route/v1/driving/{fromLat},{fromLong};{toLat},{toLong}?overview=false"
    tasks = []
    # create instance of Semaphore
    sem = asyncio.Semaphore(1000)
    # Create client session that will ensure we don't open new connection
    # per each request.
    async with ClientSession() as session:
        for i in range(r):
            # pass Semaphore and session to every GET request
            task = asyncio.ensure_future(bound_fetch(sem, url.format(i), session))
            tasks.append(task)
        responses = asyncio.gather(*tasks)
        await responses

number = df.shape[0]
loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run(number))
loop.run_until_complete(future)
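For reference, the semaphore pattern in that snippet could take one URL per dataframe row roughly as follows. This is only a minimal sketch using the standard library: the names row_to_url, bounded_fetch and fetch_all are hypothetical, and the HTTP call is passed in as a coroutine named fetch (for example a thin wrapper around the session.get call shown above) rather than hard-coded. It assumes missing coordinates are stored as None and that OSRM wants each coordinate as longitude,latitude.

```python
import asyncio

BASE_URL = "http://router.project-osrm.org/route/v1/driving"
COORD_KEYS = ("fromLong", "fromLat", "toLong", "toLat")


def row_to_url(row):
    """Build the route URL for one row, or None if a coordinate is missing."""
    if any(row.get(key) is None for key in COORD_KEYS):
        return None
    # OSRM expects each coordinate as longitude,latitude.
    return (
        f"{BASE_URL}/{row['fromLong']},{row['fromLat']};"
        f"{row['toLong']},{row['toLat']}?overview=false"
    )


async def bounded_fetch(sem, fetch, url):
    """Run fetch(url) under the semaphore; rows without a URL yield None."""
    if url is None:
        return None
    async with sem:
        return await fetch(url)


async def fetch_all(rows, fetch, limit=100):
    """Fetch one result per row, with at most `limit` requests in flight."""
    sem = asyncio.Semaphore(limit)
    tasks = [bounded_fetch(sem, fetch, row_to_url(row)) for row in rows]
    # gather returns results in the order the awaitables were passed in,
    # so result i belongs to row i.
    return await asyncio.gather(*tasks)
```

Because the results come back in row order, they can be written into new columns by position. With millions of rows it would probably make sense to call fetch_all on chunks of the dataframe rather than creating every task at once.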
CodePudding user response:
When I scrape or make a massive number of API calls, I usually use good old joblib with the threading backend. Here is a general code snippet that should work:
from joblib import Parallel, delayed
import pandas as pd
import requests
import json

# the function you want to parallelize
def f(x):
    # request (OSRM expects each coordinate as longitude,latitude)
    res = requests.get(
        (
            f"http://router.project-osrm.org/route/v1/driving/"
            f"{x['fromLong']},{x['fromLat']};{x['toLong']},{x['toLat']}"
            f"?overview=false"
        )
    )
    # parse output into a dict if valid
    if res.status_code == 200:
        data = json.loads(res.text)
        output = {
            "customerId": x["customerId"],
            "distance": data['routes'][0]['distance'],
            "duration": data['routes'][0]['duration']
        }
    # output dict if request failed
    else:
        output = {
            "customerId": x["customerId"],
            "distance": float("nan"),
            "duration": float("nan")
        }
    return output

# run f(x) in parallel, where x is a row of your df
# (iterrows() yields (index, row) pairs, so unpack the row)
outputs = Parallel(n_jobs=16, backend="threading")(
    delayed(f)(x) for _, x in df.iterrows()
)
# create pandas df from the list of output dicts
outputs_df = pd.DataFrame(outputs)
Pay attention to the n_jobs argument of Parallel: it sets the number of concurrent executions of f(x). If you hit an API rate limit because of the number of requests, you can add time.sleep(some_number_of_seconds) (from the time module) anywhere in f(x) to delay each request.
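One thing to watch for: customerId is not unique in the example dataframe, so joining the outputs back on it would be ambiguous. Below is a rough sketch of the same idea that keeps results in row order and includes the optional per-request delay. It is not the joblib code above: to stay dependency-free it swaps in concurrent.futures.ThreadPoolExecutor and urllib, and the names build_url, fetch_json, extract, route_one and route_all are made up for illustration. It assumes missing coordinates are stored as None and that OSRM wants each coordinate as longitude,latitude.

```python
import json
import time
from concurrent.futures import ThreadPoolExecutor
from http.client import HTTPException
from urllib.request import urlopen

BASE_URL = "http://router.project-osrm.org/route/v1/driving"
COORD_KEYS = ("fromLong", "fromLat", "toLong", "toLat")


def build_url(row):
    """Return the route URL for one row, or None if a coordinate is missing."""
    if any(row.get(key) is None for key in COORD_KEYS):
        return None
    # OSRM expects each coordinate as longitude,latitude.
    return (
        f"{BASE_URL}/{row['fromLong']},{row['fromLat']};"
        f"{row['toLong']},{row['toLat']}?overview=false"
    )


def fetch_json(url, timeout=10.0):
    """GET a URL and return the decoded JSON body, or None on failure."""
    try:
        with urlopen(url, timeout=timeout) as response:
            return json.load(response)
    except (OSError, HTTPException, ValueError):
        # urlopen raises HTTPError (an OSError subclass) for 4xx/5xx replies;
        # network problems and malformed JSON also end up here.
        return None


def extract(payload):
    """Pull (distance, duration) from a route payload, else (None, None)."""
    try:
        route = payload["routes"][0]
        return route["distance"], route["duration"]
    except (TypeError, KeyError, IndexError):
        return None, None


def route_one(row, fetch=fetch_json, delay=0.0):
    """Route a single row; rows with missing coordinates are skipped."""
    url = build_url(row)
    if url is None:
        return None, None
    time.sleep(delay)  # optional pause before each request
    return extract(fetch(url))


def route_all(rows, fetch=fetch_json, max_workers=16, delay=0.0):
    """Route all rows concurrently and return results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order, so result i is row i.
        return list(pool.map(lambda row: route_one(row, fetch, delay), rows))
```

With the dataframe from the question, something like results = route_all(df.to_dict("records")) followed by df["distance"], df["duration"] = zip(*results) should attach the values by position. Passing fetch as a parameter also makes it possible to try the pipeline with a fake function before sending real requests.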
Hope this was useful :)
CodePudding user response:
OSRM also offers a C++ library:
OSRM can be used as a library (libosrm) via C++ instead of using it through the HTTP interface and osrm-routed. This allows for fine-tuning OSRM and has much less overhead. Here is a quick introduction into how to use libosrm in the upcoming v5 release.
This is probably the way to go if you want to query many routes.
