有没有办法手动打开asyncio事件循环

我想使用事件循环监视任何插入数据到我的asyncio.Queue(你可以在这里找到它的源代码https://github.com/python/cpython/blob/3.6/Lib/asyncio/queues.py),但我遇到了一些问题。 这是下面的代码:

import asyncio
import threading

async def recv(q):
    while True:
        msg = await q.get()
        print(msg)

async def checking_task():
    while True:
        await asyncio.sleep(0.1)

def loop_in_thread(loop,q):
    asyncio.set_event_loop(loop)
    asyncio.ensure_future(recv(q))
    asyncio.ensure_future(insert(q))
    # asyncio.ensure_future(checking_task()) comment this out, and it will work as intended
    loop.run_forever()

async def insert(q):
    print('invoked')
    await q.put('hello')

q = asyncio.Queue() 
loop = asyncio.get_event_loop()
t = threading.Thread(target=loop_in_thread, args=(loop, q,))
t.start()

该计划已经开始,我们可以看到以下结果

invoked
hello
-> print(asyncio.Task.all_tasks())
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39>
wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x000001E215DCFAC8>()]>>}

但是现在如果我们使用q.put_nowait('test')手动添加数据到q ,我们会得到以下结果:

q.put_nowait('test') # a non-async way to add data into queue
-> print(asyncio.Task.all_tasks())
{<Task pending coro=<recv() running at C:/Users/costa/untitled3.py:39>
 wait_for=<Future finished result=None>>}

正如你所看到的,未来已经完成了,但我们仍然没有打印出新添加的字符串'test' 。 换句话说,即使与q.get()相关的Future已完成并且没有其他任务正在运行, msg = await q.get()仍然在等待。 这使我困惑,因为在官方文档(https://docs.python.org/3/library/asyncio-task.html)中,它表示

结果=等待未来或结果=未来的收益 - 暂停协程,直到未来完成,然后返回未来的结果

看起来,即使未来完成了,我们仍然需要在其他异步函数中进行某种await ,以使事件循环保持处理任务。

我发现了一个解决这个问题的方法,它添加一个checking_task() ,并且将该协程添加到事件循环中; 那么它将按预期工作。

但是添加一个checking_task()协程对于CPU来说是非常昂贵的,因为它只是运行一个while循环。 我想知道是否有一些手动方式让我们触发该await事件而不使用异步功能。 例如,一些神奇的东西

q.put_nowait('test')
loop.ok_you_can_start_running_other_pending_tasks()

帮助将不胜感激! 谢谢。


所以我结束了使用

loop.call_soon_threadsafe(q.put_nowait, 'test')

它会按预期工作。 在弄清楚之后,我搜索了一些有关的信息。 原来这篇文章(从另一个线程调度一个asyncio协程)有同样的问题。 而@ kfx的答案也可以,这是

loop.call_soon_threadsafe(loop.create_task, q.put('test'))

注意asyncio.Queue.put()是一个协程,但asyncio.Queue.put_nowait()是一个正常的函数。

链接地址: http://www.djcxy.com/p/53215.html

上一篇: Is there a way to manually switch on asyncio event loop

下一篇: Coroutine with StateT and ST and IO