Ich nutze schon länger aiohttp. Aktuell restrukturiere ich den betreffenden Code und stoße dabei auf Probleme, die mich vermuten lassen, dass ich einige grundlegende Dinge bzgl. aiohttp bzw. asyncio missverstehe.
Die vielen Tutorials und StackOverflow Beiträge sind sehr aufschlussreich. Aber sobald ich diese Beispiele in meine eigene Programmstruktur übertragen möchte, stoße ich auf Probleme - genauer 1 Error und 2 Warnings.
Code: Alles auswählen
RuntimeError: This event loop is already running.
sys:1: RuntimeWarning: coroutine 'wait' was never awaited
sys:1: RuntimeWarning: coroutine 'FetchAsync._fetch' was never awaited
Das folgende MWE repliziert relativ gut die Struktur meines Real-Codes.
Code: Alles auswählen
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import aiohttp
class FetchAsync:
def __init__(self):
pass
def _get_loop(self):
try:
loop = asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
finally:
loop.set_debug(True)
return loop
async def _receive_via_aiohttp(self, session, url, headers):
async with session.get(url, headers=headers) as response:
content = await response.read()
return response, content
async def _fetch(self,
url,
session):
headers = {'User-Agent': 'MyAgent'}
# use aiohttp to get feed/xml content and response object
response, content = await self._receive_via_aiohttp(session,
url,
headers)
# do a lot more stuff...
def run(self):
loop = self._get_loop()
asyncio.run(self._run_async())
loop.close()
async def _run_async(self):
async with aiohttp.ClientSession() as session:
# in real there are much more URLs
urls = ['https://cnn.com',
'https://fsfe.org']
# create the "jobs" (futures)
futures = [self._fetch(url, session)
for url
in urls]
# run the "jobs" asynchrone
self._get_loop().run_until_complete(asyncio.wait(futures))
if __name__ == '__main__':
obj = FetchAsync()
obj.run()
Code: Alles auswählen
Traceback (most recent call last):
File "/home/user/share/work/aiotest/./fetchfeeds.py", line 62, in <module>
obj.run()
File "/home/user/share/work/aiotest/./fetchfeeds.py", line 43, in run
asyncio.run(self._run_async())
File "/usr/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/home/user/share/work/aiotest/./fetchfeeds.py", line 58, in _run_async
self._get_loop().run_until_complete(asyncio.wait(futures))
File "/usr/lib/python3.9/asyncio/base_events.py", line 618, in run_until_complete
self._check_running()
File "/usr/lib/python3.9/asyncio/base_events.py", line 578, in _check_running
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
sys:1: RuntimeWarning: coroutine 'wait' was never awaited
sys:1: RuntimeWarning: coroutine 'FetchAsync._fetch' was never awaited
- run() ist dort, weil die Klasse eigentlich ein Thread ist.
- _run_async() ist da, weil ich async with ClientSession() sonst nicht aufrufen kann.
- Das Session Objekt soll für alle get() Aufrufe genutzt werden.
- Im Real-Code sind es zwischen 100 und 300 URLs/Jobs die hier laufen sollen.
- _receive_via_aiohttp() "kapselt" aiohttp, damit ich das bei unittests einfach mocken kann. So das in einem unittest, das reale aiohttp gar nicht berührt wird und ich den content und response fest definieren kann.