任务背景:不停地去第三方服务器请求HTTP资源,根据请求到的信息做下一步动作
当前方案:启动一个单独的线程不停的监听任务列表,有任务就丢给协程去异步处理
大致代码如下:
import sys
import time
from functools import partial
from threading import Thread
from typing import Callable
import anyio
import httpx
class Monitor:
running = True
tasks: list[Callable] = []
results: list = []
@classmethod
async def gather(cls, async_func) -> None:
cls.results.append(await async_func())
@classmethod
def wait(cls, seconds: int, interval=1) -> None:
for _ in range(int(seconds / interval)):
if cls.tasks:
time.sleep(interval)
def forever_loop() -> None:
async def waiting() -> None:
async with anyio.create_task_group() as tg:
while Monitor.running:
for t in Monitor.tasks:
tg.start_soon(Monitor.gather, t)
Monitor.tasks.clear()
await anyio.sleep(0.01)
tg.cancel_scope.cancel()
anyio.run(waiting)
class EventLoopThread(Thread):
def __init__(self) -> None:
super().__init__(target=forever_loop)
def close(self) -> None:
Monitor.running = False
@staticmethod
def add_task(async_func: Callable) -> None:
Monitor.tasks.append(async_func)
@staticmethod
def wait_until(seconds: int) -> None:
Monitor.wait(seconds=seconds)
async def fetch(url: str, verbose=True) -> httpx.Response:
async with httpx.AsyncClient(verify=False) as client:
r = await client.get(url)
if verbose:
print(f"{url = }")
print(r.text)
return r
def main() -> None:
total = 10
url = "https://qq.com"
if sys.argv[1:]:
arg = sys.argv[1]
if arg.isdigit():
total = int(arg)
elif arg.startswith("http"):
url = arg
t = EventLoopThread()
t.daemon = True
t.start()
for _ in range(total):
t.add_task(partial(fetch, url=url))
t.wait_until(seconds=10)
t.close()
t.join()
print(f"{len(Monitor.results)=}")
print("Done.")
if __name__ == "__main__":
main()