Examples¶
TODO: More examples
Download file¶
import asyncio
import aiosonic
import json
async def run():
url = 'https://images.dog.ceo/breeds/leonberg/n02111129_2301.jpg'
async with aiosonic.HTTPClient() as client:
res = await client.get(url)
assert res.status_code == 200
if res.chunked:
# write in chunks
with open('dog_image.jpg', 'wb') as _file:
async for chunk in res.read_chunks():
_file.write(chunk)
else:
# or write all bytes, for chunked this also works
with open('dog_image.jpg', 'wb') as _file:
_file.write(await res.content())
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
Concurrent Requests¶
import aiosonic
import asyncio
async def main():
urls = [
'https://www.facebook.com/',
'https://www.google.com/',
'https://twitch.tv/',
'https://linkedin.com/',
]
async with aiosonic.HTTPClient() as client:
# asyncio.gather is the key for concurrent requests.
responses = await asyncio.gather(*[client.get(url) for url in urls])
# stream/chunked responses doesn't release the connection acquired
# from the pool until the response has been read, so better to read
# it.
for response in responses:
if response.chunked:
await response.text()
assert all([res.status_code in [200, 301] for res in responses])
loop = asyncio.get_event_loop()
loo.run_until_complete(main())
Chunked Requests¶
Specifying an iterator as the request body, it will make the request transfer made by chunks
import aiosonic
import asyncio
import json
async def main():
async def data():
yield b'foo'
yield b'bar'
async with aiosonic.HTTPClient() as client:
url = 'https://postman-echo.com/post'
response = await client.post(url, data=data())
print(json.dumps(await response.json(), indent=10))
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Use custom DNS¶
Install aiodns in your dependencies and use AsyncResolver
import aiosonic
import asyncio
from aiosonic.resolver import AsyncResolver
async def main():
resolver = AsyncResolver(nameservers=["8.8.8.8", "8.8.4.4"])
connector = aiosonic.TCPConnector(resolver=resolver)
async with aiosonic.HTTPClient(connector=connector) as client:
data = {'foo1': 'bar1', 'foo2': 'bar2'}
url = 'https://postman-echo.com/post'
# server will respond those cookies
response = await client.post(url, json=data)
# client keep cookies in "cookies_map"
print(await response.text())
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Use a Proxy Server¶
Just use Proxy class.
You can install proxy.py and use it as a proxy demo.
import asyncio
from aiosonic import HTTPClient, Proxy
async def main():
# Proxy class accepts `auth` argument in the format `user:password`
client = HTTPClient(proxy=Proxy("http://localhost:8899"))
res = await client.get("https://www.google.com/")
print(res)
print(await res.text())
assert res.status_code == 200
asyncio.run(main())
Debug log¶
Configure aiosonic logger at debug level to see some logging
import asyncio
import aiosonic
import json
import logging
async def run():
# setup debug level at log
logger = logging.getLogger('aiosonic')
logger.setLevel(logging.DEBUG)
async with aiosonic.HTTPClient() as client:
response = await client.get('https://www.google.com/')
assert response.status_code == 200
assert 'Google' in (await response.text())
loop = asyncio.get_event_loop()
loop.run_until_complete(run())