# -*- coding: utf-8 -*- from asyncio import Future, wait_for import pytest import pytest_asyncio.plugin # noqa try: from asyncio.exceptions import TimeoutError # type: ignore except ImportError: from concurrent.futures import TimeoutError # type: ignore from mock import Mock from twisted.internet.defer import succeed from pyee import AsyncIOEventEmitter, TwistedEventEmitter class PyeeTestError(Exception): pass @pytest.mark.asyncio async def test_asyncio_emit(event_loop): """Test that AsyncIOEventEmitter can handle wrapping coroutines """ ee = AsyncIOEventEmitter(loop=event_loop) should_call = Future(loop=event_loop) @ee.on("event") async def event_handler(): should_call.set_result(True) ee.emit("event") result = await wait_for(should_call, 0.1) assert result is True @pytest.mark.asyncio async def test_asyncio_once_emit(event_loop): """Test that AsyncIOEventEmitter also wrap coroutines when using once """ ee = AsyncIOEventEmitter(loop=event_loop) should_call = Future(loop=event_loop) @ee.once("event") async def event_handler(): should_call.set_result(True) ee.emit("event") result = await wait_for(should_call, 0.1) assert result is True @pytest.mark.asyncio async def test_asyncio_error(event_loop): """Test that AsyncIOEventEmitter can handle errors when wrapping coroutines """ ee = AsyncIOEventEmitter(loop=event_loop) should_call = Future(loop=event_loop) @ee.on("event") async def event_handler(): raise PyeeTestError() @ee.on("error") def handle_error(exc): should_call.set_result(exc) ee.emit("event") result = await wait_for(should_call, 0.1) assert isinstance(result, PyeeTestError) @pytest.mark.asyncio async def test_asyncio_cancellation(event_loop): """Test that AsyncIOEventEmitter can handle Future cancellations""" cancel_me = Future(loop=event_loop) should_not_call = Future(loop=event_loop) ee = AsyncIOEventEmitter(loop=event_loop) @ee.on("event") async def event_handler(): cancel_me.cancel() @ee.on("error") def handle_error(exc): should_not_call.set_result(None) ee.emit("event") try: await wait_for(should_not_call, 0.1) except TimeoutError: pass else: raise PyeeTestError() @pytest.mark.asyncio async def test_sync_error(event_loop): """Test that regular functions have the same error handling as coroutines""" ee = AsyncIOEventEmitter(loop=event_loop) should_call = Future(loop=event_loop) @ee.on("event") def sync_handler(): raise PyeeTestError() @ee.on("error") def handle_error(exc): should_call.set_result(exc) ee.emit("event") result = await wait_for(should_call, 0.1) assert isinstance(result, PyeeTestError) def test_twisted_emit(): """Test that TwistedEventEmitter can handle wrapping coroutines """ ee = TwistedEventEmitter() should_call = Mock() @ee.on("event") async def event_handler(): _ = await succeed("yes!") should_call(True) ee.emit("event") should_call.assert_called_once() def test_twisted_once(): """Test that TwistedEventEmitter also wraps coroutines for once """ ee = TwistedEventEmitter() should_call = Mock() @ee.once("event") async def event_handler(): _ = await succeed("yes!") should_call(True) ee.emit("event") should_call.assert_called_once() def test_twisted_error(): """Test that TwistedEventEmitters handle Failures when wrapping coroutines.""" ee = TwistedEventEmitter() should_call = Mock() @ee.on("event") async def event_handler(): raise PyeeTestError() @ee.on("failure") def handle_error(e): should_call(e) ee.emit("event") should_call.assert_called_once()