diff --git a/Doc/library/stackless/pickling.rst b/Doc/library/stackless/pickling.rst index 9cab8e970f1a36..4216493a5e9f4c 100644 --- a/Doc/library/stackless/pickling.rst +++ b/Doc/library/stackless/pickling.rst @@ -108,6 +108,25 @@ different address than *t1*, which was displayed earlier. objects and frame objects contain code objects. And code objects are usually incompatible between different minor versions of |CPY|. +.. note:: + + If you pickle a tasklet, its :class:`~contextvars.Context` won't be pickled, + because :class:`~contextvars.Context` objects can't be pickled. See + :pep:`567` for an explanation. + + It is sometimes possible enable pickling of :class:`~contextvars.Context` objects + in an application specific way (see for instance: :func:`copyreg.pickle` or + :attr:`pickle.Pickler.dispatch_table` or :attr:`pickle.Pickler.persistent_id`). + Such an application can set the pickle flag + :const:`~stackless.PICKLEFLAGS_PICKLE_CONTEXT` to include the + context in the pickled state of a tasklet. + + Another option is to subclass :class:`tasklet` and overload the methods + :meth:`tasklet.__reduce_ex__` and :meth:`tasklet.__setstate__` to + pickle the values of particular :class:`~contextvars.ContextVar` objects together + with the tasklet. + + ====================== Pickling other objects ====================== diff --git a/Doc/library/stackless/stackless.rst b/Doc/library/stackless/stackless.rst index 5ed2a97160c029..9ef12b1f343574 100644 --- a/Doc/library/stackless/stackless.rst +++ b/Doc/library/stackless/stackless.rst @@ -51,6 +51,21 @@ Constants These constants have been added on a provisional basis (see :pep:`411` for details.) +.. data:: PICKLEFLAGS_PICKLE_CONTEXT + + This constant defines an option flag for the function + :func:`pickle_flags`. + + If this flag is set, |SLP| assumes that a :class:`~contextvars.Context` object + is pickleable. As a consequence the state information returned by :meth:`tasklet.__reduce_ex__` + includes the context of the tasklet. + + .. versionadded:: 3.7.6 + + .. note:: + This constant has been added on a provisional basis (see :pep:`411` + for details.) + --------- Functions --------- diff --git a/Doc/library/stackless/tasklets.rst b/Doc/library/stackless/tasklets.rst index 1f702c2fd4d9bd..1e1ca32a7469bf 100644 --- a/Doc/library/stackless/tasklets.rst +++ b/Doc/library/stackless/tasklets.rst @@ -110,6 +110,13 @@ The ``tasklet`` class :meth:`tasklet.setup`. The difference is that when providing them to :meth:`tasklet.bind`, the tasklet is not made runnable yet. + .. versionadded:: 3.7.6 + + If *func* is not :data:`None`, this method also sets the + :class:`~contextvars.Context` object of this tasklet to the + :class:`~contextvars.Context` object of the current tasklet. + Therefore it is usually not required to set the context explicitly. + *func* can be :data:`None` when providing arguments, in which case a previous call to :meth:`tasklet.bind` must have provided the function. @@ -344,6 +351,61 @@ The ``tasklet`` class # Implement unsafe logic here. t.set_ignore_nesting(old_value) +.. method:: tasklet.set_context(context) + + .. versionadded:: 3.7.6 + + Set the :class:`~contextvars.Context` object to be used while this tasklet runs. + + Every tasklet has a private context attribute. + When the tasklet runs, this context becomes the current context of the thread. + + :param context: the context to be set + :type context: :class:`contextvars.Context` + :return: the tasklet itself + :rtype: :class:`tasklet` + :raises RuntimeError: if the tasklet is bound to a foreign thread and is current or scheduled. + :raises RuntimeError: if called from within :meth:`contextvars.Context.run`. + + .. note:: + + The methods :meth:`__init__`, :meth:`bind` and :meth:`__setstate__` also set the context + of the tasklet they are called on to the context of the current tasklet. Therefore it is + usually not required to set the context explicitly. + + .. note:: + This method has been added on a provisional basis (see :pep:`411` + for details.) + +.. method:: tasklet.context_run(callable, \*args, \*\*kwargs) + + .. versionadded:: 3.7.6 + + Execute ``callable(*args, **kwargs)`` in the context object of the tasklet + the contest_run method is called on. Return the result of the + execution or propagate an exception if one occurred. + This method is roughly equivalent following pseudo code:: + + def context_run(self, callable, *args, **kwargs): + saved_context = stackless.current._internal_get_context() + stackless.current.set_context(self._internal_get_context()) + try: + return callable(*args, **kw) + finally: + stackless.current.set_context(saved_context) + + See also :meth:`contextvars.Context.run` for additional information. + Use this method with care, because it lets you manipulate the context of + another tasklet. Often it is sufficient to use a copy of the context + instead of the original object:: + + copied_context = tasklet.context_run(contextvars.copy_context) + copied_context.run(...) + + .. note:: + This method has been added on a provisional basis (see :pep:`411` + for details.) + .. method:: tasklet.__del__() .. versionadded:: 3.7 @@ -365,6 +427,13 @@ The ``tasklet`` class See :meth:`object.__setstate__`. + .. versionadded:: 3.7.6 + + If the tasklet becomes alive through this call and if *state* does not contain + a :class:`~contextvars.Context` object, then :meth:`~__setstate__` also sets + the :class:`~contextvars.Context` object of the + tasklet to the :class:`~contextvars.Context` object of the current tasklet. + :param state: the state as given by ``__reduce_ex__(...)[2]`` :type state: :class:`tuple` :return: self @@ -423,6 +492,18 @@ The following attributes allow checking of user set situations: This attribute is ``True`` while this tasklet is within a :meth:`tasklet.set_ignore_nesting` block +.. attribute:: tasklet.context_id + + .. versionadded:: 3.7.6 + + This attribute is the :func:`id` of the :class:`~contextvars.Context` object to be used while this tasklet runs. + It is intended mostly for debugging. + + .. note:: + This attribute has been added on a provisional basis (see :pep:`411` + for details.) + + The following attributes allow identification of tasklet place: .. attribute:: tasklet.is_current @@ -511,3 +592,72 @@ state transitions these functions are roughly equivalent to the following def schedule_remove(): stackless.current.next.switch() + +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Tasklets and Context Variables +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +.. versionadded:: 3.7.6 + +Version 3.7 of the |PPL| adds context variables, see module :mod:`contextvars`. +Usually they are used in connection with +:mod:`asyncio`, but they are a useful concept for |SLP| too. +Using context variables and multiple tasklets together didn't work well in |SLP| versions 3.7.0 to +3.7.5, because all tasklets of a given thread shared the same context. + +Starting with version 3.7.6 |SLP| adds explicit support for context variables. +Design requirements were: + +1. Be fully compatible with |CPY| and its design decisions. +2. Be fully compatible with previous applications of |SLP|, which are unaware of context variables. +3. Automatically share a context between related tasklets. This way a tasklet, that needs to set + a context variable, can delegate this duty to a sub-tasklet without the need to manage the + context of the sub-tasklet manually. +4. Enable the integration of tasklet-based co-routines into the :mod:`asyncio` framework. + This is an obvious application which involves context variables and tasklets. See + `slp-coroutine `_ for an example. + +Now each tasklet object has it own private context attribute. The design goals have some consequences: + +* The active :class:`~contextvars.Context` object of a thread (as defined by the |PPL|) + is the context of the :attr:`~stackless.current` tasklet. This implies that a tasklet switch, + switches the active context of the thread. + +* In accordance with the design decisions made in :pep:`567` the context of a tasklet can't be + accessed directly [#f1]_, but you can use the method :meth:`tasklet.context_run` to run arbitrary code + in this context. For instance ``tasklet.context_run(contextvars.copy_context())`` returns a copy + of the context. + The attribute :attr:`tasklet.context_id` can be used to test, if two tasklets share the context. + +* If you use the C-API, the context attribute of a tasklet is stored in the field *context* of the structure + :c:type:`PyTaskletObject` or :c:type:`PyThreadState`. This field is is either undefined (``NULL``) or a pointer to a + :class:`~contextvars.Context` object. + A tasklet, whose *context* is ``NULL`` **must** behave identically to a tasklet, whose context is an + empty :class:`~contextvars.Context` object [#f2]_. Therefore the |PY| API provides no way to distinguish + both states. Whenever the context of a tasklet is to be shared with another tasklet and `tasklet->context` + is initially `NULL`, it must be set to a newly created :class:`~contextvars.Context` object beforehand. + This affects the methods :meth:`~tasklet.context_run`, :meth:`~tasklet.__init__`, :meth:`~tasklet.bind` + and :meth:`~tasklet.__setstate__` as well as the attribute :attr:`tasklet.context_id`. + +* If the state of a tasklet changes from *not alive* to *bound* or to *alive* (methods :meth:`~tasklet.__init__`, + :meth:`~tasklet.bind` or :meth:`~tasklet.__setstate__`), the context + of the tasklet is set to the currently active context. This way a newly initialized tasklet automatically + shares the context of its creator. + +* The :mod:`contextvars` implementation of |CPY| imposes several restrictions on |SLP|. Especially the sanity checks in + :c:func:`PyContext_Enter` and :c:func:`PyContext_Exit` make it impossible to replace the current context within + the execution of the method :meth:`contextvars.Context.run`. In that case |SLP| raises :exc:`RuntimeError`. + +.. note:: + Context support has been added on a provisional basis (see :pep:`411` for details.) + +.. rubric:: Footnotes + +.. [#f1] Not exactly true. The return value of :meth:`tasklet.__reduce_ex__` can contain references to class + :class:`contextvars.Context`, but it is strongly discouraged, to use them for any other purpose + than pickling. + +.. [#f2] Setting a context variable to a non default value changes the value of the field *context* from ``NULL`` + to a pointer to a newly created :class:`~contextvars.Context` object. This can happen anytime in a + library call. Therefore any difference between an undefined context and an empty context causes ill defined + behavior. diff --git a/Include/internal/slp_prickelpit.h b/Include/internal/slp_prickelpit.h index f98665b1c5f505..3cef7e64b9591b 100644 --- a/Include/internal/slp_prickelpit.h +++ b/Include/internal/slp_prickelpit.h @@ -49,7 +49,8 @@ Py_ssize_t slp_from_tuple_with_nulls(PyObject **start, PyObject *tup); #define SLP_PICKLEFLAGS_PRESERVE_TRACING_STATE (1U) #define SLP_PICKLEFLAGS_PRESERVE_AG_FINALIZER (1U<<1) #define SLP_PICKLEFLAGS_RESET_AG_FINALIZER (1U<<2) -#define SLP_PICKLEFLAGS__MAX_VALUE ((1<<3)-1) /* must be a signed value */ +#define SLP_PICKLEFLAGS_PICKLE_CONTEXT (1U<<3) +#define SLP_PICKLEFLAGS__MAX_VALUE ((1<<4)-1) /* must be a signed value */ /* helper functions for module dicts */ diff --git a/Include/internal/stackless_impl.h b/Include/internal/stackless_impl.h index 437b3b58efb527..89ffd67cfb13f5 100644 --- a/Include/internal/stackless_impl.h +++ b/Include/internal/stackless_impl.h @@ -761,6 +761,11 @@ PyTaskletTStateStruc * slp_get_saved_tstate(PyTaskletObject *task); PyObject * slp_channel_seq_callback(struct _frame *f, int throwflag, PyObject *retval); PyObject * slp_get_channel_callback(void); +/* + * contextvars related prototypes + */ +PyObject* slp_context_run_callback(PyFrameObject *f, int exc, PyObject *result); + /* macro for use when interrupting tasklets from watchdog */ #define TASKLET_NESTING_OK(task) \ (ts->st.nesting_level == 0 || \ diff --git a/Include/slp_structs.h b/Include/slp_structs.h index 6d26204af3a9c0..21ab38ac073def 100644 --- a/Include/slp_structs.h +++ b/Include/slp_structs.h @@ -134,6 +134,9 @@ typedef struct _tasklet { int recursion_depth; PyObject *def_globals; PyObject *tsk_weakreflist; + /* If the tasklet is current: NULL. (The context of a current tasklet is always in ts->tasklet.) + * If the tasklet is not current: the context for the tasklet */ + PyObject *context; } PyTaskletObject; diff --git a/Lib/stackless.py b/Lib/stackless.py index a7dd0139780dd1..f7f7de49464fec 100644 --- a/Lib/stackless.py +++ b/Lib/stackless.py @@ -25,6 +25,7 @@ def __reduce_ex__(*args): PICKLEFLAGS_PRESERVE_TRACING_STATE = 1 PICKLEFLAGS_PRESERVE_AG_FINALIZER = 2 PICKLEFLAGS_RESET_AG_FINALIZER = 4 +PICKLEFLAGS_PICKLE_CONTEXT = 8 # Backwards support for unpickling older pickles, even from 2.7 from _stackless import _wrap @@ -73,6 +74,37 @@ def __iter__(self): # expressions like "stackless.current" as well defined. current = runcount = main = debug = uncollectables = threads = pickle_with_tracing_state = None +def _tasklet_get_unpicklable_state(tasklet): + """Get a dict with additional state, that can't be pickled + + The method tasklet.__reduce_ex__() returns the picklable state and this + function returns a tuple containing the rest. + + The items in the return value are: + 'context': the context of the tasklet + + Additional items may be added later. + + Note: this function has been added on a provisional basis (see :pep:`411` for details.) + """ + if not isinstance(tasklet, _stackless.tasklet): + raise TypeError("Argument must be a tasklet") + + with atomic(): + if tasklet.is_current or tasklet.thread_id != _stackless.current.thread_id: + # - A current tasklet can't be reduced. + # - We can't set pickle_flags for a foreign thread + # To mitigate these problems, we copy the context to a new tasklet + # (implicit copy by tasklet.__init__(callable, ...)) and reduce the new + # context instead + tasklet = tasklet.context_run(_stackless.tasklet, id) # "id" is just an arbitrary callable + + flags = pickle_flags(PICKLEFLAGS_PICKLE_CONTEXT, PICKLEFLAGS_PICKLE_CONTEXT) + try: + return {'context': tasklet.__reduce__()[2][8]} + finally: + pickle_flags(flags, PICKLEFLAGS_PICKLE_CONTEXT) + def transmogrify(): """ this function creates a subclass of the ModuleType with properties. diff --git a/Python/context.c b/Python/context.c index 54986c322e3bf2..8eb874348bc725 100644 --- a/Python/context.c +++ b/Python/context.c @@ -604,23 +604,32 @@ _contextvars_Context_copy_impl(PyContext *self) #ifdef STACKLESS -static PyObject* context_run_callback(PyFrameObject *f, int exc, PyObject *result) +PyObject* slp_context_run_callback(PyFrameObject *f, int exc, PyObject *result) { PyCFrameObject *cf = (PyCFrameObject *)f; - assert(PyContext_CheckExact(cf->ob1)); PyObject *context = cf->ob1; cf->ob1 = NULL; - if (PyContext_Exit(context)) { - Py_CLEAR(result); + if (cf->i) { + /* called by tasklet.context_run(...) */ + PyThreadState *ts = PyThreadState_GET(); + assert(ts); + assert(NULL == context || PyContext_CheckExact(context)); + Py_XSETREF(ts->context, context); + ts->context_ver++; + } else { + assert(PyContext_CheckExact(context)); + if (PyContext_Exit(context)) { + Py_CLEAR(result); + } + Py_DECREF(context); } - Py_DECREF(context); SLP_STORE_NEXT_FRAME(PyThreadState_GET(), cf->f_back); return result; } -SLP_DEF_INVALID_EXEC(context_run_callback) +SLP_DEF_INVALID_EXEC(slp_context_run_callback) #endif @@ -629,6 +638,7 @@ context_run(PyContext *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) { STACKLESS_GETARG(); + assert(NULL != self); if (nargs < 1) { PyErr_SetString(PyExc_TypeError, @@ -644,11 +654,12 @@ context_run(PyContext *self, PyObject *const *args, PyThreadState *ts = PyThreadState_GET(); PyCFrameObject *f = NULL; if (stackless) { - f = slp_cframe_new(context_run_callback, 1); + f = slp_cframe_new(slp_context_run_callback, 1); if (f == NULL) return NULL; Py_INCREF(self); f->ob1 = (PyObject *)self; + assert(f->i == 0); SLP_SET_CURRENT_FRAME(ts, (PyFrameObject *)f); /* f contains the only counted reference to current frame. This reference * keeps the fame alive during the following _PyObject_FastCallKeywords(). @@ -1326,7 +1337,7 @@ _PyContext_Init(void) #ifdef STACKLESS if (slp_register_execute(&PyCFrame_Type, "context_run_callback", - context_run_callback, SLP_REF_INVALID_EXEC(context_run_callback)) != 0) + slp_context_run_callback, SLP_REF_INVALID_EXEC(slp_context_run_callback)) != 0) { return 0; } diff --git a/Stackless/changelog.txt b/Stackless/changelog.txt index 2695f4293056c3..b2bf92a565c6d3 100644 --- a/Stackless/changelog.txt +++ b/Stackless/changelog.txt @@ -9,6 +9,16 @@ What's New in Stackless 3.X.X? *Release date: 20XX-XX-XX* +- https://github.com/stackless-dev/stackless/issues/239 + Add support for PEP 567 context variables to tasklets. + Each tasklet now has a contextvars.Context object, that becomes active during + the execution of the tasklet. + New tasklet methods "set_context()" and "context_run()". + New read only attribute "tasklet.context_id". + New constant "stackless.PICKLEFLAGS_PICKLE_CONTEXT". + And an intentionally undocumented function + "stackless._tasklet_get_unpicklable_state()" + - https://github.com/stackless-dev/stackless/issues/245 Prevent a crash, if you call tasklet.__init__() or tasklet.bind() with wrong argument types. diff --git a/Stackless/core/cframeobject.c b/Stackless/core/cframeobject.c index 913dd1b472af21..3591f176a27b11 100644 --- a/Stackless/core/cframeobject.c +++ b/Stackless/core/cframeobject.c @@ -30,6 +30,7 @@ #ifdef STACKLESS #include "internal/stackless_impl.h" #include "internal/slp_prickelpit.h" +#include "internal/context.h" static PyCFrameObject *free_list = NULL; static int numfree = 0; /* number of cframes currently in free_list */ @@ -136,6 +137,8 @@ cframe_reduce(PyCFrameObject *cf) PyObject *res = NULL, *exec_name = NULL; PyObject *params = NULL; int valid = 1; + PyObject *obs[3]; + long i, n; if (cf->f_execute == execute_soft_switchable_func) { exec_name = (PyObject *) cf->any2; @@ -146,7 +149,24 @@ cframe_reduce(PyCFrameObject *cf) } else if ((exec_name = slp_find_execname((PyFrameObject *) cf, &valid)) == NULL) return NULL; - params = slp_into_tuple_with_nulls(&cf->ob1, 3); + obs[0] = cf->ob1; + obs[1] = cf->ob2; + obs[2] = cf->ob3; + i = cf->i; + n = cf->n; + + if (cf->f_execute == slp_context_run_callback && 0 == i) { + /* + * Replace a logical PyContext_Exit(context) with the equivalent + * stackless.current.set_context(). + */ + assert(PyContext_CheckExact(obs[0])); + assert(((PyContext *) obs[0])->ctx_entered); + obs[0] = (PyObject *)((PyContext *) obs[0])->ctx_prev; + i = 1; + } + + params = slp_into_tuple_with_nulls(obs, 3); if (params == NULL) goto err_exit; res = Py_BuildValue ("(O()(" cframetuplefmt "))", @@ -154,8 +174,8 @@ cframe_reduce(PyCFrameObject *cf) valid, exec_name, params, - cf->i, - cf->n); + i, + n); err_exit: Py_XDECREF(exec_name); diff --git a/Stackless/module/clinic/taskletobject.c.h b/Stackless/module/clinic/taskletobject.c.h new file mode 100644 index 00000000000000..b96b253fff3e79 --- /dev/null +++ b/Stackless/module/clinic/taskletobject.c.h @@ -0,0 +1,49 @@ +/*[clinic input] +preserve +[clinic start generated code]*/ + +#if defined(STACKLESS) + +PyDoc_STRVAR(_stackless_tasklet_set_context__doc__, +"set_context($self, /, context)\n" +"--\n" +"\n" +"Set the context to be used while this tasklet runs.\n" +"\n" +"Every tasklet has a private context attribute. When the tasklet runs,\n" +"this context becomes the current context of the thread.\n" +"\n" +"This method raises RuntimeError, if the tasklet is bound to a foreign thread and is current or scheduled.\n" +"This method raises RuntimeError, if called from within Context.run().\n" +"This method returns the tasklet it is called on."); + +#define _STACKLESS_TASKLET_SET_CONTEXT_METHODDEF \ + {"set_context", (PyCFunction)_stackless_tasklet_set_context, METH_FASTCALL|METH_KEYWORDS, _stackless_tasklet_set_context__doc__}, + +static PyObject * +_stackless_tasklet_set_context_impl(PyTaskletObject *self, PyObject *context); + +static PyObject * +_stackless_tasklet_set_context(PyTaskletObject *self, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames) +{ + PyObject *return_value = NULL; + static const char * const _keywords[] = {"context", NULL}; + static _PyArg_Parser _parser = {"O!:set_context", _keywords, 0}; + PyObject *context; + + if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser, + &PyContext_Type, &context)) { + goto exit; + } + return_value = _stackless_tasklet_set_context_impl(self, context); + +exit: + return return_value; +} + +#endif /* defined(STACKLESS) */ + +#ifndef _STACKLESS_TASKLET_SET_CONTEXT_METHODDEF + #define _STACKLESS_TASKLET_SET_CONTEXT_METHODDEF +#endif /* !defined(_STACKLESS_TASKLET_SET_CONTEXT_METHODDEF) */ +/*[clinic end generated code: output=f45e93a186a75d8b input=a9049054013a1b77]*/ diff --git a/Stackless/module/scheduling.c b/Stackless/module/scheduling.c index 913a0a71a44911..554950d93c93de 100644 --- a/Stackless/module/scheduling.c +++ b/Stackless/module/scheduling.c @@ -710,8 +710,10 @@ new_lock(void) * whereas Stackless stores the exception state in the tasklet object. * When switching from one tasklet to another tasklet, we have to switch * the exc_info-pointer in the thread state. + * + * With current compilers, an inline function performs no worse than a macro, + * but in the debugger single stepping it is much simpler. */ - #if 1 Py_LOCAL_INLINE(void) SLP_EXCHANGE_EXCINFO(PyThreadState *tstate, PyTaskletObject *task) { @@ -724,8 +726,7 @@ Py_LOCAL_INLINE(void) SLP_EXCHANGE_EXCINFO(PyThreadState *tstate, PyTaskletObjec assert(exc_info); assert(t_->exc_info); #if 0 - PyObject *c; - c = PyStackless_GetCurrent(); + PyObject *c = PyStackless_GetCurrent(); fprintf(stderr, "SLP_EXCHANGE_EXCINFO %3d current %14p,\tset task %p = %p,\ttstate %p = %p\n", __LINE__, c, t_, exc_info, ts_, t_->exc_info); Py_XDECREF(c); #endif @@ -748,18 +749,34 @@ Py_LOCAL_INLINE(void) SLP_EXCHANGE_EXCINFO(PyThreadState *tstate, PyTaskletObjec } while(0) #endif +/* + * The inline function (or macro) SLP_UPDATE_TSTATE_ON_SWITCH encapsulates some changes + * to the thread state when Stackless switches tasklets: + * - Exchange the exception information + * - Switch the PEP 567 context + */ #if 1 Py_LOCAL_INLINE(void) SLP_UPDATE_TSTATE_ON_SWITCH(PyThreadState *tstate, PyTaskletObject *prev, PyTaskletObject *next) { SLP_EXCHANGE_EXCINFO(tstate, prev); SLP_EXCHANGE_EXCINFO(tstate, next); + prev->context = tstate->context; + tstate->context = next->context; + tstate->context_ver++; + next->context = NULL; } #else #define SLP_UPDATE_TSTATE_ON_SWITCH(tstate__, prev_, next_) \ do { \ PyThreadState *ts__ = (tstate__); \ - SLP_EXCHANGE_EXCINFO(ts__, (prev_)); \ - SLP_EXCHANGE_EXCINFO(ts__, (next_)); \ + PyTaskletObject *prev__ = (prev_); \ + PyTaskletObject *next__ = (next_); \ + SLP_EXCHANGE_EXCINFO(ts__, prev__); \ + SLP_EXCHANGE_EXCINFO(ts__, next__); \ + prev__->context = ts__->context; \ + ts__->context = next__->context; \ + ts__->context_ver++; \ + next__->context = NULL; \ } while(0) #endif @@ -1285,6 +1302,7 @@ slp_initialize_main_and_current(void) assert(task->exc_state.exc_traceback == NULL); assert(task->exc_state.previous_item == NULL); assert(task->exc_info == &task->exc_state); + assert(task->context == NULL); SLP_EXCHANGE_EXCINFO(ts, task); NOTIFY_SCHEDULE(ts, NULL, task, -1); @@ -1378,6 +1396,7 @@ schedule_task_destruct(PyObject **retval, PyTaskletObject *prev, PyTaskletObject /* main is exiting */ assert(ts->st.main == NULL); assert(ts->exc_info == &prev->exc_state); + assert(prev->context == NULL); SLP_EXCHANGE_EXCINFO(ts, prev); TASKLET_CLAIMVAL(prev, retval); if (PyBomb_Check(*retval)) diff --git a/Stackless/module/taskletobject.c b/Stackless/module/taskletobject.c index 102c4091dd0df5..088293582d698a 100644 --- a/Stackless/module/taskletobject.c +++ b/Stackless/module/taskletobject.c @@ -9,6 +9,15 @@ #ifdef STACKLESS #include "internal/stackless_impl.h" +#include "internal/context.h" + +/*[clinic input] +module _stackless +class _stackless.tasklet "PyTaskletObject *" "&PyTasklet_Type" +[clinic start generated code]*/ +/*[clinic end generated code: output=da39a3ee5e6b4b0d input=81570dcf604e6e6d]*/ +#include "clinic/taskletobject.c.h" + /* * Convert C-bitfield @@ -162,6 +171,7 @@ tasklet_traverse(PyTaskletObject *t, visitproc visit, void *arg) Py_VISIT(t->exc_state.exc_type); Py_VISIT(t->exc_state.exc_value); Py_VISIT(t->exc_state.exc_traceback); + Py_VISIT(t->context); return 0; } @@ -193,6 +203,7 @@ tasklet_clear(PyTaskletObject *t) tasklet_clear_frames(t); Py_CLEAR(t->tempval); Py_CLEAR(t->def_globals); + Py_CLEAR(t->context); /* unlink task from cstate */ if (t->cstate != NULL && t->cstate->task == t) @@ -328,6 +339,66 @@ PyTasklet_New(PyTypeObject *type, PyObject *func) return (PyTaskletObject*)PyObject_CallFunction((PyObject*)type, NULL); } +Py_LOCAL_INLINE(PyObject *) +_get_tasklet_context(PyTaskletObject *self) +{ + PyThreadState *ts = self->cstate->tstate; + PyThreadState *cts = PyThreadState_Get(); + PyObject *ctx; + assert(cts); + + /* Get the context for the tasklet *self. + * If the tasklet has no context, set a new empty one. + */ + if (ts && self == ts->st.current) { + /* the tasklet *self is current */ + ctx = ts->context; + if (NULL == ctx) { + if (ts == cts) { + /* *self belongs to the current thread. Call a C-API function, that + * initializes ts->context as a side effect */ + ctx = PyContext_CopyCurrent(); + if (NULL == ctx) + return NULL; + Py_DECREF(ctx); + ctx = ts->context; + assert(NULL != ctx); + } else { + slp_runtime_error("The tasklet has no context and you can't set one from a foreign thread."); + } + } + } else { + /* the tasklet *self is not current */ + ctx = self->context; + if (NULL == ctx) { + ctx = PyContext_New(); + if (NULL == ctx) + return NULL; + self->context = ctx; + } + } + Py_INCREF(ctx); + return ctx; +} + +Py_LOCAL_INLINE(int) +_tasklet_init_context(PyTaskletObject *task) +{ + PyThreadState *cts = PyThreadState_Get(); + assert(cts); + + PyObject *ctx = _get_tasklet_context(cts->st.current); + if (NULL == ctx) + return -1; + + PyObject *obj = _stackless_tasklet_set_context_impl(task, ctx); + Py_DECREF(ctx); + if (NULL == obj) + return -1; + Py_DECREF(obj); + return 0; +} + static int impl_tasklet_setup(PyTaskletObject *task, PyObject *args, PyObject *kwds, int insert); @@ -361,6 +432,13 @@ PyTasklet_BindEx(PyTaskletObject *task, PyObject *func, PyObject *args, PyObject RUNTIME_ERROR("can't unbind the main tasklet", -1); } + /* + * Set the context to the current context. It can be changed later on. + */ + if (func) + if (_tasklet_init_context(task)) + return -1; + tasklet_clear_frames(task); task->recursion_depth = 0; assert(task->flags.autoschedule == 0); /* probably unused */ @@ -479,6 +557,7 @@ tasklet_new(PyTypeObject *type, PyObject *args, PyObject *kwds) Py_INCREF(Py_None); t->tempval = Py_None; t->tsk_weakreflist = NULL; + t->context = NULL; Py_INCREF(ts->st.initial_stub); t->cstate = ts->st.initial_stub; t->def_globals = PyEval_GetGlobals(); @@ -536,6 +615,7 @@ tasklet_reduce(PyTaskletObject * t) PyFrameObject *f; PyThreadState *ts = t->cstate->tstate; PyObject *exc_type, *exc_value, *exc_traceback, *exc_info; + PyObject *context = NULL; if (ts && t == ts->st.current) RUNTIME_ERROR("You cannot __reduce__ the tasklet which is" @@ -568,6 +648,10 @@ tasklet_reduce(PyTaskletObject * t) goto err_exit; } + context = _get_tasklet_context(t); + if (NULL == context) + goto err_exit; + assert(!ts || t->exc_info != &ts->exc_state); /* Because of the test a few lines above, it is guaranteed that t is not the current tasklet. * Therefore we can simplify the line @@ -596,7 +680,8 @@ tasklet_reduce(PyTaskletObject * t) Py_INCREF(exc_type); Py_INCREF(exc_value); Py_INCREF(exc_traceback); - tup = Py_BuildValue("(O()(" TASKLET_TUPLEFMT "))", + tup = Py_BuildValue((ts && (ts->st.pickleflags & SLP_PICKLEFLAGS_PICKLE_CONTEXT)) ? + "(O()(" TASKLET_TUPLEFMT "O))" : "(O()(" TASKLET_TUPLEFMT "))", Py_TYPE(t), tasklet_flags_as_integer(t->flags), t->tempval, @@ -605,7 +690,8 @@ tasklet_reduce(PyTaskletObject * t) exc_type, exc_value, exc_traceback, - exc_info + exc_info, + context ); Py_DECREF(exc_info); Py_DECREF(exc_type); @@ -613,6 +699,7 @@ tasklet_reduce(PyTaskletObject * t) Py_DECREF(exc_traceback); err_exit: Py_XDECREF(lis); + Py_XDECREF(context); return tup; } @@ -634,6 +721,7 @@ tasklet_setstate(PyObject *self, PyObject *args) PyObject *exc_type, *exc_value, *exc_traceback; PyObject *old_type, *old_value, *old_traceback; PyObject *exc_info_obj; + PyObject *context = NULL; PyFrameObject *f; Py_ssize_t i, nframes; int j; @@ -643,7 +731,7 @@ tasklet_setstate(PyObject *self, PyObject *args) if (PyTasklet_Alive(t)) RUNTIME_ERROR("tasklet is alive", NULL); - if (!PyArg_ParseTuple(args, "iOiO!OOOO:tasklet", + if (!PyArg_ParseTuple(args, "iOiO!OOOO|O:tasklet", &flags, &tempval, &nesting_level, @@ -651,13 +739,19 @@ tasklet_setstate(PyObject *self, PyObject *args) &exc_type, &exc_value, &exc_traceback, - &exc_info_obj)) + &exc_info_obj, + &context)) return NULL; + if (Py_None == context) + context = NULL; + if (context != NULL && !PyContext_CheckExact(context)) + TYPE_ERROR("tasklet state[8] must be a contextvars.Context or None", NULL); + nframes = PyList_GET_SIZE(lis); TASKLET_SETVAL(t, tempval); - /* There is a unpickling race condition. While it is rare, + /* There is an unpickling race condition. While it is rare, * sometimes tasklets get their setstate call after the * channel they are blocked on. If this happens and we * do not account for it, they will be left in a broken @@ -702,7 +796,16 @@ tasklet_setstate(PyObject *self, PyObject *args) back = f; } t->f.frame = f; + if(NULL == context && _tasklet_init_context(t)) + return NULL; + } + if (context) { + PyObject *obj = _stackless_tasklet_set_context_impl(t, context); + if (NULL == obj) + return NULL; + Py_DECREF(obj); } + /* walk frames again and calculate recursion_depth */ for (f = t->f.frame; f != NULL; f = f->f_back) { if (PyFrame_Check(f) && f->f_execute != PyEval_EvalFrameEx_slp) { @@ -2201,6 +2304,143 @@ tasklet_set_profile_function(PyTaskletObject *task, PyObject *value) RUNTIME_ERROR("tasklet is not alive", -1); } +/*[clinic input] +_stackless.tasklet.set_context + + context: object(subclass_of='&PyContext_Type') + +Set the context to be used while this tasklet runs. + +Every tasklet has a private context attribute. When the tasklet runs, +this context becomes the current context of the thread. + +This method raises RuntimeError, if the tasklet is bound to a foreign thread and is current or scheduled. +This method raises RuntimeError, if called from within Context.run(). +This method returns the tasklet it is called on. +[clinic start generated code]*/ + +static PyObject * +_stackless_tasklet_set_context_impl(PyTaskletObject *self, PyObject *context) +/*[clinic end generated code: output=23061bb958da0ff9 input=3c29aedc0d51481c]*/ +{ + PyThreadState *ts = self->cstate->tstate; + PyThreadState *cts = PyThreadState_Get(); + PyObject *ctx; + + assert(context); + assert(PyContext_CheckExact(context)); + + if (ts && self == ts->st.current) { + /* the tasklet is the current tasklet. */ + + /* I'm not sure, if setting the context for a current tasklet is really relevant, + * but it can be implemented. Therefore I'm going to implement it. */ + if (ts != cts) + goto fail_other_thread; + + /* Its context is in ts->context */ + ctx = ts->context; + if (ctx && ((PyContext *) ctx)->ctx_entered) + goto fail_ctx_entered; + Py_INCREF(context); + Py_XSETREF(ts->context, context); + ts->context_ver++; + } else { + /* the tasklet is not the current tasklet. Its context is in self->context */ + if (ts != cts && PyTasklet_Scheduled(self) && !self->flags.blocked) + goto fail_other_thread; + ctx = self->context; + if (ctx && ((PyContext *) ctx)->ctx_entered) + goto fail_ctx_entered; + Py_INCREF(context); + Py_XSETREF(self->context, context); + } + Py_INCREF(self); + return (PyObject *) self; +fail_ctx_entered: + return slp_runtime_error("the current context of the tasklet has been entered."); +fail_other_thread: + return slp_runtime_error("tasklet belongs to a different thread"); +} + +/* AFAIK argument clinic currently does not support the signature of context_run(callable, *args, **kwargs). */ +PyDoc_STRVAR(tasklet_context_run__doc__,"context_run(callable, *args, **kwargs)\n\ +\n\ +Execute callable(*args, **kwargs) code in the context object of the tasklet the contest_run method is called on.\n\ +Return the result of the execution or propagate an exception if one occurred."); + +static PyObject * +tasklet_context_run(PyTaskletObject *self, PyObject *const *args, + Py_ssize_t nargs, PyObject *kwnames) +{ + STACKLESS_GETARG(); + PyThreadState *ts = self->cstate->tstate; + PyThreadState *cts = PyThreadState_Get(); + PyObject *ctx; + assert(cts); + + if (nargs < 1) { + PyErr_SetString(PyExc_TypeError, + "run() missing 1 required positional argument"); + return NULL; + } + + ctx = _get_tasklet_context(self); /* returns an new reference */ + + PyObject * saved_context = cts->context; + cts->context = ctx; + cts->context_ver++; + ctx = NULL; + + PyCFrameObject *f = NULL; + if (stackless) { + f = slp_cframe_new(slp_context_run_callback, 1); + if (f == NULL) { + Py_XSETREF(cts->context, saved_context); + return NULL; + } + f->i = 1; + Py_XINCREF(saved_context); + f->ob1 = saved_context; + SLP_SET_CURRENT_FRAME(ts, (PyFrameObject *)f); + /* f contains the only counted reference to current frame. This reference + * keeps the fame alive during the following _PyObject_FastCallKeywords(). + */ + } + STACKLESS_PROMOTE_ALL(); + PyObject *call_result = _PyObject_FastCallKeywords( + args[0], args + 1, nargs - 1, kwnames); + STACKLESS_ASSERT(); + + if (stackless && !STACKLESS_UNWINDING(call_result)) { + /* required, because we added a C-frame */ + assert(f); + assert((PyFrameObject *)f == SLP_CURRENT_FRAME(ts)); + SLP_STORE_NEXT_FRAME(ts, (PyFrameObject *)f); + Py_DECREF(f); + Py_XDECREF(saved_context); + return STACKLESS_PACK(ts, call_result); + } + Py_XDECREF(f); + if (STACKLESS_UNWINDING(call_result)) { + Py_XDECREF(saved_context); + return call_result; + } + Py_XSETREF(cts->context, saved_context); + cts->context_ver++; + return call_result; +} + +static PyObject * +tasklet_context_id(PyTaskletObject *self) +{ + PyObject *ctx = _get_tasklet_context(self); + PyObject *result = PyLong_FromVoidPtr(ctx); + Py_DECREF(ctx); + return result; +} + + static PyMemberDef tasklet_members[] = { {"cstate", T_OBJECT, offsetof(PyTaskletObject, cstate), READONLY, PyDoc_STR("the C stack object associated with the tasklet.\n\ @@ -2300,6 +2540,9 @@ static PyGetSetDef tasklet_getsetlist[] = { "For the current tasklet this property is equivalent to sys.gettrace()\n" "and sys.settrace().")}, + {"context_id", (getter)tasklet_context_id, NULL, + PyDoc_STR("The id of the context object of this tasklet.")}, + {0}, }; @@ -2339,6 +2582,9 @@ static PyMethodDef tasklet_methods[] = { tasklet_setstate__doc__}, {"bind_thread", (PCF)tasklet_bind_thread, METH_VARARGS, tasklet_bind_thread__doc__}, + {"context_run", (PCF)tasklet_context_run, METH_FASTCALL | METH_KEYWORDS | METH_STACKLESS, + tasklet_context_run__doc__}, + _STACKLESS_TASKLET_SET_CONTEXT_METHODDEF {NULL, NULL} /* sentinel */ }; diff --git a/Stackless/unittests/support.py b/Stackless/unittests/support.py index 4dc5aa977b3596..05851524f2c661 100644 --- a/Stackless/unittests/support.py +++ b/Stackless/unittests/support.py @@ -34,6 +34,7 @@ import gc import os import functools +import copyreg from test.support import run_unittest # emit warnings about uncollectable objects @@ -674,21 +675,65 @@ def tasklet_is_uncollectable(self, tlet): self.assertIsInstance(tlet, stackless.tasklet) self.__uncollectable_tasklets.append(id(tlet)) - def dumps(self, obj, protocol=None, *, fix_imports=True): - if protocol is None: - protocol = self._pickle_protocol + def dumps(self, obj, protocol=None, *, fix_imports=True, external_map=None, additional_dispatch_table=None): if self._pickle_module == "P": - return pickle._dumps(obj, protocol=protocol, fix_imports=fix_imports) + cls = pickle._Pickler elif self._pickle_module == "C": - return pickle.dumps(obj, protocol=protocol, fix_imports=fix_imports) - raise ValueError("Invalid pickle module") + cls = pickle.Pickler + else: + raise ValueError("Invalid pickle module") + + if external_map is not None: + inverted_external_map = {id(v): k for k,v in external_map.items()} + class PicklerWithExternalObjects(cls): + def persistent_id(self, obj): + if external_map: + return inverted_external_map.get(id(obj)) + return None + cls = PicklerWithExternalObjects - def loads(self, s, *, fix_imports=True, encoding="ASCII", errors="strict"): + if protocol is None: + protocol = self._pickle_protocol + + f = io.BytesIO() + p=cls(f, protocol, fix_imports=fix_imports) + if additional_dispatch_table is not None: + dispatch_table = copyreg.dispatch_table.copy() + dispatch_table.update(additional_dispatch_table) + p.dispatch_table = dispatch_table + p.dump(obj) + res = f.getvalue() + assert isinstance(res, (bytes, bytearray)) + return res + + def loads(self, s, *, fix_imports=True, encoding="ASCII", errors="strict", external_map=None): + if isinstance(s, str): + raise TypeError("Can't load pickle from unicode string") if self._pickle_module == "P": - return pickle._loads(s, fix_imports=fix_imports, encoding=encoding, errors=errors) + cls = pickle._Unpickler elif self._pickle_module == "C": - return pickle.loads(s, fix_imports=fix_imports, encoding=encoding, errors=errors) - raise ValueError("Invalid pickle module") + cls = pickle.Unpickler + else: + raise ValueError("Invalid pickle module") + + if external_map is not None: + class UnPicklerWithExternalObjects(cls): + def persistent_load(self, pid): + # This method is invoked whenever a persistent ID is encountered. + # Here, pid is the tuple returned by DBPickler. + if external_map is None: + raise pickle.UnpicklingError("external_map not set") + try: + return external_map[pid] + except KeyError: + # Always raises an error if you cannot return the correct object. + # Otherwise, the unpickler will think None is the object referenced + # by the persistent ID. + raise pickle.UnpicklingError("unsupported persistent object") + cls = UnPicklerWithExternalObjects + + file = io.BytesIO(s) + return cls(file, fix_imports=fix_imports, encoding=encoding, errors=errors).load() # limited pickling support for test cases # Between setUp() and tearDown() the test-case has a diff --git a/Stackless/unittests/test_miscell.py b/Stackless/unittests/test_miscell.py index 85dc9e8486d493..a59228373e2130 100644 --- a/Stackless/unittests/test_miscell.py +++ b/Stackless/unittests/test_miscell.py @@ -12,6 +12,7 @@ import os import struct import gc +import contextvars from _stackless import _test_nostacklesscall as apply_not_stackless import _teststackless @@ -1330,6 +1331,484 @@ def task(): t.kill() +class TestTaskletContext(AsTaskletTestCase): + cvar = contextvars.ContextVar('TestTaskletContext', default='unset') + + def test_contexct_id(self): + ct = stackless.current + + # prepare a context + sentinel = object() + self.cvar.set(sentinel) # make sure, a context is active + self.assertIsInstance(ct.context_id, int) + + # no context + t = stackless.tasklet() # new tasklet without context + self.assertEqual(t.context_run(self.cvar.get), "unset") + + # known context + ctx = contextvars.Context() + self.assertNotEqual(id(ctx), ct.context_id) + self.assertIs(ctx.run(stackless.getcurrent), ct) + cid = ctx.run(getattr, ct, "context_id") + self.assertEqual(id(ctx), cid) + + def test_set_context_same_thread_not_current(self): + # same thread, tasklet is not current + + # prepare a context + ctx = contextvars.Context() + sentinel = object() + ctx.run(self.cvar.set, sentinel) + + tasklet_started = False + t = stackless.tasklet() + def task(): + nonlocal tasklet_started + self.assertEqual(t.context_id, id(ctx)) + tasklet_started = True + + t.bind(task)() + + # set the context + t.set_context(ctx) + + # validate the context + stackless.run() + self.assertTrue(tasklet_started) + + def test_set_context_same_thread_not_current_entered(self): + # prepare a context + sentinel = object() + ctx = contextvars.Context() + ctx.run(self.cvar.set, sentinel) + tasklet_started = False + + t = stackless.tasklet() + def task(): + nonlocal tasklet_started + self.assertEqual(t.context_id, id(ctx)) + tasklet_started = True + stackless.schedule_remove() + self.assertEqual(t.context_id, id(ctx)) + + t.bind(ctx.run)(task) + stackless.run() + self.assertTrue(tasklet_started) + self.assertTrue(t.alive) + self.assertIsNot(t, stackless.current) + self.assertRaisesRegex(RuntimeError, "the current context of the tasklet has been entered", + t.set_context, contextvars.Context()) + t.insert() + stackless.run() + + def test_set_context_same_thread_current(self): + # same thread, current tasklet + + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + self.assertIsInstance(stackless.current.context_id, int) + + # prepare another context + sentinel2 = object() + ctx = contextvars.Context() + ctx.run(self.cvar.set, sentinel2) + self.assertNotEqual(stackless.current.context_id, id(ctx)) + + # change the context of the current tasklet + stackless.current.set_context(ctx) + + # check that the new context is the context of the current tasklet + self.assertEqual(stackless.current.context_id, id(ctx)) + self.assertIs(self.cvar.get(), sentinel2) + + def test_set_context_same_thread_current_entered(self): + # entered current context on the same thread + sentinel = object() + self.cvar.set(sentinel) + cid = stackless.current.context_id + self.assertRaisesRegex(RuntimeError, "the current context of the tasklet has been entered", + contextvars.Context().run, stackless.current.set_context, contextvars.Context()) + self.assertEqual(stackless.current.context_id, cid) + self.assertIs(self.cvar.get(), sentinel) + + def test_set_context_other_thread_paused(self): + # other thread, tasklet is not current + # prepare a context + ctx = contextvars.Context() + sentinel = object() + ctx.run(self.cvar.set, sentinel) + + tasklet_started = False + t = stackless.tasklet() + def task(): + nonlocal tasklet_started + self.assertEqual(t.context_id, id(ctx)) + tasklet_started = True + + t.bind(task, ()) # paused + + # set the context + thr = threading.Thread(target=t.set_context, args=(ctx,), name="other thread") + thr.start() + thr.join() + + # validate the context + t.insert() + stackless.run() + self.assertTrue(tasklet_started) + + def test_set_context_other_thread_scheduled(self): + # other thread, tasklet is not current + # prepare a context + ctx = contextvars.Context() + sentinel = object() + ctx.run(self.cvar.set, sentinel) + + tasklet_started = False + t = stackless.tasklet() + def task(): + nonlocal tasklet_started + self.assertEqual(t.context_id, id(ctx)) + tasklet_started = True + + t.bind(task)() # scheduled + t.set_context(ctx) + + # set the context + got_exception = None + def other_thread(): + nonlocal got_exception + try: + t.set_context(contextvars.Context()) + except RuntimeError as e: + got_exception = e + thr = threading.Thread(target=other_thread, name="other thread") + thr.start() + thr.join() + + # validate the result + self.assertIsInstance(got_exception, RuntimeError) + with self.assertRaisesRegex(RuntimeError, "tasklet belongs to a different thread"): + raise got_exception + + stackless.run() + self.assertTrue(tasklet_started) + + def test_set_context_other_thread_not_current_entered(self): + # prepare a context + sentinel = object() + ctx = contextvars.Context() + ctx.run(self.cvar.set, sentinel) + tasklet_started = False + + t = stackless.tasklet() + def task(): + nonlocal tasklet_started + self.assertEqual(t.context_id, id(ctx)) + tasklet_started = True + stackless.schedule_remove() + self.assertEqual(t.context_id, id(ctx)) + + t.bind(ctx.run)(task) + stackless.run() + self.assertTrue(tasklet_started) + self.assertTrue(t.alive) + self.assertIsNot(t, stackless.current) + + # set the context + got_exception = None + def other_thread(): + nonlocal got_exception + try: + t.set_context(contextvars.Context()) + except RuntimeError as e: + got_exception = e + thr = threading.Thread(target=other_thread, name="other thread") + thr.start() + thr.join() + + # validate the result + self.assertIsInstance(got_exception, RuntimeError) + with self.assertRaisesRegex(RuntimeError, "the current context of the tasklet has been entered"): + raise got_exception + t.insert() + stackless.run() + + def test_set_context_other_thread_current(self): + # other thread, current tasklet + + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + cid = stackless.current.context_id + + t = stackless.current + # set the context + got_exception = None + def other_thread(): + nonlocal got_exception + try: + t.set_context(contextvars.Context()) + except RuntimeError as e: + got_exception = e + + thr = threading.Thread(target=other_thread, name="other thread") + thr.start() + thr.join() + + self.assertIsInstance(got_exception, RuntimeError) + with self.assertRaisesRegex(RuntimeError, "tasklet belongs to a different thread"): + raise got_exception + self.assertEqual(stackless.current.context_id, cid) + + def test_set_context_other_thread_current_entered(self): + # entered current context on other thread + + t = stackless.current + # set the context + got_exception = None + def other_thread(): + nonlocal got_exception + try: + t.set_context(contextvars.Context()) + except RuntimeError as e: + got_exception = e + + thr = threading.Thread(target=other_thread, name="other thread") + + # prepare a context + sentinel = object() + ctx = contextvars.Context() + ctx.run(self.cvar.set, sentinel) + + def in_ctx(): + self.assertEqual(stackless.current.context_id, id(ctx)) + thr.start() + thr.join() + self.assertEqual(stackless.current.context_id, id(ctx)) + + ctx.run(in_ctx) + self.assertIsInstance(got_exception, RuntimeError) + with self.assertRaisesRegex(RuntimeError, "tasklet belongs to a different thread"): + raise got_exception + + + def test_context_init_null_main(self): + # test the set_context in tasklet.bind, if the current context is NULL in main tasklet + + cid = None + t = stackless.tasklet() + + def other_thread(): + nonlocal cid + self.assertIs(stackless.current, stackless.main) + + # this creates a context, because all tasklets initialized by the main-tasklet + # shall share a common context + t.bind(lambda: None) + cid = stackless.current.context_id + + thr = threading.Thread(target=other_thread, name="other thread") + thr.start() + thr.join() + self.assertIsInstance(cid, int) + self.assertEqual(t.context_id, cid) + + def test_context_init_main(self): + # test the set_context in tasklet.bind, in main tasklet + + sentinel = object() + cid = None + t = stackless.tasklet() + + def other_thread(): + nonlocal cid + self.cvar.set(sentinel) + self.assertIs(stackless.current, stackless.main) + cid = stackless.current.context_id + self.assertIsInstance(cid, int) + t.bind(lambda: None) + self.assertEqual(stackless.current.context_id, cid) + + thr = threading.Thread(target=other_thread, name="other thread") + thr.start() + thr.join() + self.assertEqual(t.context_id, cid) + + def test_context_init_nonmain(self): + # test the set_context in tasklet.bind + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + + cid = stackless.current.context_id + t = stackless.tasklet() + + # this creates + t.bind(lambda: None) + + self.assertEqual(stackless.current.context_id, cid) + self.assertEqual(t.context_id, cid) + + @staticmethod + def _test_context_setstate_alive_task(): + stackless.schedule_remove(100) + return 200 + + def test_context_setstate_alive(self): + # prepare a state of a half executed tasklet + t = stackless.tasklet(self._test_context_setstate_alive_task)() + stackless.run() + self.assertEqual(t.tempval, 100) + self.assertTrue(t.paused) + + state = t.__reduce__()[2] + for i, fw in enumerate(state[3]): + frame_factory, frame_args, frame_state = fw.__reduce__() + state[3][i] = frame_factory(*frame_args) + state[3][i].__setstate__(frame_state) + # from pprint import pprint ; pprint(state) + + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + + cid = stackless.current.context_id + t = stackless.tasklet() + + # this creates + t.__setstate__(state) + + self.assertTrue(t.alive) + self.assertEqual(stackless.current.context_id, cid) + self.assertEqual(t.context_id, cid) + t.bind(None) + + def test_context_setstate_notalive(self): + # prepare a state of a new tasklet + state = stackless.tasklet().__reduce__()[2] + self.assertEqual(state[3], []) # no frames + + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + + cid = stackless.current.context_id + t = stackless.tasklet() + + # this creates + t.__setstate__(state) + + self.assertFalse(t.alive) + self.assertEqual(stackless.current.context_id, cid) + self.assertEqual(t.context_run(self.cvar.get), "unset") + t.bind(None) + + def test_context_run_no_context(self): + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + cid0 = stackless.current.context_id + + t = stackless.tasklet() + def get_cid(): + self.assertEqual(self.cvar.get(), "unset") + return stackless.current.context_id + + cid = t.context_run(get_cid) + + self.assertEqual(stackless.current.context_id, cid0) + self.assertIsInstance(cid, int) + self.assertNotEqual(cid0, cid) + self.assertEqual(t.context_id, cid) + + def test_context_run(self): + # make sure a context exists + sentinel = object() + self.cvar.set(sentinel) + self.assertIs(self.cvar.get(), sentinel) + cid0 = stackless.current.context_id + + t = stackless.tasklet() + ctx = contextvars.Context() + t.set_context(ctx) + + def get_cid(): + self.assertEqual(self.cvar.get(), "unset") + return stackless.current.context_id + + cid = t.context_run(get_cid) + + self.assertEqual(stackless.current.context_id, cid0) + self.assertIsInstance(cid, int) + self.assertNotEqual(cid0, cid) + self.assertEqual(id(ctx), cid) + + def test_main_tasklet_init(self): + # This test succeeds, if Stackless copies ts->context to into the main + # tasklet, when Stackless creates the main tasklet. + # This is important, if there is already a context set, when the interpreter + # gets called. Example: an interactive python prompt. + # See also: test_main_tasklet_fini + ctx_holder1 = None # use a tasklet to keep the context alive + ctx_holder2 = None + def task(): + nonlocal ctx_holder2 + ctx_holder2 = stackless.main.context_run(stackless.tasklet, id) + self.assertEqual(ctx_holder2.context_id, stackless.main.context_id) + + t = stackless.tasklet(task, ()) + def other_thread(): + nonlocal ctx_holder1 + t.bind_thread() + t.insert() + ctx_holder1 = stackless.tasklet(id) + self.assertEqual(ctx_holder1.context_id, stackless.current.context_id) + stackless._stackless._test_outside() + + tr = threading.Thread(target=other_thread, name="other thread") + tr.start() + tr.join() + self.assertIsNot(ctx_holder1, ctx_holder2) + self.assertEqual(ctx_holder1.context_id, ctx_holder2.context_id) + + def test_main_tasklet_fini(self): + # for a main tasklet of a thread initially ts->context == NULL + # This test succeeds, if Stackless copies the context of the main + # tasklet to ts->context after the main tasklet exits + # This way the last context of the main tasklet is preserved and available + # on the next invocation of the interpreter. + ctx_holder1 = None # use a tasklet to keep the context alive + ctx_holder2 = None + def task(): + nonlocal ctx_holder1 + ctx_holder1 = stackless.main.context_run(stackless.tasklet, id) + self.assertEqual(ctx_holder1.context_id, stackless.main.context_id) + + t = stackless.tasklet(task, ()) + def other_thread(): + nonlocal ctx_holder2 + t.bind_thread() + t.insert() + stackless._stackless._test_outside() + ctx_holder2 = stackless.tasklet(id) + self.assertEqual(ctx_holder2.context_id, stackless.current.context_id) + + tr = threading.Thread(target=other_thread, name="other thread") + tr.start() + tr.join() + self.assertIsNot(ctx_holder1, ctx_holder2) + self.assertEqual(ctx_holder1.context_id, ctx_holder2.context_id) + + #/////////////////////////////////////////////////////////////////////////////// if __name__ == '__main__': diff --git a/Stackless/unittests/test_pickle.py b/Stackless/unittests/test_pickle.py index d8441880498327..d167598b31c737 100644 --- a/Stackless/unittests/test_pickle.py +++ b/Stackless/unittests/test_pickle.py @@ -6,6 +6,8 @@ import copy import contextlib import threading +import contextvars +import ctypes import stackless from stackless import schedule, tasklet @@ -926,6 +928,208 @@ def test_pickling1(self): self.assertTupleEqual(self.inspect(agt), ((e, ), 1, True)) +class TestContextRunCallbackPickling(StacklessTestCase): + # Actually we test cframe.__reduce__ for cframes with exec function "context_run_callback" + # We can't pickle a contextvars.Context object. + + # declare int PyContext_Exit(PyObject *octx) + PyContext_Exit = ctypes.pythonapi.PyContext_Exit + PyContext_Exit.argtypes = [ctypes.py_object] + PyContext_Exit.restype = ctypes.c_int + + # make sure, a context exists and contains an var whose value can't be pickled + cvar = contextvars.ContextVar("TestContextPickling.cvar", default="unset") + cvar.set(object()) + + @classmethod + def task(cls): + v = cls.cvar.get() + v = stackless.schedule_remove(v) + cls.cvar.set(v) + + def _test_pickle_in_context_run(self, ctx1): + # test cframe.__reduce__ for "context_run_callback" called in Context.run + ctx2 = contextvars.Context() + t = stackless.tasklet(ctx2.run)(self.task) + + stackless.run() + self.assertEqual(t.tempval, "unset") + self.assertTrue(t.alive) + self.assertTrue(t.paused) + if is_soft(): + self.assertTrue(t.restorable) + + # now ctx2 must be in state entered + self.assertRaisesRegex(RuntimeError, "cannot enter context:.*is already entered", ctx2.run, lambda: None) + frames = t.__reduce__()[2][3] + self.assertIsInstance(frames, list) + + for f in frames: + if not isinstance(f, stackless.cframe): + continue + valid, exec_name, params, i, n = f.__reduce__()[2] + if exec_name != 'context_run_callback': + continue + self.assertEqual(valid, 1) + self.assertEqual(f.n, 0) + self.assertEqual(n, 0) + # now check, that context to switch to is in the state + # the original cframe has the currently active context + self.assertIs(f.ob1, ctx2) + self.assertTupleEqual(params, ((1, 2), ctx1, None, None)) + self.assertEqual(f.i, 0) + self.assertEqual(i, 1) + break + else: + self.assertFalse(is_soft(), "no cframe 'context_run_callback'") + + if is_soft(): + t.bind(None) + self.assertRaisesRegex(RuntimeError, "the current context of the tasklet has been entered", t.set_context, contextvars.Context()) + t.context_run(self.PyContext_Exit, ctx2) + t.set_context(contextvars.Context()) + + def test_pickle_in_context_run(self): + # test cframe.__reduce__ for "context_run_callback" called in Context.run + ctx1 = contextvars.copy_context() + ctx1.run(self._test_pickle_in_context_run, ctx1) + + def _test_pickle_in_tasklet_context_run(self, ctx1): + # test cframe.__reduce__ for "context_run_callback" called in tasklet.context_run + ctx2 = contextvars.Context() + t_run = stackless.tasklet().set_context(ctx2) + t = stackless.tasklet(t_run.context_run)(self.task) + + stackless.run() + self.assertEqual(t.tempval, "unset") + self.assertTrue(t.alive) + self.assertTrue(t.paused) + if is_soft(): + self.assertTrue(t.restorable) + + # now ctx2 must not be in state entered + ctx2.run(lambda:None) + frames = t.__reduce__()[2][3] + self.assertIsInstance(frames, list) + + for f in frames: + if not isinstance(f, stackless.cframe): + continue + valid, exec_name, params, i, n = f.__reduce__()[2] + if exec_name != 'context_run_callback': + continue + self.assertEqual(valid, 1) + self.assertEqual(f.n, 0) + self.assertEqual(n, 0) + # now check, that context to switch to is in the state + # the original cframe has the currently active context + self.assertIs(f.ob1, ctx1) + self.assertTupleEqual(params, ((1, 2), ctx1, None, None)) + self.assertEqual(f.i, 1) + self.assertEqual(i, 1) + break + else: + self.assertFalse(is_soft(), "no cframe 'context_run_callback'") + + if is_soft(): + t.bind(None) + t.set_context(contextvars.Context()) + + def test_pickle_in_tasklet_context_run(self): + # test cframe.__reduce__ for "context_run_callback" called in tasklet.context_run + ctx1 = contextvars.copy_context() + ctx1.run(self._test_pickle_in_tasklet_context_run, ctx1) + + def test_tasklet_get_unpicklable_state(self): + cid = stackless.current.context_id + ctx = stackless._tasklet_get_unpicklable_state(stackless.current)['context'] + self.assertEqual(stackless.current.context_id, cid) + self.assertIsInstance(ctx, contextvars.Context) + self.assertEqual(id(ctx), cid) + + +class TestContextPickling(StacklessPickleTestCase): + # We can't pickle a contextvars.Context object. + + # declare int PyContext_Exit(PyObject *octx) + PyContext_Exit = ctypes.pythonapi.PyContext_Exit + PyContext_Exit.argtypes = [ctypes.py_object] + PyContext_Exit.restype = ctypes.c_int + + # make sure, a context exists and contains an var whose value can't be pickled + cvar = contextvars.ContextVar("TestContextPickling.cvar", default="unset") + sentinel = object() + cvar.set(sentinel) + + def setUp(self): + super().setUp() + stackless.pickle_flags(stackless.PICKLEFLAGS_PICKLE_CONTEXT, stackless.PICKLEFLAGS_PICKLE_CONTEXT) + + def tearDown(self): + super().tearDown() + stackless.pickle_flags(0, stackless.PICKLEFLAGS_PICKLE_CONTEXT) + + @classmethod + def task(cls, arg): + v = cls.cvar.get() + cls.cvar.set(arg) + v = stackless.schedule_remove(v) + cls.cvar.set(v) + + def test_pickle_tasklet_with_context(self): + ctx = contextvars.copy_context() + t = stackless.tasklet(self.task)("changed").set_context(ctx) + stackless.run() + self.assertTrue(t.alive) + self.assertIs(t.tempval, self.sentinel) + self.assertEqual(ctx[self.cvar], "changed") + + external_map = {"context ctx": ctx} + + p = self.dumps(t, external_map = external_map) + # import pickletools; pickletools.dis(pickletools.optimize(p)) + t.kill() + + t = self.loads(p, external_map = external_map) + self.assertEqual(t.context_id, id(ctx)) + self.assertTrue(t.alive) + t.insert() + t.tempval = "after unpickling" + if is_soft(): + stackless.run() + self.assertFalse(t.alive) + self.assertEqual(ctx[self.cvar], "after unpickling") + + def test_pickle_tasklet_in_tasklet_context_run(self): + ctx1 = contextvars.copy_context() + ctx2 = contextvars.Context() + t = stackless.tasklet(stackless.tasklet().set_context(ctx2).context_run)(self.task, "changed").set_context(ctx1) + self.assertEqual(t.context_id, id(ctx1)) + stackless.run() + self.assertTrue(t.alive) + self.assertIs(ctx1[self.cvar], self.sentinel) + self.assertEqual(t.tempval, "unset") + self.assertEqual(ctx2[self.cvar], "changed") + + external_map = {"context ctx1": ctx1, "context ctx2": ctx2,} + + p = self.dumps(t, external_map = external_map) + # import pickletools; pickletools.dis(pickletools.optimize(p)) + t.kill() + + t = self.loads(p, external_map = external_map) + self.assertEqual(t.context_id, id(ctx2)) + self.assertTrue(t.alive) + t.insert() + t.tempval = "after unpickling" + if is_soft(): + stackless.run() + self.assertFalse(t.alive) + self.assertEqual(t.context_id, id(ctx1)) + self.assertEqual(ctx2[self.cvar], "after unpickling") + self.assertIs(ctx1[self.cvar], self.sentinel) + + class TestCopy(StacklessTestCase): ITERATOR_TYPE = type(iter("abc"))