summaryrefslogtreecommitdiff
path: root/venv/lib/python3.11/site-packages/greenlet/tests/test_contextvars.py
blob: 9a16f6710aeb3734f15b3c9932a9519b64ba2f5b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
from __future__ import print_function

import gc
import sys
import unittest

from functools import partial
from unittest import skipUnless
from unittest import skipIf

from greenlet import greenlet
from greenlet import getcurrent
from . import TestCase


try:
    from contextvars import Context
    from contextvars import ContextVar
    from contextvars import copy_context
    # From the documentation:
    #
    # Important: Context Variables should be created at the top module
    # level and never in closures. Context objects hold strong
    # references to context variables which prevents context variables
    # from being properly garbage collected.
    ID_VAR = ContextVar("id", default=None)
    VAR_VAR = ContextVar("var", default=None)
    ContextVar = None
except ImportError:
    Context = ContextVar = copy_context = None

# We don't support testing if greenlet's built-in context var support is disabled.
@skipUnless(Context is not None, "ContextVar not supported")
class ContextVarsTests(TestCase):
    def _new_ctx_run(self, *args, **kwargs):
        return copy_context().run(*args, **kwargs)

    def _increment(self, greenlet_id, callback, counts, expect):
        ctx_var = ID_VAR
        if expect is None:
            self.assertIsNone(ctx_var.get())
        else:
            self.assertEqual(ctx_var.get(), expect)
        ctx_var.set(greenlet_id)
        for _ in range(2):
            counts[ctx_var.get()] += 1
            callback()

    def _test_context(self, propagate_by):
        # pylint:disable=too-many-branches
        ID_VAR.set(0)

        callback = getcurrent().switch
        counts = dict((i, 0) for i in range(5))

        lets = [
            greenlet(partial(
                partial(
                    copy_context().run,
                    self._increment
                ) if propagate_by == "run" else self._increment,
                greenlet_id=i,
                callback=callback,
                counts=counts,
                expect=(
                    i - 1 if propagate_by == "share" else
                    0 if propagate_by in ("set", "run") else None
                )
            ))
            for i in range(1, 5)
        ]

        for let in lets:
            if propagate_by == "set":
                let.gr_context = copy_context()
            elif propagate_by == "share":
                let.gr_context = getcurrent().gr_context

        for i in range(2):
            counts[ID_VAR.get()] += 1
            for let in lets:
                let.switch()

        if propagate_by == "run":
            # Must leave each context.run() in reverse order of entry
            for let in reversed(lets):
                let.switch()
        else:
            # No context.run(), so fine to exit in any order.
            for let in lets:
                let.switch()

        for let in lets:
            self.assertTrue(let.dead)
            # When using run(), we leave the run() as the greenlet dies,
            # and there's no context "underneath". When not using run(),
            # gr_context still reflects the context the greenlet was
            # running in.
            if propagate_by == 'run':
                self.assertIsNone(let.gr_context)
            else:
                self.assertIsNotNone(let.gr_context)


        if propagate_by == "share":
            self.assertEqual(counts, {0: 1, 1: 1, 2: 1, 3: 1, 4: 6})
        else:
            self.assertEqual(set(counts.values()), set([2]))

    def test_context_propagated_by_context_run(self):
        self._new_ctx_run(self._test_context, "run")

    def test_context_propagated_by_setting_attribute(self):
        self._new_ctx_run(self._test_context, "set")

    def test_context_not_propagated(self):
        self._new_ctx_run(self._test_context, None)

    def test_context_shared(self):
        self._new_ctx_run(self._test_context, "share")

    def test_break_ctxvars(self):
        let1 = greenlet(copy_context().run)
        let2 = greenlet(copy_context().run)
        let1.switch(getcurrent().switch)
        let2.switch(getcurrent().switch)
        # Since let2 entered the current context and let1 exits its own, the
        # interpreter emits:
        # RuntimeError: cannot exit context: thread state references a different context object
        let1.switch()

    def test_not_broken_if_using_attribute_instead_of_context_run(self):
        let1 = greenlet(getcurrent().switch)
        let2 = greenlet(getcurrent().switch)
        let1.gr_context = copy_context()
        let2.gr_context = copy_context()
        let1.switch()
        let2.switch()
        let1.switch()
        let2.switch()

    def test_context_assignment_while_running(self):
        # pylint:disable=too-many-statements
        ID_VAR.set(None)

        def target():
            self.assertIsNone(ID_VAR.get())
            self.assertIsNone(gr.gr_context)

            # Context is created on first use
            ID_VAR.set(1)
            self.assertIsInstance(gr.gr_context, Context)
            self.assertEqual(ID_VAR.get(), 1)
            self.assertEqual(gr.gr_context[ID_VAR], 1)

            # Clearing the context makes it get re-created as another
            # empty context when next used
            old_context = gr.gr_context
            gr.gr_context = None  # assign None while running
            self.assertIsNone(ID_VAR.get())
            self.assertIsNone(gr.gr_context)
            ID_VAR.set(2)
            self.assertIsInstance(gr.gr_context, Context)
            self.assertEqual(ID_VAR.get(), 2)
            self.assertEqual(gr.gr_context[ID_VAR], 2)

            new_context = gr.gr_context
            getcurrent().parent.switch((old_context, new_context))
            # parent switches us back to old_context

            self.assertEqual(ID_VAR.get(), 1)
            gr.gr_context = new_context  # assign non-None while running
            self.assertEqual(ID_VAR.get(), 2)

            getcurrent().parent.switch()
            # parent switches us back to no context
            self.assertIsNone(ID_VAR.get())
            self.assertIsNone(gr.gr_context)
            gr.gr_context = old_context
            self.assertEqual(ID_VAR.get(), 1)

            getcurrent().parent.switch()
            # parent switches us back to no context
            self.assertIsNone(ID_VAR.get())
            self.assertIsNone(gr.gr_context)

        gr = greenlet(target)

        with self.assertRaisesRegex(AttributeError, "can't delete context attribute"):
            del gr.gr_context

        self.assertIsNone(gr.gr_context)
        old_context, new_context = gr.switch()
        self.assertIs(new_context, gr.gr_context)
        self.assertEqual(old_context[ID_VAR], 1)
        self.assertEqual(new_context[ID_VAR], 2)
        self.assertEqual(new_context.run(ID_VAR.get), 2)
        gr.gr_context = old_context  # assign non-None while suspended
        gr.switch()
        self.assertIs(gr.gr_context, new_context)
        gr.gr_context = None  # assign None while suspended
        gr.switch()
        self.assertIs(gr.gr_context, old_context)
        gr.gr_context = None
        gr.switch()
        self.assertIsNone(gr.gr_context)

        # Make sure there are no reference leaks
        gr = None
        gc.collect()
        self.assertEqual(sys.getrefcount(old_context), 2)
        self.assertEqual(sys.getrefcount(new_context), 2)

    def test_context_assignment_different_thread(self):
        import threading
        VAR_VAR.set(None)
        ctx = Context()

        is_running = threading.Event()
        should_suspend = threading.Event()
        did_suspend = threading.Event()
        should_exit = threading.Event()
        holder = []

        def greenlet_in_thread_fn():
            VAR_VAR.set(1)
            is_running.set()
            should_suspend.wait(10)
            VAR_VAR.set(2)
            getcurrent().parent.switch()
            holder.append(VAR_VAR.get())

        def thread_fn():
            gr = greenlet(greenlet_in_thread_fn)
            gr.gr_context = ctx
            holder.append(gr)
            gr.switch()
            did_suspend.set()
            should_exit.wait(10)
            gr.switch()
            del gr
            greenlet() # trigger cleanup

        thread = threading.Thread(target=thread_fn, daemon=True)
        thread.start()
        is_running.wait(10)
        gr = holder[0]

        # Can't access or modify context if the greenlet is running
        # in a different thread
        with self.assertRaisesRegex(ValueError, "running in a different"):
            getattr(gr, 'gr_context')
        with self.assertRaisesRegex(ValueError, "running in a different"):
            gr.gr_context = None

        should_suspend.set()
        did_suspend.wait(10)

        # OK to access and modify context if greenlet is suspended
        self.assertIs(gr.gr_context, ctx)
        self.assertEqual(gr.gr_context[VAR_VAR], 2)
        gr.gr_context = None

        should_exit.set()
        thread.join(10)

        self.assertEqual(holder, [gr, None])

        # Context can still be accessed/modified when greenlet is dead:
        self.assertIsNone(gr.gr_context)
        gr.gr_context = ctx
        self.assertIs(gr.gr_context, ctx)

        # Otherwise we leak greenlets on some platforms.
        # XXX: Should be able to do this automatically
        del holder[:]
        gr = None
        thread = None

    def test_context_assignment_wrong_type(self):
        g = greenlet()
        with self.assertRaisesRegex(TypeError,
                                    "greenlet context must be a contextvars.Context or None"):
            g.gr_context = self


@skipIf(Context is not None, "ContextVar supported")
class NoContextVarsTests(TestCase):
    def test_contextvars_errors(self):
        let1 = greenlet(getcurrent().switch)
        self.assertFalse(hasattr(let1, 'gr_context'))
        with self.assertRaises(AttributeError):
            getattr(let1, 'gr_context')

        with self.assertRaises(AttributeError):
            let1.gr_context = None

        let1.switch()

        with self.assertRaises(AttributeError):
            getattr(let1, 'gr_context')

        with self.assertRaises(AttributeError):
            let1.gr_context = None

        del let1


if __name__ == '__main__':
    unittest.main()