1# -*- coding: utf-8 -*-
2
3from mock import call, Mock
4import pytest
5
6from pyee import EventEmitter
7from pyee.uplift import unwrap, uplift
8
9
class UpliftedEventEmitter(EventEmitter):
    """Bare ``EventEmitter`` subclass used as the target class for ``uplift``."""
12
13
def test_uplift_emit():
    """Check ``emit`` on a base emitter and its uplifted counterpart.

    While uplifted, emitting on either emitter is asserted to reach the
    handlers registered on both of them. After ``unwrap`` each emitter is
    asserted to reach only its own handlers, and neither one exposes an
    ``unwrap`` attribute any more.
    """
    recorder = Mock()
    every_event = ("base_event", "shared_event", "uplifted_event")

    base_ee = EventEmitter()

    def on_base_event():
        recorder("base event on base emitter")

    def on_shared_event_base():
        recorder("shared event on base emitter")

    # Handlers attached to the base emitter before it is uplifted.
    base_ee.on("base_event", on_base_event)
    base_ee.on("shared_event", on_shared_event_base)

    uplifted_ee = uplift(UpliftedEventEmitter, base_ee)

    assert isinstance(uplifted_ee, UpliftedEventEmitter), "Returns an uplifted emitter"

    def on_uplifted_event():
        recorder("uplifted event on uplifted emitter")

    def on_shared_event_uplifted():
        recorder("shared event on uplifted emitter")

    # Handlers attached to the uplifted emitter.
    uplifted_ee.on("uplifted_event", on_uplifted_event)
    uplifted_ee.on("shared_event", on_shared_event_uplifted)

    # Emitting on the uplifted emitter also reaches the base handlers;
    # for the shared event the uplifted handler is expected to run first.
    for event in every_event:
        assert uplifted_ee.emit(event)

    recorder.assert_has_calls(
        [
            call(message)
            for message in (
                "base event on base emitter",
                "shared event on uplifted emitter",
                "shared event on base emitter",
                "uplifted event on uplifted emitter",
            )
        ]
    )

    recorder.reset_mock()

    # Emitting on the base emitter also reaches the uplifted handlers;
    # for the shared event the base handler is expected to run first.
    for event in every_event:
        assert base_ee.emit(event)

    recorder.assert_has_calls(
        [
            call(message)
            for message in (
                "base event on base emitter",
                "shared event on base emitter",
                "shared event on uplifted emitter",
                "uplifted event on uplifted emitter",
            )
        ]
    )

    recorder.reset_mock()

    # Quick check for unwrap
    unwrap(uplifted_ee)

    # Neither emitter should still carry an ``unwrap`` attribute.
    for emitter in (uplifted_ee, base_ee):
        with pytest.raises(AttributeError):
            getattr(emitter, "unwrap")

    # Each emitter now only reaches the handlers registered on itself.
    assert not uplifted_ee.emit("base_event")
    assert uplifted_ee.emit("shared_event")
    assert uplifted_ee.emit("uplifted_event")

    assert base_ee.emit("base_event")
    assert base_ee.emit("shared_event")
    assert not base_ee.emit("uplifted_event")

    recorder.assert_has_calls(
        [
            call(message)
            for message in (
                # No listener for base event on uplifted
                "shared event on uplifted emitter",
                "uplifted event on uplifted emitter",
                "base event on base emitter",
                "shared event on base emitter",
                # No listener for uplifted event on base
            )
        ]
    )
98
99
@pytest.mark.parametrize("error_handling", ["new", "underlying", "neither"])
def test_exception_handling(error_handling):
    """Check which emitter's error hook receives "error" events.

    ``_emit_handle_potential_error`` is replaced by a mock on both emitters,
    an "error" event is emitted on each of them, and the mocks are then
    inspected according to the ``error_handling`` value given to ``uplift``.
    """
    base_ee = EventEmitter()
    uplifted_ee = uplift(
        UpliftedEventEmitter, base_ee, error_handling=error_handling
    )

    base_error = Exception("base error")
    uplifted_error = Exception("uplifted error")

    # Swap each emitter's error hook for a mock so its calls can be inspected.
    base_hook = Mock()
    base_ee._emit_handle_potential_error = base_hook

    uplifted_hook = Mock()
    uplifted_ee._emit_handle_potential_error = uplifted_hook

    base_ee.emit("error", base_error)
    uplifted_ee.emit("error", uplifted_error)

    from_base = call("error", base_error)
    from_uplifted = call("error", uplifted_error)

    if error_handling == "neither":
        # Each hook sees only the error emitted on its own emitter.
        base_hook.assert_called_once_with("error", base_error)
        uplifted_hook.assert_called_once_with("error", uplifted_error)
    elif error_handling == "new":
        # The uplifted emitter's hook sees both errors; the base hook none.
        base_hook.assert_not_called()
        uplifted_hook.assert_has_calls([from_base, from_uplifted])
    elif error_handling == "underlying":
        # The base emitter's hook sees both errors; the uplifted hook none.
        base_hook.assert_has_calls([from_base, from_uplifted])
        uplifted_hook.assert_not_called()
    else:
        raise Exception("unrecognized setting")
135
136
@pytest.mark.parametrize(
    "proxy_new_listener", ["both", "neither", "forward", "backward"]
)
def test_proxy_new_listener(proxy_new_listener):
    """Check how "new_listener" events travel between the two emitters.

    A "new_listener" handler is registered on each emitter, then one fresh
    "event" handler is added to each. The recorded calls are compared with
    the sequence expected for the given ``proxy_new_listener`` setting.
    """
    recorder = Mock()

    base_ee = EventEmitter()

    uplifted_ee = uplift(
        UpliftedEventEmitter, base_ee, proxy_new_listener=proxy_new_listener
    )

    def note_on_base(event, listener):
        assert event in ("event", "new_listener")
        recorder("base new listener handler", listener)

    def note_on_uplifted(event, listener):
        assert event in ("event", "new_listener")
        recorder("uplifted new listener handler", listener)

    base_ee.on("new_listener", note_on_base)
    uplifted_ee.on("new_listener", note_on_uplifted)

    def added_to_base():
        pass

    def added_to_uplifted():
        pass

    base_ee.on("event", added_to_base)
    uplifted_ee.on("event", added_to_uplifted)

    # The four notifications that can be recorded for the fresh handlers.
    base_sees_base = call("base new listener handler", added_to_base)
    uplifted_sees_base = call("uplifted new listener handler", added_to_base)
    uplifted_sees_uplifted = call(
        "uplifted new listener handler", added_to_uplifted
    )
    base_sees_uplifted = call("base new listener handler", added_to_uplifted)

    # Expected call sequence for each proxying mode.
    expected_by_mode = {
        "both": [
            base_sees_base,
            uplifted_sees_base,
            uplifted_sees_uplifted,
            base_sees_uplifted,
        ],
        "neither": [base_sees_base, uplifted_sees_uplifted],
        "forward": [base_sees_base, uplifted_sees_base, uplifted_sees_uplifted],
        "backward": [base_sees_base, uplifted_sees_uplifted, base_sees_uplifted],
    }

    if proxy_new_listener not in expected_by_mode:
        raise Exception("unrecognized proxy_new_listener")

    recorder.assert_has_calls(expected_by_mode[proxy_new_listener])
202