Usage
A basic use case for Unparallel could be fetching some JSON data from a REST API asynchronously. In the example below, we issue 100 GET requests with different query parameters.
import asyncio
from unparallel import up
async def main():
url = "https://httpbin.org"
paths = [f"/get?foo={i}" for i in range(100)]
return await up(paths, base_url=url)
results = asyncio.run(main())
print([item["args"] for item in results[:5]])
This prints:
Making async requests: 100%|███████████| 100/100 [00:01<00:00, 99.94it/s]
[{'foo': '0'}, {'foo': '1'}, {'foo': '2'}, {'foo': '3'}, {'foo': '4'}]
POSTing Data
One body for different paths
If you want to do POST instead of GET requests, you just have to pass method='POST'
to up()
and add some data.
import asyncio
from pprint import pp
from unparallel import up
async def main():
url = "https://httpbin.org"
data = {"item_id": 1, "type": "tree"}
paths = [f"/post?bar={i}" for i in range(100)]
return await up(paths, method="POST", base_url=url, payloads=data)
results = asyncio.run(main())
pp([{"args": item["args"], "data": item["data"]} for item in results[:5]])
This prints:
Making async requests: 100%|███████████| 100/100 [00:01<00:00, 63.40it/s]
[{'args': {'bar': '0'}, 'data': '{"item_id": 1, "type": "tree"}'},
{'args': {'bar': '1'}, 'data': '{"item_id": 1, "type": "tree"}'},
{'args': {'bar': '2'}, 'data': '{"item_id": 1, "type": "tree"}'},
{'args': {'bar': '3'}, 'data': '{"item_id": 1, "type": "tree"}'},
{'args': {'bar': '4'}, 'data': '{"item_id": 1, "type": "tree"}'}]
Different bodies for one path
As you might not need different query parameters for your POST URL, i.e. you have only one URL/path, but you want to POST a list of different JSON bodies, this is also supported.
import asyncio
from pprint import pp
from unparallel import up
async def main():
url = "https://httpbin.org"
path = "/post"
data = [{"type": "tree", "height": i} for i in range(100)]
return await up(path, method="POST", base_url=url, payloads=data)
results = asyncio.run(main())
pp([item["data"] for item in results[:5]])
This prints:
Making async requests: 100%|███████████| 100/100 [00:01<00:00, 63.40it/s]
['{"type": "tree", "height": 0}',
'{"type": "tree", "height": 1}',
'{"type": "tree", "height": 2}',
'{"type": "tree", "height": 3}',
'{"type": "tree", "height": 4}']
Custom response functions
Per default, Unparallel will call the .json()
function on every httpx.Response
and
return the result. But you might want to get other data from the response like the
content as plain-text or the HTTP status.
For this, define your own response function/callback using a def
or lambda
and pass
it to up()
via the keyword response_fn
. The function will receive a httpx.Response
object as the argument and can return anything.
If you want to process the raw httpx.Response
later yourself, you can simply set
response_fn=None
.
The example below demonstrates how to use a custom response function to get the .text
of a response:
import asyncio
from unparallel import up
async def main():
urls = [
"https://www.example.com",
"https://duckduckgo.com/",
"https://github.com",
]
return await up(urls, response_fn=lambda x: x.text)
results = asyncio.run(main())
for res in results:
print(repr(res[:50]))
This should print something similar to the following:
Making async requests: 100%|███████████| 3/3 [00:00<00:00, 4.43it/s]
'<!doctype html>\n<html>\n<head>\n <title>Example D'
'<!DOCTYPE html><html lang="en-US"><head><meta char'
'\n\n\n\n\n\n<!DOCTYPE html>\n<html\n lang="en"\n \n \n da'
Other HTTP methods
Besides the popular GET and POST methods, you can use any other HTTP method supported
by HTTPX - which are GET
, POST
, PUT
, DELETE
, HEAD
, PATCH
, and OPTIONS
.
For example, you can get the status of services/webpages using the method HEAD
in
combination with a custom response function:
import asyncio
from unparallel import up
async def main():
urls = [
"https://www.example.com",
"https://duckduckgo.com/",
"https://github.com",
]
return await up(urls, method="HEAD", response_fn=lambda x: x.status_code)
results = asyncio.run(main())
print(results)
This prints:
Pagination and flattening
Another use case for Unparallel could be fetching all items from a REST API using pagination. This is useful if there are too many objects to get them all at once and you want to avoid making a single request for every item.
Let's say you want to list 1000 items from this University Domains and Names API.
This API supports pagination via the query parameters
offset
and limit
.
import asyncio
from pprint import pp
from unparallel import up
async def main():
url = "http://universities.hipolabs.com"
paths = [f"/search?limit=20&offset={i}" for i in range(0, 1000, 20)]
return await up(paths, base_url=url)
results = asyncio.run(main())
print(f"#Results: {len(results)}")
pp([(type(item), len(item)) for item in results[:5]])
This prints:
#Results: 50
[(<class 'list'>, 20),
(<class 'list'>, 20),
(<class 'list'>, 20),
(<class 'list'>, 20),
(<class 'list'>, 20)]
Because we get a page (list of items) per request, the number of objects in results
is 50 (1000/20) and the university metadata is within a nested array structure:
[
[
{
"name": "Kharkiv National University",
"country": "Ukraine",
"alpha_two_code": "UA",
...
},
# ... 19 more items
],
[
... # next page of 20 items
],
...
]
But in most cases, you would want just one list of all objects (universities), i.e. a
flattened array. Unparallel will flatten the data for you if you pass
flatten_result=True
.
import asyncio
from pprint import pp
from unparallel import up
async def main():
url = "http://universities.hipolabs.com"
paths = [f"/search?limit=20&offset={i}" for i in range(0, 1000, 20)]
return await up(paths, base_url=url, flatten_result=True)
results = asyncio.run(main())
print(f"#Results: {len(results)}")
pp([(type(item), len(item)) for item in results[:5]])
pp(results[:2])
This prints:
#Results: 1000
[(<class 'dict'>, 6),
(<class 'dict'>, 6),
(<class 'dict'>, 6),
(<class 'dict'>, 6),
(<class 'dict'>, 6)]
[{'name': 'Kharkiv National University',
'country': 'Ukraine',
'alpha_two_code': 'UA',
'domains': ['student.karazin.ua', 'karazin.ua'],
'web_pages': ['https://karazin.ua'],
'state-province': None},
{'name': 'Universidad Técnica Federico Santa María',
'country': 'Chile',
'alpha_two_code': 'CL',
'domains': ['usm.cl'],
'web_pages': ['https://usm.cl'],
'state-province': None}]
Configuring limits and timeouts
You can configure connection pool limits and request timeouts using the following
parameters in the up()
method:
max_connections (int)
: The total number of simultaneous TCP connections. This is passed intohttpx.Limits()
and defaults to 100.limits (Optional[httpx.Limits])
: Explicitly set the HTTPX limits configuration. This overridesmax_connections
.timeout (int)
: The timeout for requests in seconds. This is passed intohttpx.Timeout()
and defaults to 10.timeouts (Optional[httpx.Timeout])
: Explicitly set the HTTPX timeout configuration. This overridestimeout
.
If you don't care about a detailed timeout or limits configuration, you can use
up()
without specifying any of those options:
Otherwise, you can use the simplified parameters max_connections
for setting the
connection limit and/or timeout
for setting (all) timeouts for requests:
This results in the limits config created via httpx.Limits(max_connections=10, **default_values)
and timeout config created via httpx.Timeout(60)
For more fine-grained control over limits and timeouts, just specify the HTTPX configs
and pass them to up()
:
results = await up(
urls,
limits=httpx.Limits(max_connections=20, ...),
timeouts=httpx.Timeout(connect=5, ...)
)
Custom client
If you want full control over the HTTPX client or use advanced configuration options,
e.g. authentication or
proxies, you can configure it outside
and simply pass the httpx.AsyncClient
instance to Unparallel.
import asyncio
import httpx
from unparallel import up
async def main():
url = "https://httpbin.org"
paths = [f"/get?foo={i}" for i in range(20)]
auth = httpx.BasicAuth(username="username", password="secret")
async with httpx.AsyncClient(base_url=url, auth=auth) as client:
results = await up(paths, client=client)
return results
results = asyncio.run(main())
for item in results[:5]:
for h_key, h_value in item["headers"].items():
if h_key == "Authorization":
print(f"Args = {item['args']}, Auth = {h_key}: {h_value}'")
This prints:
Args = {'foo': '0'}, Auth = Authorization: Basic dXNlcm5hbWU6c2VjcmV0'
Args = {'foo': '1'}, Auth = Authorization: Basic dXNlcm5hbWU6c2VjcmV0'
Args = {'foo': '2'}, Auth = Authorization: Basic dXNlcm5hbWU6c2VjcmV0'
Args = {'foo': '3'}, Auth = Authorization: Basic dXNlcm5hbWU6c2VjcmV0'
Args = {'foo': '4'}, Auth = Authorization: Basic dXNlcm5hbWU6c2VjcmV0'
Retries
Per default, every HTTP request will be retried 3 times if an exception of the type
httpx.TimeoutException
is raised. You can change the behavior by passing e.g.
max_retries_on_timeout=5
to up()
for doing 5 retries, or pass 0
for disabling them.
Progress bar
Unparallel uses tqdm to display the progress of the
HTTP requests - specifically tqdm.asyncio.tqdm.as_completed()
is used to iterate over the list of async tasks (HTTP requests).
You can disable the progress bar by passing progress=False
to up()
.