1 
2 /* interpreters module */
3 /* low-level access to interpreter primitives */
4 #ifndef Py_BUILD_CORE_BUILTIN
5 #  define Py_BUILD_CORE_MODULE 1
6 #endif
7 
8 #include "Python.h"
9 #include "pycore_frame.h"
10 #include "pycore_pystate.h"       // _PyThreadState_GET()
11 #include "pycore_interpreteridobject.h"
12 
13 
14 static char *
_copy_raw_string(PyObject * strobj)15 _copy_raw_string(PyObject *strobj)
16 {
17     const char *str = PyUnicode_AsUTF8(strobj);
18     if (str == NULL) {
19         return NULL;
20     }
21     char *copied = PyMem_Malloc(strlen(str)+1);
22     if (copied == NULL) {
23         PyErr_NoMemory();
24         return NULL;
25     }
26     strcpy(copied, str);
27     return copied;
28 }
29 
30 static PyInterpreterState *
_get_current(void)31 _get_current(void)
32 {
33     // PyInterpreterState_Get() aborts if lookup fails, so don't need
34     // to check the result for NULL.
35     return PyInterpreterState_Get();
36 }
37 
38 
39 /* data-sharing-specific code ***********************************************/
40 
41 struct _sharednsitem {
42     char *name;
43     _PyCrossInterpreterData data;
44 };
45 
46 static void _sharednsitem_clear(struct _sharednsitem *);  // forward
47 
48 static int
_sharednsitem_init(struct _sharednsitem * item,PyObject * key,PyObject * value)49 _sharednsitem_init(struct _sharednsitem *item, PyObject *key, PyObject *value)
50 {
51     item->name = _copy_raw_string(key);
52     if (item->name == NULL) {
53         return -1;
54     }
55     if (_PyObject_GetCrossInterpreterData(value, &item->data) != 0) {
56         _sharednsitem_clear(item);
57         return -1;
58     }
59     return 0;
60 }
61 
62 static void
_sharednsitem_clear(struct _sharednsitem * item)63 _sharednsitem_clear(struct _sharednsitem *item)
64 {
65     if (item->name != NULL) {
66         PyMem_Free(item->name);
67         item->name = NULL;
68     }
69     _PyCrossInterpreterData_Release(&item->data);
70 }
71 
72 static int
_sharednsitem_apply(struct _sharednsitem * item,PyObject * ns)73 _sharednsitem_apply(struct _sharednsitem *item, PyObject *ns)
74 {
75     PyObject *name = PyUnicode_FromString(item->name);
76     if (name == NULL) {
77         return -1;
78     }
79     PyObject *value = _PyCrossInterpreterData_NewObject(&item->data);
80     if (value == NULL) {
81         Py_DECREF(name);
82         return -1;
83     }
84     int res = PyDict_SetItem(ns, name, value);
85     Py_DECREF(name);
86     Py_DECREF(value);
87     return res;
88 }
89 
90 typedef struct _sharedns {
91     Py_ssize_t len;
92     struct _sharednsitem* items;
93 } _sharedns;
94 
95 static _sharedns *
_sharedns_new(Py_ssize_t len)96 _sharedns_new(Py_ssize_t len)
97 {
98     _sharedns *shared = PyMem_NEW(_sharedns, 1);
99     if (shared == NULL) {
100         PyErr_NoMemory();
101         return NULL;
102     }
103     shared->len = len;
104     shared->items = PyMem_NEW(struct _sharednsitem, len);
105     if (shared->items == NULL) {
106         PyErr_NoMemory();
107         PyMem_Free(shared);
108         return NULL;
109     }
110     return shared;
111 }
112 
113 static void
_sharedns_free(_sharedns * shared)114 _sharedns_free(_sharedns *shared)
115 {
116     for (Py_ssize_t i=0; i < shared->len; i++) {
117         _sharednsitem_clear(&shared->items[i]);
118     }
119     PyMem_Free(shared->items);
120     PyMem_Free(shared);
121 }
122 
123 static _sharedns *
_get_shared_ns(PyObject * shareable)124 _get_shared_ns(PyObject *shareable)
125 {
126     if (shareable == NULL || shareable == Py_None) {
127         return NULL;
128     }
129     Py_ssize_t len = PyDict_Size(shareable);
130     if (len == 0) {
131         return NULL;
132     }
133 
134     _sharedns *shared = _sharedns_new(len);
135     if (shared == NULL) {
136         return NULL;
137     }
138     Py_ssize_t pos = 0;
139     for (Py_ssize_t i=0; i < len; i++) {
140         PyObject *key, *value;
141         if (PyDict_Next(shareable, &pos, &key, &value) == 0) {
142             break;
143         }
144         if (_sharednsitem_init(&shared->items[i], key, value) != 0) {
145             break;
146         }
147     }
148     if (PyErr_Occurred()) {
149         _sharedns_free(shared);
150         return NULL;
151     }
152     return shared;
153 }
154 
155 static int
_sharedns_apply(_sharedns * shared,PyObject * ns)156 _sharedns_apply(_sharedns *shared, PyObject *ns)
157 {
158     for (Py_ssize_t i=0; i < shared->len; i++) {
159         if (_sharednsitem_apply(&shared->items[i], ns) != 0) {
160             return -1;
161         }
162     }
163     return 0;
164 }
165 
166 // Ultimately we'd like to preserve enough information about the
167 // exception and traceback that we could re-constitute (or at least
168 // simulate, a la traceback.TracebackException), and even chain, a copy
169 // of the exception in the calling interpreter.
170 
/* Plain-C snapshot of an exception: both fields are either NULL or
   strings released with PyMem_Free() by _sharedexception_clear(). */
typedef struct _sharedexception {
    /* Text produced by formatting the exception type. */
    char *name;

    /* Text produced by formatting the exception value. */
    char *msg;
} _sharedexception;
175 
176 static _sharedexception *
_sharedexception_new(void)177 _sharedexception_new(void)
178 {
179     _sharedexception *err = PyMem_NEW(_sharedexception, 1);
180     if (err == NULL) {
181         PyErr_NoMemory();
182         return NULL;
183     }
184     err->name = NULL;
185     err->msg = NULL;
186     return err;
187 }
188 
189 static void
_sharedexception_clear(_sharedexception * exc)190 _sharedexception_clear(_sharedexception *exc)
191 {
192     if (exc->name != NULL) {
193         PyMem_Free(exc->name);
194     }
195     if (exc->msg != NULL) {
196         PyMem_Free(exc->msg);
197     }
198 }
199 
200 static void
_sharedexception_free(_sharedexception * exc)201 _sharedexception_free(_sharedexception *exc)
202 {
203     _sharedexception_clear(exc);
204     PyMem_Free(exc);
205 }
206 
207 static _sharedexception *
_sharedexception_bind(PyObject * exctype,PyObject * exc,PyObject * tb)208 _sharedexception_bind(PyObject *exctype, PyObject *exc, PyObject *tb)
209 {
210     assert(exctype != NULL);
211     char *failure = NULL;
212 
213     _sharedexception *err = _sharedexception_new();
214     if (err == NULL) {
215         goto finally;
216     }
217 
218     PyObject *name = PyUnicode_FromFormat("%S", exctype);
219     if (name == NULL) {
220         failure = "unable to format exception type name";
221         goto finally;
222     }
223     err->name = _copy_raw_string(name);
224     Py_DECREF(name);
225     if (err->name == NULL) {
226         if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
227             failure = "out of memory copying exception type name";
228         } else {
229             failure = "unable to encode and copy exception type name";
230         }
231         goto finally;
232     }
233 
234     if (exc != NULL) {
235         PyObject *msg = PyUnicode_FromFormat("%S", exc);
236         if (msg == NULL) {
237             failure = "unable to format exception message";
238             goto finally;
239         }
240         err->msg = _copy_raw_string(msg);
241         Py_DECREF(msg);
242         if (err->msg == NULL) {
243             if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
244                 failure = "out of memory copying exception message";
245             } else {
246                 failure = "unable to encode and copy exception message";
247             }
248             goto finally;
249         }
250     }
251 
252 finally:
253     if (failure != NULL) {
254         PyErr_Clear();
255         if (err->name != NULL) {
256             PyMem_Free(err->name);
257             err->name = NULL;
258         }
259         err->msg = failure;
260     }
261     return err;
262 }
263 
264 static void
_sharedexception_apply(_sharedexception * exc,PyObject * wrapperclass)265 _sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass)
266 {
267     if (exc->name != NULL) {
268         if (exc->msg != NULL) {
269             PyErr_Format(wrapperclass, "%s: %s",  exc->name, exc->msg);
270         }
271         else {
272             PyErr_SetString(wrapperclass, exc->name);
273         }
274     }
275     else if (exc->msg != NULL) {
276         PyErr_SetString(wrapperclass, exc->msg);
277     }
278     else {
279         PyErr_SetNone(wrapperclass);
280     }
281 }
282 
283 
284 /* channel-specific code ****************************************************/
285 
286 #define CHANNEL_SEND 1
287 #define CHANNEL_BOTH 0
288 #define CHANNEL_RECV -1
289 
290 static PyObject *ChannelError;
291 static PyObject *ChannelNotFoundError;
292 static PyObject *ChannelClosedError;
293 static PyObject *ChannelEmptyError;
294 static PyObject *ChannelNotEmptyError;
295 
296 static int
channel_exceptions_init(PyObject * ns)297 channel_exceptions_init(PyObject *ns)
298 {
299     // XXX Move the exceptions into per-module memory?
300 
301     // A channel-related operation failed.
302     ChannelError = PyErr_NewException("_xxsubinterpreters.ChannelError",
303                                       PyExc_RuntimeError, NULL);
304     if (ChannelError == NULL) {
305         return -1;
306     }
307     if (PyDict_SetItemString(ns, "ChannelError", ChannelError) != 0) {
308         return -1;
309     }
310 
311     // An operation tried to use a channel that doesn't exist.
312     ChannelNotFoundError = PyErr_NewException(
313             "_xxsubinterpreters.ChannelNotFoundError", ChannelError, NULL);
314     if (ChannelNotFoundError == NULL) {
315         return -1;
316     }
317     if (PyDict_SetItemString(ns, "ChannelNotFoundError", ChannelNotFoundError) != 0) {
318         return -1;
319     }
320 
321     // An operation tried to use a closed channel.
322     ChannelClosedError = PyErr_NewException(
323             "_xxsubinterpreters.ChannelClosedError", ChannelError, NULL);
324     if (ChannelClosedError == NULL) {
325         return -1;
326     }
327     if (PyDict_SetItemString(ns, "ChannelClosedError", ChannelClosedError) != 0) {
328         return -1;
329     }
330 
331     // An operation tried to pop from an empty channel.
332     ChannelEmptyError = PyErr_NewException(
333             "_xxsubinterpreters.ChannelEmptyError", ChannelError, NULL);
334     if (ChannelEmptyError == NULL) {
335         return -1;
336     }
337     if (PyDict_SetItemString(ns, "ChannelEmptyError", ChannelEmptyError) != 0) {
338         return -1;
339     }
340 
341     // An operation tried to close a non-empty channel.
342     ChannelNotEmptyError = PyErr_NewException(
343             "_xxsubinterpreters.ChannelNotEmptyError", ChannelError, NULL);
344     if (ChannelNotEmptyError == NULL) {
345         return -1;
346     }
347     if (PyDict_SetItemString(ns, "ChannelNotEmptyError", ChannelNotEmptyError) != 0) {
348         return -1;
349     }
350 
351     return 0;
352 }
353 
354 /* the channel queue */
355 
356 struct _channelitem;
357 
358 typedef struct _channelitem {
359     _PyCrossInterpreterData *data;
360     struct _channelitem *next;
361 } _channelitem;
362 
363 static _channelitem *
_channelitem_new(void)364 _channelitem_new(void)
365 {
366     _channelitem *item = PyMem_NEW(_channelitem, 1);
367     if (item == NULL) {
368         PyErr_NoMemory();
369         return NULL;
370     }
371     item->data = NULL;
372     item->next = NULL;
373     return item;
374 }
375 
376 static void
_channelitem_clear(_channelitem * item)377 _channelitem_clear(_channelitem *item)
378 {
379     if (item->data != NULL) {
380         _PyCrossInterpreterData_Release(item->data);
381         PyMem_Free(item->data);
382         item->data = NULL;
383     }
384     item->next = NULL;
385 }
386 
387 static void
_channelitem_free(_channelitem * item)388 _channelitem_free(_channelitem *item)
389 {
390     _channelitem_clear(item);
391     PyMem_Free(item);
392 }
393 
394 static void
_channelitem_free_all(_channelitem * item)395 _channelitem_free_all(_channelitem *item)
396 {
397     while (item != NULL) {
398         _channelitem *last = item;
399         item = item->next;
400         _channelitem_free(last);
401     }
402 }
403 
404 static _PyCrossInterpreterData *
_channelitem_popped(_channelitem * item)405 _channelitem_popped(_channelitem *item)
406 {
407     _PyCrossInterpreterData *data = item->data;
408     item->data = NULL;
409     _channelitem_free(item);
410     return data;
411 }
412 
413 typedef struct _channelqueue {
414     int64_t count;
415     _channelitem *first;
416     _channelitem *last;
417 } _channelqueue;
418 
419 static _channelqueue *
_channelqueue_new(void)420 _channelqueue_new(void)
421 {
422     _channelqueue *queue = PyMem_NEW(_channelqueue, 1);
423     if (queue == NULL) {
424         PyErr_NoMemory();
425         return NULL;
426     }
427     queue->count = 0;
428     queue->first = NULL;
429     queue->last = NULL;
430     return queue;
431 }
432 
433 static void
_channelqueue_clear(_channelqueue * queue)434 _channelqueue_clear(_channelqueue *queue)
435 {
436     _channelitem_free_all(queue->first);
437     queue->count = 0;
438     queue->first = NULL;
439     queue->last = NULL;
440 }
441 
442 static void
_channelqueue_free(_channelqueue * queue)443 _channelqueue_free(_channelqueue *queue)
444 {
445     _channelqueue_clear(queue);
446     PyMem_Free(queue);
447 }
448 
449 static int
_channelqueue_put(_channelqueue * queue,_PyCrossInterpreterData * data)450 _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
451 {
452     _channelitem *item = _channelitem_new();
453     if (item == NULL) {
454         return -1;
455     }
456     item->data = data;
457 
458     queue->count += 1;
459     if (queue->first == NULL) {
460         queue->first = item;
461     }
462     else {
463         queue->last->next = item;
464     }
465     queue->last = item;
466     return 0;
467 }
468 
469 static _PyCrossInterpreterData *
_channelqueue_get(_channelqueue * queue)470 _channelqueue_get(_channelqueue *queue)
471 {
472     _channelitem *item = queue->first;
473     if (item == NULL) {
474         return NULL;
475     }
476     queue->first = item->next;
477     if (queue->last == item) {
478         queue->last = NULL;
479     }
480     queue->count -= 1;
481 
482     return _channelitem_popped(item);
483 }
484 
485 /* channel-interpreter associations */
486 
/* Association between one interpreter and one end of a channel, kept in
   a singly linked list. */
typedef struct _channelend {
    /* Following association, or NULL at the tail. */
    struct _channelend *next;

    /* ID of the associated interpreter. */
    int64_t interp;

    /* 1 while this end is open for the interpreter, 0 once closed. */
    int open;
} _channelend;
494 
495 static _channelend *
_channelend_new(int64_t interp)496 _channelend_new(int64_t interp)
497 {
498     _channelend *end = PyMem_NEW(_channelend, 1);
499     if (end == NULL) {
500         PyErr_NoMemory();
501         return NULL;
502     }
503     end->next = NULL;
504     end->interp = interp;
505     end->open = 1;
506     return end;
507 }
508 
509 static void
_channelend_free(_channelend * end)510 _channelend_free(_channelend *end)
511 {
512     PyMem_Free(end);
513 }
514 
515 static void
_channelend_free_all(_channelend * end)516 _channelend_free_all(_channelend *end)
517 {
518     while (end != NULL) {
519         _channelend *last = end;
520         end = end->next;
521         _channelend_free(last);
522     }
523 }
524 
525 static _channelend *
_channelend_find(_channelend * first,int64_t interp,_channelend ** pprev)526 _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
527 {
528     _channelend *prev = NULL;
529     _channelend *end = first;
530     while (end != NULL) {
531         if (end->interp == interp) {
532             break;
533         }
534         prev = end;
535         end = end->next;
536     }
537     if (pprev != NULL) {
538         *pprev = prev;
539     }
540     return end;
541 }
542 
543 typedef struct _channelassociations {
544     // Note that the list entries are never removed for interpreter
545     // for which the channel is closed.  This should not be a problem in
546     // practice.  Also, a channel isn't automatically closed when an
547     // interpreter is destroyed.
548     int64_t numsendopen;
549     int64_t numrecvopen;
550     _channelend *send;
551     _channelend *recv;
552 } _channelends;
553 
554 static _channelends *
_channelends_new(void)555 _channelends_new(void)
556 {
557     _channelends *ends = PyMem_NEW(_channelends, 1);
558     if (ends== NULL) {
559         return NULL;
560     }
561     ends->numsendopen = 0;
562     ends->numrecvopen = 0;
563     ends->send = NULL;
564     ends->recv = NULL;
565     return ends;
566 }
567 
568 static void
_channelends_clear(_channelends * ends)569 _channelends_clear(_channelends *ends)
570 {
571     _channelend_free_all(ends->send);
572     ends->send = NULL;
573     ends->numsendopen = 0;
574 
575     _channelend_free_all(ends->recv);
576     ends->recv = NULL;
577     ends->numrecvopen = 0;
578 }
579 
580 static void
_channelends_free(_channelends * ends)581 _channelends_free(_channelends *ends)
582 {
583     _channelends_clear(ends);
584     PyMem_Free(ends);
585 }
586 
587 static _channelend *
_channelends_add(_channelends * ends,_channelend * prev,int64_t interp,int send)588 _channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
589                  int send)
590 {
591     _channelend *end = _channelend_new(interp);
592     if (end == NULL) {
593         return NULL;
594     }
595 
596     if (prev == NULL) {
597         if (send) {
598             ends->send = end;
599         }
600         else {
601             ends->recv = end;
602         }
603     }
604     else {
605         prev->next = end;
606     }
607     if (send) {
608         ends->numsendopen += 1;
609     }
610     else {
611         ends->numrecvopen += 1;
612     }
613     return end;
614 }
615 
616 static int
_channelends_associate(_channelends * ends,int64_t interp,int send)617 _channelends_associate(_channelends *ends, int64_t interp, int send)
618 {
619     _channelend *prev;
620     _channelend *end = _channelend_find(send ? ends->send : ends->recv,
621                                         interp, &prev);
622     if (end != NULL) {
623         if (!end->open) {
624             PyErr_SetString(ChannelClosedError, "channel already closed");
625             return -1;
626         }
627         // already associated
628         return 0;
629     }
630     if (_channelends_add(ends, prev, interp, send) == NULL) {
631         return -1;
632     }
633     return 0;
634 }
635 
636 static int
_channelends_is_open(_channelends * ends)637 _channelends_is_open(_channelends *ends)
638 {
639     if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
640         return 1;
641     }
642     if (ends->send == NULL && ends->recv == NULL) {
643         return 1;
644     }
645     return 0;
646 }
647 
648 static void
_channelends_close_end(_channelends * ends,_channelend * end,int send)649 _channelends_close_end(_channelends *ends, _channelend *end, int send)
650 {
651     end->open = 0;
652     if (send) {
653         ends->numsendopen -= 1;
654     }
655     else {
656         ends->numrecvopen -= 1;
657     }
658 }
659 
660 static int
_channelends_close_interpreter(_channelends * ends,int64_t interp,int which)661 _channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
662 {
663     _channelend *prev;
664     _channelend *end;
665     if (which >= 0) {  // send/both
666         end = _channelend_find(ends->send, interp, &prev);
667         if (end == NULL) {
668             // never associated so add it
669             end = _channelends_add(ends, prev, interp, 1);
670             if (end == NULL) {
671                 return -1;
672             }
673         }
674         _channelends_close_end(ends, end, 1);
675     }
676     if (which <= 0) {  // recv/both
677         end = _channelend_find(ends->recv, interp, &prev);
678         if (end == NULL) {
679             // never associated so add it
680             end = _channelends_add(ends, prev, interp, 0);
681             if (end == NULL) {
682                 return -1;
683             }
684         }
685         _channelends_close_end(ends, end, 0);
686     }
687     return 0;
688 }
689 
690 static void
_channelends_close_all(_channelends * ends,int which,int force)691 _channelends_close_all(_channelends *ends, int which, int force)
692 {
693     // XXX Handle the ends.
694     // XXX Handle force is True.
695 
696     // Ensure all the "send"-associated interpreters are closed.
697     _channelend *end;
698     for (end = ends->send; end != NULL; end = end->next) {
699         _channelends_close_end(ends, end, 1);
700     }
701 
702     // Ensure all the "recv"-associated interpreters are closed.
703     for (end = ends->recv; end != NULL; end = end->next) {
704         _channelends_close_end(ends, end, 0);
705     }
706 }
707 
708 /* channels */
709 
710 struct _channel;
711 struct _channel_closing;
712 static void _channel_clear_closing(struct _channel *);
713 static void _channel_finish_closing(struct _channel *);
714 
715 typedef struct _channel {
716     PyThread_type_lock mutex;
717     _channelqueue *queue;
718     _channelends *ends;
719     int open;
720     struct _channel_closing *closing;
721 } _PyChannelState;
722 
723 static _PyChannelState *
_channel_new(void)724 _channel_new(void)
725 {
726     _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1);
727     if (chan == NULL) {
728         return NULL;
729     }
730     chan->mutex = PyThread_allocate_lock();
731     if (chan->mutex == NULL) {
732         PyMem_Free(chan);
733         PyErr_SetString(ChannelError,
734                         "can't initialize mutex for new channel");
735         return NULL;
736     }
737     chan->queue = _channelqueue_new();
738     if (chan->queue == NULL) {
739         PyMem_Free(chan);
740         return NULL;
741     }
742     chan->ends = _channelends_new();
743     if (chan->ends == NULL) {
744         _channelqueue_free(chan->queue);
745         PyMem_Free(chan);
746         return NULL;
747     }
748     chan->open = 1;
749     chan->closing = NULL;
750     return chan;
751 }
752 
753 static void
_channel_free(_PyChannelState * chan)754 _channel_free(_PyChannelState *chan)
755 {
756     _channel_clear_closing(chan);
757     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
758     _channelqueue_free(chan->queue);
759     _channelends_free(chan->ends);
760     PyThread_release_lock(chan->mutex);
761 
762     PyThread_free_lock(chan->mutex);
763     PyMem_Free(chan);
764 }
765 
766 static int
_channel_add(_PyChannelState * chan,int64_t interp,_PyCrossInterpreterData * data)767 _channel_add(_PyChannelState *chan, int64_t interp,
768              _PyCrossInterpreterData *data)
769 {
770     int res = -1;
771     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
772 
773     if (!chan->open) {
774         PyErr_SetString(ChannelClosedError, "channel closed");
775         goto done;
776     }
777     if (_channelends_associate(chan->ends, interp, 1) != 0) {
778         goto done;
779     }
780 
781     if (_channelqueue_put(chan->queue, data) != 0) {
782         goto done;
783     }
784 
785     res = 0;
786 done:
787     PyThread_release_lock(chan->mutex);
788     return res;
789 }
790 
791 static _PyCrossInterpreterData *
_channel_next(_PyChannelState * chan,int64_t interp)792 _channel_next(_PyChannelState *chan, int64_t interp)
793 {
794     _PyCrossInterpreterData *data = NULL;
795     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
796 
797     if (!chan->open) {
798         PyErr_SetString(ChannelClosedError, "channel closed");
799         goto done;
800     }
801     if (_channelends_associate(chan->ends, interp, 0) != 0) {
802         goto done;
803     }
804 
805     data = _channelqueue_get(chan->queue);
806     if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
807         chan->open = 0;
808     }
809 
810 done:
811     PyThread_release_lock(chan->mutex);
812     if (chan->queue->count == 0) {
813         _channel_finish_closing(chan);
814     }
815     return data;
816 }
817 
818 static int
_channel_close_interpreter(_PyChannelState * chan,int64_t interp,int end)819 _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
820 {
821     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
822 
823     int res = -1;
824     if (!chan->open) {
825         PyErr_SetString(ChannelClosedError, "channel already closed");
826         goto done;
827     }
828 
829     if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
830         goto done;
831     }
832     chan->open = _channelends_is_open(chan->ends);
833 
834     res = 0;
835 done:
836     PyThread_release_lock(chan->mutex);
837     return res;
838 }
839 
840 static int
_channel_close_all(_PyChannelState * chan,int end,int force)841 _channel_close_all(_PyChannelState *chan, int end, int force)
842 {
843     int res = -1;
844     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
845 
846     if (!chan->open) {
847         PyErr_SetString(ChannelClosedError, "channel already closed");
848         goto done;
849     }
850 
851     if (!force && chan->queue->count > 0) {
852         PyErr_SetString(ChannelNotEmptyError,
853                         "may not be closed if not empty (try force=True)");
854         goto done;
855     }
856 
857     chan->open = 0;
858 
859     // We *could* also just leave these in place, since we've marked
860     // the channel as closed already.
861     _channelends_close_all(chan->ends, end, force);
862 
863     res = 0;
864 done:
865     PyThread_release_lock(chan->mutex);
866     return res;
867 }
868 
869 /* the set of channels */
870 
871 struct _channelref;
872 
873 typedef struct _channelref {
874     int64_t id;
875     _PyChannelState *chan;
876     struct _channelref *next;
877     Py_ssize_t objcount;
878 } _channelref;
879 
880 static _channelref *
_channelref_new(int64_t id,_PyChannelState * chan)881 _channelref_new(int64_t id, _PyChannelState *chan)
882 {
883     _channelref *ref = PyMem_NEW(_channelref, 1);
884     if (ref == NULL) {
885         return NULL;
886     }
887     ref->id = id;
888     ref->chan = chan;
889     ref->next = NULL;
890     ref->objcount = 0;
891     return ref;
892 }
893 
894 //static void
895 //_channelref_clear(_channelref *ref)
896 //{
897 //    ref->id = -1;
898 //    ref->chan = NULL;
899 //    ref->next = NULL;
900 //    ref->objcount = 0;
901 //}
902 
903 static void
_channelref_free(_channelref * ref)904 _channelref_free(_channelref *ref)
905 {
906     if (ref->chan != NULL) {
907         _channel_clear_closing(ref->chan);
908     }
909     //_channelref_clear(ref);
910     PyMem_Free(ref);
911 }
912 
913 static _channelref *
_channelref_find(_channelref * first,int64_t id,_channelref ** pprev)914 _channelref_find(_channelref *first, int64_t id, _channelref **pprev)
915 {
916     _channelref *prev = NULL;
917     _channelref *ref = first;
918     while (ref != NULL) {
919         if (ref->id == id) {
920             break;
921         }
922         prev = ref;
923         ref = ref->next;
924     }
925     if (pprev != NULL) {
926         *pprev = prev;
927     }
928     return ref;
929 }
930 
931 typedef struct _channels {
932     PyThread_type_lock mutex;
933     _channelref *head;
934     int64_t numopen;
935     int64_t next_id;
936 } _channels;
937 
938 static int
_channels_init(_channels * channels)939 _channels_init(_channels *channels)
940 {
941     if (channels->mutex == NULL) {
942         channels->mutex = PyThread_allocate_lock();
943         if (channels->mutex == NULL) {
944             PyErr_SetString(ChannelError,
945                             "can't initialize mutex for channel management");
946             return -1;
947         }
948     }
949     channels->head = NULL;
950     channels->numopen = 0;
951     channels->next_id = 0;
952     return 0;
953 }
954 
955 static int64_t
_channels_next_id(_channels * channels)956 _channels_next_id(_channels *channels)  // needs lock
957 {
958     int64_t id = channels->next_id;
959     if (id < 0) {
960         /* overflow */
961         PyErr_SetString(ChannelError,
962                         "failed to get a channel ID");
963         return -1;
964     }
965     channels->next_id += 1;
966     return id;
967 }
968 
969 static _PyChannelState *
_channels_lookup(_channels * channels,int64_t id,PyThread_type_lock * pmutex)970 _channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex)
971 {
972     _PyChannelState *chan = NULL;
973     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
974     if (pmutex != NULL) {
975         *pmutex = NULL;
976     }
977 
978     _channelref *ref = _channelref_find(channels->head, id, NULL);
979     if (ref == NULL) {
980         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
981         goto done;
982     }
983     if (ref->chan == NULL || !ref->chan->open) {
984         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
985         goto done;
986     }
987 
988     if (pmutex != NULL) {
989         // The mutex will be closed by the caller.
990         *pmutex = channels->mutex;
991     }
992 
993     chan = ref->chan;
994 done:
995     if (pmutex == NULL || *pmutex == NULL) {
996         PyThread_release_lock(channels->mutex);
997     }
998     return chan;
999 }
1000 
1001 static int64_t
_channels_add(_channels * channels,_PyChannelState * chan)1002 _channels_add(_channels *channels, _PyChannelState *chan)
1003 {
1004     int64_t cid = -1;
1005     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1006 
1007     // Create a new ref.
1008     int64_t id = _channels_next_id(channels);
1009     if (id < 0) {
1010         goto done;
1011     }
1012     _channelref *ref = _channelref_new(id, chan);
1013     if (ref == NULL) {
1014         goto done;
1015     }
1016 
1017     // Add it to the list.
1018     // We assume that the channel is a new one (not already in the list).
1019     ref->next = channels->head;
1020     channels->head = ref;
1021     channels->numopen += 1;
1022 
1023     cid = id;
1024 done:
1025     PyThread_release_lock(channels->mutex);
1026     return cid;
1027 }
1028 
1029 /* forward */
1030 static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
1031 
1032 static int
_channels_close(_channels * channels,int64_t cid,_PyChannelState ** pchan,int end,int force)1033 _channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
1034                 int end, int force)
1035 {
1036     int res = -1;
1037     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1038     if (pchan != NULL) {
1039         *pchan = NULL;
1040     }
1041 
1042     _channelref *ref = _channelref_find(channels->head, cid, NULL);
1043     if (ref == NULL) {
1044         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", cid);
1045         goto done;
1046     }
1047 
1048     if (ref->chan == NULL) {
1049         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
1050         goto done;
1051     }
1052     else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
1053         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
1054         goto done;
1055     }
1056     else {
1057         if (_channel_close_all(ref->chan, end, force) != 0) {
1058             if (end == CHANNEL_SEND &&
1059                     PyErr_ExceptionMatches(ChannelNotEmptyError)) {
1060                 if (ref->chan->closing != NULL) {
1061                     PyErr_Format(ChannelClosedError,
1062                                  "channel %" PRId64 " closed", cid);
1063                     goto done;
1064                 }
1065                 // Mark the channel as closing and return.  The channel
1066                 // will be cleaned up in _channel_next().
1067                 PyErr_Clear();
1068                 if (_channel_set_closing(ref, channels->mutex) != 0) {
1069                     goto done;
1070                 }
1071                 if (pchan != NULL) {
1072                     *pchan = ref->chan;
1073                 }
1074                 res = 0;
1075             }
1076             goto done;
1077         }
1078         if (pchan != NULL) {
1079             *pchan = ref->chan;
1080         }
1081         else  {
1082             _channel_free(ref->chan);
1083         }
1084         ref->chan = NULL;
1085     }
1086 
1087     res = 0;
1088 done:
1089     PyThread_release_lock(channels->mutex);
1090     return res;
1091 }
1092 
1093 static void
_channels_remove_ref(_channels * channels,_channelref * ref,_channelref * prev,_PyChannelState ** pchan)1094 _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
1095                      _PyChannelState **pchan)
1096 {
1097     if (ref == channels->head) {
1098         channels->head = ref->next;
1099     }
1100     else {
1101         prev->next = ref->next;
1102     }
1103     channels->numopen -= 1;
1104 
1105     if (pchan != NULL) {
1106         *pchan = ref->chan;
1107     }
1108     _channelref_free(ref);
1109 }
1110 
1111 static int
_channels_remove(_channels * channels,int64_t id,_PyChannelState ** pchan)1112 _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
1113 {
1114     int res = -1;
1115     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1116 
1117     if (pchan != NULL) {
1118         *pchan = NULL;
1119     }
1120 
1121     _channelref *prev = NULL;
1122     _channelref *ref = _channelref_find(channels->head, id, &prev);
1123     if (ref == NULL) {
1124         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
1125         goto done;
1126     }
1127 
1128     _channels_remove_ref(channels, ref, prev, pchan);
1129 
1130     res = 0;
1131 done:
1132     PyThread_release_lock(channels->mutex);
1133     return res;
1134 }
1135 
1136 static int
_channels_add_id_object(_channels * channels,int64_t id)1137 _channels_add_id_object(_channels *channels, int64_t id)
1138 {
1139     int res = -1;
1140     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1141 
1142     _channelref *ref = _channelref_find(channels->head, id, NULL);
1143     if (ref == NULL) {
1144         PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
1145         goto done;
1146     }
1147     ref->objcount += 1;
1148 
1149     res = 0;
1150 done:
1151     PyThread_release_lock(channels->mutex);
1152     return res;
1153 }
1154 
1155 static void
_channels_drop_id_object(_channels * channels,int64_t id)1156 _channels_drop_id_object(_channels *channels, int64_t id)
1157 {
1158     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1159 
1160     _channelref *prev = NULL;
1161     _channelref *ref = _channelref_find(channels->head, id, &prev);
1162     if (ref == NULL) {
1163         // Already destroyed.
1164         goto done;
1165     }
1166     ref->objcount -= 1;
1167 
1168     // Destroy if no longer used.
1169     if (ref->objcount == 0) {
1170         _PyChannelState *chan = NULL;
1171         _channels_remove_ref(channels, ref, prev, &chan);
1172         if (chan != NULL) {
1173             _channel_free(chan);
1174         }
1175     }
1176 
1177 done:
1178     PyThread_release_lock(channels->mutex);
1179 }
1180 
1181 static int64_t *
_channels_list_all(_channels * channels,int64_t * count)1182 _channels_list_all(_channels *channels, int64_t *count)
1183 {
1184     int64_t *cids = NULL;
1185     PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1186     int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
1187     if (ids == NULL) {
1188         goto done;
1189     }
1190     _channelref *ref = channels->head;
1191     for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
1192         ids[i] = ref->id;
1193     }
1194     *count = channels->numopen;
1195 
1196     cids = ids;
1197 done:
1198     PyThread_release_lock(channels->mutex);
1199     return cids;
1200 }
1201 
1202 /* support for closing non-empty channels */
1203 
/* Marker stored in a channel's "closing" field while a close is pending.
   It is allocated by _channel_set_closing() and released by
   _channel_clear_closing(). */
struct _channel_closing {
    // The registry ref whose "chan" gets reset to NULL once
    // _channel_finish_closing() runs.
    struct _channelref *ref;
};
1207 
1208 static int
_channel_set_closing(struct _channelref * ref,PyThread_type_lock mutex)1209 _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
1210     struct _channel *chan = ref->chan;
1211     if (chan == NULL) {
1212         // already closed
1213         return 0;
1214     }
1215     int res = -1;
1216     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1217     if (chan->closing != NULL) {
1218         PyErr_SetString(ChannelClosedError, "channel closed");
1219         goto done;
1220     }
1221     chan->closing = PyMem_NEW(struct _channel_closing, 1);
1222     if (chan->closing == NULL) {
1223         goto done;
1224     }
1225     chan->closing->ref = ref;
1226 
1227     res = 0;
1228 done:
1229     PyThread_release_lock(chan->mutex);
1230     return res;
1231 }
1232 
1233 static void
_channel_clear_closing(struct _channel * chan)1234 _channel_clear_closing(struct _channel *chan) {
1235     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1236     if (chan->closing != NULL) {
1237         PyMem_Free(chan->closing);
1238         chan->closing = NULL;
1239     }
1240     PyThread_release_lock(chan->mutex);
1241 }
1242 
1243 static void
_channel_finish_closing(struct _channel * chan)1244 _channel_finish_closing(struct _channel *chan) {
1245     struct _channel_closing *closing = chan->closing;
1246     if (closing == NULL) {
1247         return;
1248     }
1249     _channelref *ref = closing->ref;
1250     _channel_clear_closing(chan);
1251     // Do the things that would have been done in _channels_close().
1252     ref->chan = NULL;
1253     _channel_free(chan);
1254 }
1255 
1256 /* "high"-level channel-related functions */
1257 
1258 static int64_t
_channel_create(_channels * channels)1259 _channel_create(_channels *channels)
1260 {
1261     _PyChannelState *chan = _channel_new();
1262     if (chan == NULL) {
1263         return -1;
1264     }
1265     int64_t id = _channels_add(channels, chan);
1266     if (id < 0) {
1267         _channel_free(chan);
1268         return -1;
1269     }
1270     return id;
1271 }
1272 
1273 static int
_channel_destroy(_channels * channels,int64_t id)1274 _channel_destroy(_channels *channels, int64_t id)
1275 {
1276     _PyChannelState *chan = NULL;
1277     if (_channels_remove(channels, id, &chan) != 0) {
1278         return -1;
1279     }
1280     if (chan != NULL) {
1281         _channel_free(chan);
1282     }
1283     return 0;
1284 }
1285 
1286 static int
_channel_send(_channels * channels,int64_t id,PyObject * obj)1287 _channel_send(_channels *channels, int64_t id, PyObject *obj)
1288 {
1289     PyInterpreterState *interp = _get_current();
1290     if (interp == NULL) {
1291         return -1;
1292     }
1293 
1294     // Look up the channel.
1295     PyThread_type_lock mutex = NULL;
1296     _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1297     if (chan == NULL) {
1298         return -1;
1299     }
1300     // Past this point we are responsible for releasing the mutex.
1301 
1302     if (chan->closing != NULL) {
1303         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
1304         PyThread_release_lock(mutex);
1305         return -1;
1306     }
1307 
1308     // Convert the object to cross-interpreter data.
1309     _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1);
1310     if (data == NULL) {
1311         PyThread_release_lock(mutex);
1312         return -1;
1313     }
1314     if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
1315         PyThread_release_lock(mutex);
1316         PyMem_Free(data);
1317         return -1;
1318     }
1319 
1320     // Add the data to the channel.
1321     int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
1322     PyThread_release_lock(mutex);
1323     if (res != 0) {
1324         _PyCrossInterpreterData_Release(data);
1325         PyMem_Free(data);
1326         return -1;
1327     }
1328 
1329     return 0;
1330 }
1331 
1332 static PyObject *
_channel_recv(_channels * channels,int64_t id)1333 _channel_recv(_channels *channels, int64_t id)
1334 {
1335     PyInterpreterState *interp = _get_current();
1336     if (interp == NULL) {
1337         return NULL;
1338     }
1339 
1340     // Look up the channel.
1341     PyThread_type_lock mutex = NULL;
1342     _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1343     if (chan == NULL) {
1344         return NULL;
1345     }
1346     // Past this point we are responsible for releasing the mutex.
1347 
1348     // Pop off the next item from the channel.
1349     _PyCrossInterpreterData *data = _channel_next(chan, PyInterpreterState_GetID(interp));
1350     PyThread_release_lock(mutex);
1351     if (data == NULL) {
1352         return NULL;
1353     }
1354 
1355     // Convert the data back to an object.
1356     PyObject *obj = _PyCrossInterpreterData_NewObject(data);
1357     _PyCrossInterpreterData_Release(data);
1358     PyMem_Free(data);
1359     if (obj == NULL) {
1360         return NULL;
1361     }
1362 
1363     return obj;
1364 }
1365 
1366 static int
_channel_drop(_channels * channels,int64_t id,int send,int recv)1367 _channel_drop(_channels *channels, int64_t id, int send, int recv)
1368 {
1369     PyInterpreterState *interp = _get_current();
1370     if (interp == NULL) {
1371         return -1;
1372     }
1373 
1374     // Look up the channel.
1375     PyThread_type_lock mutex = NULL;
1376     _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1377     if (chan == NULL) {
1378         return -1;
1379     }
1380     // Past this point we are responsible for releasing the mutex.
1381 
1382     // Close one or both of the two ends.
1383     int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
1384     PyThread_release_lock(mutex);
1385     return res;
1386 }
1387 
1388 static int
_channel_close(_channels * channels,int64_t id,int end,int force)1389 _channel_close(_channels *channels, int64_t id, int end, int force)
1390 {
1391     return _channels_close(channels, id, NULL, end, force);
1392 }
1393 
1394 static int
_channel_is_associated(_channels * channels,int64_t cid,int64_t interp,int send)1395 _channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
1396                        int send)
1397 {
1398     _PyChannelState *chan = _channels_lookup(channels, cid, NULL);
1399     if (chan == NULL) {
1400         return -1;
1401     } else if (send && chan->closing != NULL) {
1402         PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
1403         return -1;
1404     }
1405 
1406     _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
1407                                         interp, NULL);
1408 
1409     return (end != NULL && end->open);
1410 }
1411 
1412 /* ChannelID class */
1413 
1414 static PyTypeObject ChannelIDtype;
1415 
/* Instance layout of ChannelID objects. */
typedef struct channelid {
    PyObject_HEAD
    // The channel's numeric ID.
    int64_t id;
    // CHANNEL_SEND, CHANNEL_RECV, or 0 when no single end was selected
    // (reported as "both" by the "end" attribute).
    int end;
    // Copied into the cross-interpreter data; when set (and "end" is not 0)
    // _channelid_from_xid() tries to return a high-level channel object.
    int resolve;
    // The registry this ID was counted against in newchannelid().
    _channels *channels;
} channelid;
1423 
1424 static int
channel_id_converter(PyObject * arg,void * ptr)1425 channel_id_converter(PyObject *arg, void *ptr)
1426 {
1427     int64_t cid;
1428     if (PyObject_TypeCheck(arg, &ChannelIDtype)) {
1429         cid = ((channelid *)arg)->id;
1430     }
1431     else if (PyIndex_Check(arg)) {
1432         cid = PyLong_AsLongLong(arg);
1433         if (cid == -1 && PyErr_Occurred()) {
1434             return 0;
1435         }
1436         if (cid < 0) {
1437             PyErr_Format(PyExc_ValueError,
1438                         "channel ID must be a non-negative int, got %R", arg);
1439             return 0;
1440         }
1441     }
1442     else {
1443         PyErr_Format(PyExc_TypeError,
1444                      "channel ID must be an int, got %.100s",
1445                      Py_TYPE(arg)->tp_name);
1446         return 0;
1447     }
1448     *(int64_t *)ptr = cid;
1449     return 1;
1450 }
1451 
1452 static channelid *
newchannelid(PyTypeObject * cls,int64_t cid,int end,_channels * channels,int force,int resolve)1453 newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
1454              int force, int resolve)
1455 {
1456     channelid *self = PyObject_New(channelid, cls);
1457     if (self == NULL) {
1458         return NULL;
1459     }
1460     self->id = cid;
1461     self->end = end;
1462     self->resolve = resolve;
1463     self->channels = channels;
1464 
1465     if (_channels_add_id_object(channels, cid) != 0) {
1466         if (force && PyErr_ExceptionMatches(ChannelNotFoundError)) {
1467             PyErr_Clear();
1468         }
1469         else {
1470             Py_DECREF((PyObject *)self);
1471             return NULL;
1472         }
1473     }
1474 
1475     return self;
1476 }
1477 
1478 static _channels * _global_channels(void);
1479 
1480 static PyObject *
channelid_new(PyTypeObject * cls,PyObject * args,PyObject * kwds)1481 channelid_new(PyTypeObject *cls, PyObject *args, PyObject *kwds)
1482 {
1483     static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
1484     int64_t cid;
1485     int send = -1;
1486     int recv = -1;
1487     int force = 0;
1488     int resolve = 0;
1489     if (!PyArg_ParseTupleAndKeywords(args, kwds,
1490                                      "O&|$pppp:ChannelID.__new__", kwlist,
1491                                      channel_id_converter, &cid, &send, &recv, &force, &resolve))
1492         return NULL;
1493 
1494     // Handle "send" and "recv".
1495     if (send == 0 && recv == 0) {
1496         PyErr_SetString(PyExc_ValueError,
1497                         "'send' and 'recv' cannot both be False");
1498         return NULL;
1499     }
1500 
1501     int end = 0;
1502     if (send == 1) {
1503         if (recv == 0 || recv == -1) {
1504             end = CHANNEL_SEND;
1505         }
1506     }
1507     else if (recv == 1) {
1508         end = CHANNEL_RECV;
1509     }
1510 
1511     return (PyObject *)newchannelid(cls, cid, end, _global_channels(),
1512                                     force, resolve);
1513 }
1514 
1515 static void
channelid_dealloc(PyObject * v)1516 channelid_dealloc(PyObject *v)
1517 {
1518     int64_t cid = ((channelid *)v)->id;
1519     _channels *channels = ((channelid *)v)->channels;
1520     Py_TYPE(v)->tp_free(v);
1521 
1522     _channels_drop_id_object(channels, cid);
1523 }
1524 
1525 static PyObject *
channelid_repr(PyObject * self)1526 channelid_repr(PyObject *self)
1527 {
1528     PyTypeObject *type = Py_TYPE(self);
1529     const char *name = _PyType_Name(type);
1530 
1531     channelid *cid = (channelid *)self;
1532     const char *fmt;
1533     if (cid->end == CHANNEL_SEND) {
1534         fmt = "%s(%" PRId64 ", send=True)";
1535     }
1536     else if (cid->end == CHANNEL_RECV) {
1537         fmt = "%s(%" PRId64 ", recv=True)";
1538     }
1539     else {
1540         fmt = "%s(%" PRId64 ")";
1541     }
1542     return PyUnicode_FromFormat(fmt, name, cid->id);
1543 }
1544 
1545 static PyObject *
channelid_str(PyObject * self)1546 channelid_str(PyObject *self)
1547 {
1548     channelid *cid = (channelid *)self;
1549     return PyUnicode_FromFormat("%" PRId64 "", cid->id);
1550 }
1551 
1552 static PyObject *
channelid_int(PyObject * self)1553 channelid_int(PyObject *self)
1554 {
1555     channelid *cid = (channelid *)self;
1556     return PyLong_FromLongLong(cid->id);
1557 }
1558 
1559 static PyNumberMethods channelid_as_number = {
1560      0,                        /* nb_add */
1561      0,                        /* nb_subtract */
1562      0,                        /* nb_multiply */
1563      0,                        /* nb_remainder */
1564      0,                        /* nb_divmod */
1565      0,                        /* nb_power */
1566      0,                        /* nb_negative */
1567      0,                        /* nb_positive */
1568      0,                        /* nb_absolute */
1569      0,                        /* nb_bool */
1570      0,                        /* nb_invert */
1571      0,                        /* nb_lshift */
1572      0,                        /* nb_rshift */
1573      0,                        /* nb_and */
1574      0,                        /* nb_xor */
1575      0,                        /* nb_or */
1576      (unaryfunc)channelid_int, /* nb_int */
1577      0,                        /* nb_reserved */
1578      0,                        /* nb_float */
1579 
1580      0,                        /* nb_inplace_add */
1581      0,                        /* nb_inplace_subtract */
1582      0,                        /* nb_inplace_multiply */
1583      0,                        /* nb_inplace_remainder */
1584      0,                        /* nb_inplace_power */
1585      0,                        /* nb_inplace_lshift */
1586      0,                        /* nb_inplace_rshift */
1587      0,                        /* nb_inplace_and */
1588      0,                        /* nb_inplace_xor */
1589      0,                        /* nb_inplace_or */
1590 
1591      0,                        /* nb_floor_divide */
1592      0,                        /* nb_true_divide */
1593      0,                        /* nb_inplace_floor_divide */
1594      0,                        /* nb_inplace_true_divide */
1595 
1596      (unaryfunc)channelid_int, /* nb_index */
1597 };
1598 
1599 static Py_hash_t
channelid_hash(PyObject * self)1600 channelid_hash(PyObject *self)
1601 {
1602     channelid *cid = (channelid *)self;
1603     PyObject *id = PyLong_FromLongLong(cid->id);
1604     if (id == NULL) {
1605         return -1;
1606     }
1607     Py_hash_t hash = PyObject_Hash(id);
1608     Py_DECREF(id);
1609     return hash;
1610 }
1611 
1612 static PyObject *
channelid_richcompare(PyObject * self,PyObject * other,int op)1613 channelid_richcompare(PyObject *self, PyObject *other, int op)
1614 {
1615     if (op != Py_EQ && op != Py_NE) {
1616         Py_RETURN_NOTIMPLEMENTED;
1617     }
1618 
1619     if (!PyObject_TypeCheck(self, &ChannelIDtype)) {
1620         Py_RETURN_NOTIMPLEMENTED;
1621     }
1622 
1623     channelid *cid = (channelid *)self;
1624     int equal;
1625     if (PyObject_TypeCheck(other, &ChannelIDtype)) {
1626         channelid *othercid = (channelid *)other;
1627         equal = (cid->end == othercid->end) && (cid->id == othercid->id);
1628     }
1629     else if (PyLong_Check(other)) {
1630         /* Fast path */
1631         int overflow;
1632         long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
1633         if (othercid == -1 && PyErr_Occurred()) {
1634             return NULL;
1635         }
1636         equal = !overflow && (othercid >= 0) && (cid->id == othercid);
1637     }
1638     else if (PyNumber_Check(other)) {
1639         PyObject *pyid = PyLong_FromLongLong(cid->id);
1640         if (pyid == NULL) {
1641             return NULL;
1642         }
1643         PyObject *res = PyObject_RichCompare(pyid, other, op);
1644         Py_DECREF(pyid);
1645         return res;
1646     }
1647     else {
1648         Py_RETURN_NOTIMPLEMENTED;
1649     }
1650 
1651     if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
1652         Py_RETURN_TRUE;
1653     }
1654     Py_RETURN_FALSE;
1655 }
1656 
1657 static PyObject *
_channel_from_cid(PyObject * cid,int end)1658 _channel_from_cid(PyObject *cid, int end)
1659 {
1660     PyObject *highlevel = PyImport_ImportModule("interpreters");
1661     if (highlevel == NULL) {
1662         PyErr_Clear();
1663         highlevel = PyImport_ImportModule("test.support.interpreters");
1664         if (highlevel == NULL) {
1665             return NULL;
1666         }
1667     }
1668     const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
1669                                                   "SendChannel";
1670     PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
1671     Py_DECREF(highlevel);
1672     if (cls == NULL) {
1673         return NULL;
1674     }
1675     PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
1676     Py_DECREF(cls);
1677     if (chan == NULL) {
1678         return NULL;
1679     }
1680     return chan;
1681 }
1682 
/* Cross-interpreter payload for a ChannelID.  It is filled in by
   _channelid_shared() from the object's fields and read back by
   _channelid_from_xid(). */
struct _channelid_xid {
    int64_t id;     // copy of channelid.id
    int end;        // copy of channelid.end
    int resolve;    // copy of channelid.resolve
};
1688 
1689 static PyObject *
_channelid_from_xid(_PyCrossInterpreterData * data)1690 _channelid_from_xid(_PyCrossInterpreterData *data)
1691 {
1692     struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
1693     // Note that we do not preserve the "resolve" flag.
1694     PyObject *cid = (PyObject *)newchannelid(&ChannelIDtype, xid->id, xid->end,
1695                                              _global_channels(), 0, 0);
1696     if (xid->end == 0) {
1697         return cid;
1698     }
1699     if (!xid->resolve) {
1700         return cid;
1701     }
1702 
1703     /* Try returning a high-level channel end but fall back to the ID. */
1704     PyObject *chan = _channel_from_cid(cid, xid->end);
1705     if (chan == NULL) {
1706         PyErr_Clear();
1707         return cid;
1708     }
1709     Py_DECREF(cid);
1710     return chan;
1711 }
1712 
1713 static int
_channelid_shared(PyObject * obj,_PyCrossInterpreterData * data)1714 _channelid_shared(PyObject *obj, _PyCrossInterpreterData *data)
1715 {
1716     struct _channelid_xid *xid = PyMem_NEW(struct _channelid_xid, 1);
1717     if (xid == NULL) {
1718         return -1;
1719     }
1720     xid->id = ((channelid *)obj)->id;
1721     xid->end = ((channelid *)obj)->end;
1722     xid->resolve = ((channelid *)obj)->resolve;
1723 
1724     data->data = xid;
1725     Py_INCREF(obj);
1726     data->obj = obj;
1727     data->new_object = _channelid_from_xid;
1728     data->free = PyMem_Free;
1729     return 0;
1730 }
1731 
1732 static PyObject *
channelid_end(PyObject * self,void * end)1733 channelid_end(PyObject *self, void *end)
1734 {
1735     int force = 1;
1736     channelid *cid = (channelid *)self;
1737     if (end != NULL) {
1738         return (PyObject *)newchannelid(Py_TYPE(self), cid->id, *(int *)end,
1739                                         cid->channels, force, cid->resolve);
1740     }
1741 
1742     if (cid->end == CHANNEL_SEND) {
1743         return PyUnicode_InternFromString("send");
1744     }
1745     if (cid->end == CHANNEL_RECV) {
1746         return PyUnicode_InternFromString("recv");
1747     }
1748     return PyUnicode_InternFromString("both");
1749 }
1750 
1751 static int _channelid_end_send = CHANNEL_SEND;
1752 static int _channelid_end_recv = CHANNEL_RECV;
1753 
1754 static PyGetSetDef channelid_getsets[] = {
1755     {"end", (getter)channelid_end, NULL,
1756      PyDoc_STR("'send', 'recv', or 'both'")},
1757     {"send", (getter)channelid_end, NULL,
1758      PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
1759     {"recv", (getter)channelid_end, NULL,
1760      PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
1761     {NULL}
1762 };
1763 
1764 PyDoc_STRVAR(channelid_doc,
1765 "A channel ID identifies a channel and may be used as an int.");
1766 
1767 static PyTypeObject ChannelIDtype = {
1768     PyVarObject_HEAD_INIT(&PyType_Type, 0)
1769     "_xxsubinterpreters.ChannelID", /* tp_name */
1770     sizeof(channelid),              /* tp_basicsize */
1771     0,                              /* tp_itemsize */
1772     (destructor)channelid_dealloc,  /* tp_dealloc */
1773     0,                              /* tp_vectorcall_offset */
1774     0,                              /* tp_getattr */
1775     0,                              /* tp_setattr */
1776     0,                              /* tp_as_async */
1777     (reprfunc)channelid_repr,       /* tp_repr */
1778     &channelid_as_number,           /* tp_as_number */
1779     0,                              /* tp_as_sequence */
1780     0,                              /* tp_as_mapping */
1781     channelid_hash,                 /* tp_hash */
1782     0,                              /* tp_call */
1783     (reprfunc)channelid_str,        /* tp_str */
1784     0,                              /* tp_getattro */
1785     0,                              /* tp_setattro */
1786     0,                              /* tp_as_buffer */
1787     // Use Py_TPFLAGS_DISALLOW_INSTANTIATION so the type cannot be instantiated
1788     // from Python code.  We do this because there is a strong relationship
1789     // between channel IDs and the channel lifecycle, so this limitation avoids
1790     // related complications. Use the _channel_id() function instead.
1791     Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE
1792         | Py_TPFLAGS_DISALLOW_INSTANTIATION, /* tp_flags */
1793     channelid_doc,                  /* tp_doc */
1794     0,                              /* tp_traverse */
1795     0,                              /* tp_clear */
1796     channelid_richcompare,          /* tp_richcompare */
1797     0,                              /* tp_weaklistoffset */
1798     0,                              /* tp_iter */
1799     0,                              /* tp_iternext */
1800     0,                              /* tp_methods */
1801     0,                              /* tp_members */
1802     channelid_getsets,              /* tp_getset */
1803 };
1804 
1805 
1806 /* interpreter-specific code ************************************************/
1807 
1808 static PyObject * RunFailedError = NULL;
1809 
1810 static int
interp_exceptions_init(PyObject * ns)1811 interp_exceptions_init(PyObject *ns)
1812 {
1813     // XXX Move the exceptions into per-module memory?
1814 
1815     if (RunFailedError == NULL) {
1816         // An uncaught exception came out of interp_run_string().
1817         RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError",
1818                                             PyExc_RuntimeError, NULL);
1819         if (RunFailedError == NULL) {
1820             return -1;
1821         }
1822         if (PyDict_SetItemString(ns, "RunFailedError", RunFailedError) != 0) {
1823             return -1;
1824         }
1825     }
1826 
1827     return 0;
1828 }
1829 
1830 static int
_is_running(PyInterpreterState * interp)1831 _is_running(PyInterpreterState *interp)
1832 {
1833     PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
1834     if (PyThreadState_Next(tstate) != NULL) {
1835         PyErr_SetString(PyExc_RuntimeError,
1836                         "interpreter has more than one thread");
1837         return -1;
1838     }
1839 
1840     assert(!PyErr_Occurred());
1841     _PyInterpreterFrame *frame = tstate->cframe->current_frame;
1842     if (frame == NULL) {
1843         return 0;
1844     }
1845     return 1;
1846 }
1847 
1848 static int
_ensure_not_running(PyInterpreterState * interp)1849 _ensure_not_running(PyInterpreterState *interp)
1850 {
1851     int is_running = _is_running(interp);
1852     if (is_running < 0) {
1853         return -1;
1854     }
1855     if (is_running) {
1856         PyErr_Format(PyExc_RuntimeError, "interpreter already running");
1857         return -1;
1858     }
1859     return 0;
1860 }
1861 
1862 static int
_run_script(PyInterpreterState * interp,const char * codestr,_sharedns * shared,_sharedexception ** exc)1863 _run_script(PyInterpreterState *interp, const char *codestr,
1864             _sharedns *shared, _sharedexception **exc)
1865 {
1866     PyObject *exctype = NULL;
1867     PyObject *excval = NULL;
1868     PyObject *tb = NULL;
1869 
1870     PyObject *main_mod = _PyInterpreterState_GetMainModule(interp);
1871     if (main_mod == NULL) {
1872         goto error;
1873     }
1874     PyObject *ns = PyModule_GetDict(main_mod);  // borrowed
1875     Py_DECREF(main_mod);
1876     if (ns == NULL) {
1877         goto error;
1878     }
1879     Py_INCREF(ns);
1880 
1881     // Apply the cross-interpreter data.
1882     if (shared != NULL) {
1883         if (_sharedns_apply(shared, ns) != 0) {
1884             Py_DECREF(ns);
1885             goto error;
1886         }
1887     }
1888 
1889     // Run the string (see PyRun_SimpleStringFlags).
1890     PyObject *result = PyRun_StringFlags(codestr, Py_file_input, ns, ns, NULL);
1891     Py_DECREF(ns);
1892     if (result == NULL) {
1893         goto error;
1894     }
1895     else {
1896         Py_DECREF(result);  // We throw away the result.
1897     }
1898 
1899     *exc = NULL;
1900     return 0;
1901 
1902 error:
1903     PyErr_Fetch(&exctype, &excval, &tb);
1904 
1905     _sharedexception *sharedexc = _sharedexception_bind(exctype, excval, tb);
1906     Py_XDECREF(exctype);
1907     Py_XDECREF(excval);
1908     Py_XDECREF(tb);
1909     if (sharedexc == NULL) {
1910         fprintf(stderr, "RunFailedError: script raised an uncaught exception");
1911         PyErr_Clear();
1912         sharedexc = NULL;
1913     }
1914     else {
1915         assert(!PyErr_Occurred());
1916     }
1917     *exc = sharedexc;
1918     return -1;
1919 }
1920 
1921 static int
_run_script_in_interpreter(PyInterpreterState * interp,const char * codestr,PyObject * shareables)1922 _run_script_in_interpreter(PyInterpreterState *interp, const char *codestr,
1923                            PyObject *shareables)
1924 {
1925     if (_ensure_not_running(interp) < 0) {
1926         return -1;
1927     }
1928 
1929     _sharedns *shared = _get_shared_ns(shareables);
1930     if (shared == NULL && PyErr_Occurred()) {
1931         return -1;
1932     }
1933 
1934     // Switch to interpreter.
1935     PyThreadState *save_tstate = NULL;
1936     if (interp != PyInterpreterState_Get()) {
1937         // XXX Using the "head" thread isn't strictly correct.
1938         PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
1939         // XXX Possible GILState issues?
1940         save_tstate = PyThreadState_Swap(tstate);
1941     }
1942 
1943     // Run the script.
1944     _sharedexception *exc = NULL;
1945     int result = _run_script(interp, codestr, shared, &exc);
1946 
1947     // Switch back.
1948     if (save_tstate != NULL) {
1949         PyThreadState_Swap(save_tstate);
1950     }
1951 
1952     // Propagate any exception out to the caller.
1953     if (exc != NULL) {
1954         _sharedexception_apply(exc, RunFailedError);
1955         _sharedexception_free(exc);
1956     }
1957     else if (result != 0) {
1958         // We were unable to allocate a shared exception.
1959         PyErr_NoMemory();
1960     }
1961 
1962     if (shared != NULL) {
1963         _sharedns_free(shared);
1964     }
1965 
1966     return result;
1967 }
1968 
1969 
1970 /* module level code ********************************************************/
1971 
1972 /* globals is the process-global state for the module.  It holds all
1973    the data that we need to share between interpreters, so it cannot
1974    hold PyObject values. */
static struct globals {
    // The channel registry; set up by _init_globals() and handed out by
    // _global_channels().
    _channels channels;
} _globals = {{0}};
1978 
1979 static int
_init_globals(void)1980 _init_globals(void)
1981 {
1982     if (_channels_init(&_globals.channels) != 0) {
1983         return -1;
1984     }
1985     return 0;
1986 }
1987 
1988 static _channels *
_global_channels(void)1989 _global_channels(void) {
1990     return &_globals.channels;
1991 }
1992 
1993 static PyObject *
interp_create(PyObject * self,PyObject * args,PyObject * kwds)1994 interp_create(PyObject *self, PyObject *args, PyObject *kwds)
1995 {
1996 
1997     static char *kwlist[] = {"isolated", NULL};
1998     int isolated = 1;
1999     if (!PyArg_ParseTupleAndKeywords(args, kwds, "|$i:create", kwlist,
2000                                      &isolated)) {
2001         return NULL;
2002     }
2003 
2004     // Create and initialize the new interpreter.
2005     PyThreadState *save_tstate = _PyThreadState_GET();
2006     // XXX Possible GILState issues?
2007     PyThreadState *tstate = _Py_NewInterpreter(isolated);
2008     PyThreadState_Swap(save_tstate);
2009     if (tstate == NULL) {
2010         /* Since no new thread state was created, there is no exception to
2011            propagate; raise a fresh one after swapping in the old thread
2012            state. */
2013         PyErr_SetString(PyExc_RuntimeError, "interpreter creation failed");
2014         return NULL;
2015     }
2016     PyInterpreterState *interp = PyThreadState_GetInterpreter(tstate);
2017     PyObject *idobj = _PyInterpreterState_GetIDObject(interp);
2018     if (idobj == NULL) {
2019         // XXX Possible GILState issues?
2020         save_tstate = PyThreadState_Swap(tstate);
2021         Py_EndInterpreter(tstate);
2022         PyThreadState_Swap(save_tstate);
2023         return NULL;
2024     }
2025     _PyInterpreterState_RequireIDRef(interp, 1);
2026     return idobj;
2027 }
2028 
2029 PyDoc_STRVAR(create_doc,
2030 "create() -> ID\n\
2031 \n\
2032 Create a new interpreter and return a unique generated ID.");
2033 
2034 
2035 static PyObject *
interp_destroy(PyObject * self,PyObject * args,PyObject * kwds)2036 interp_destroy(PyObject *self, PyObject *args, PyObject *kwds)
2037 {
2038     static char *kwlist[] = {"id", NULL};
2039     PyObject *id;
2040     // XXX Use "L" for id?
2041     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2042                                      "O:destroy", kwlist, &id)) {
2043         return NULL;
2044     }
2045 
2046     // Look up the interpreter.
2047     PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2048     if (interp == NULL) {
2049         return NULL;
2050     }
2051 
2052     // Ensure we don't try to destroy the current interpreter.
2053     PyInterpreterState *current = _get_current();
2054     if (current == NULL) {
2055         return NULL;
2056     }
2057     if (interp == current) {
2058         PyErr_SetString(PyExc_RuntimeError,
2059                         "cannot destroy the current interpreter");
2060         return NULL;
2061     }
2062 
2063     // Ensure the interpreter isn't running.
2064     /* XXX We *could* support destroying a running interpreter but
2065        aren't going to worry about it for now. */
2066     if (_ensure_not_running(interp) < 0) {
2067         return NULL;
2068     }
2069 
2070     // Destroy the interpreter.
2071     PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
2072     // XXX Possible GILState issues?
2073     PyThreadState *save_tstate = PyThreadState_Swap(tstate);
2074     Py_EndInterpreter(tstate);
2075     PyThreadState_Swap(save_tstate);
2076 
2077     Py_RETURN_NONE;
2078 }
2079 
2080 PyDoc_STRVAR(destroy_doc,
2081 "destroy(id)\n\
2082 \n\
2083 Destroy the identified interpreter.\n\
2084 \n\
2085 Attempting to destroy the current interpreter results in a RuntimeError.\n\
2086 So does an unrecognized ID.");
2087 
2088 
2089 static PyObject *
interp_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))2090 interp_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
2091 {
2092     PyObject *ids, *id;
2093     PyInterpreterState *interp;
2094 
2095     ids = PyList_New(0);
2096     if (ids == NULL) {
2097         return NULL;
2098     }
2099 
2100     interp = PyInterpreterState_Head();
2101     while (interp != NULL) {
2102         id = _PyInterpreterState_GetIDObject(interp);
2103         if (id == NULL) {
2104             Py_DECREF(ids);
2105             return NULL;
2106         }
2107         // insert at front of list
2108         int res = PyList_Insert(ids, 0, id);
2109         Py_DECREF(id);
2110         if (res < 0) {
2111             Py_DECREF(ids);
2112             return NULL;
2113         }
2114 
2115         interp = PyInterpreterState_Next(interp);
2116     }
2117 
2118     return ids;
2119 }
2120 
2121 PyDoc_STRVAR(list_all_doc,
2122 "list_all() -> [ID]\n\
2123 \n\
2124 Return a list containing the ID of every existing interpreter.");
2125 
2126 
2127 static PyObject *
interp_get_current(PyObject * self,PyObject * Py_UNUSED (ignored))2128 interp_get_current(PyObject *self, PyObject *Py_UNUSED(ignored))
2129 {
2130     PyInterpreterState *interp =_get_current();
2131     if (interp == NULL) {
2132         return NULL;
2133     }
2134     return _PyInterpreterState_GetIDObject(interp);
2135 }
2136 
2137 PyDoc_STRVAR(get_current_doc,
2138 "get_current() -> ID\n\
2139 \n\
2140 Return the ID of current interpreter.");
2141 
2142 
2143 static PyObject *
interp_get_main(PyObject * self,PyObject * Py_UNUSED (ignored))2144 interp_get_main(PyObject *self, PyObject *Py_UNUSED(ignored))
2145 {
2146     // Currently, 0 is always the main interpreter.
2147     int64_t id = 0;
2148     return _PyInterpreterID_New(id);
2149 }
2150 
2151 PyDoc_STRVAR(get_main_doc,
2152 "get_main() -> ID\n\
2153 \n\
2154 Return the ID of main interpreter.");
2155 
2156 
2157 static PyObject *
interp_run_string(PyObject * self,PyObject * args,PyObject * kwds)2158 interp_run_string(PyObject *self, PyObject *args, PyObject *kwds)
2159 {
2160     static char *kwlist[] = {"id", "script", "shared", NULL};
2161     PyObject *id, *code;
2162     PyObject *shared = NULL;
2163     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2164                                      "OU|O:run_string", kwlist,
2165                                      &id, &code, &shared)) {
2166         return NULL;
2167     }
2168 
2169     // Look up the interpreter.
2170     PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2171     if (interp == NULL) {
2172         return NULL;
2173     }
2174 
2175     // Extract code.
2176     Py_ssize_t size;
2177     const char *codestr = PyUnicode_AsUTF8AndSize(code, &size);
2178     if (codestr == NULL) {
2179         return NULL;
2180     }
2181     if (strlen(codestr) != (size_t)size) {
2182         PyErr_SetString(PyExc_ValueError,
2183                         "source code string cannot contain null bytes");
2184         return NULL;
2185     }
2186 
2187     // Run the code in the interpreter.
2188     if (_run_script_in_interpreter(interp, codestr, shared) != 0) {
2189         return NULL;
2190     }
2191     Py_RETURN_NONE;
2192 }
2193 
2194 PyDoc_STRVAR(run_string_doc,
2195 "run_string(id, script, shared)\n\
2196 \n\
2197 Execute the provided string in the identified interpreter.\n\
2198 \n\
2199 See PyRun_SimpleStrings.");
2200 
2201 
2202 static PyObject *
object_is_shareable(PyObject * self,PyObject * args,PyObject * kwds)2203 object_is_shareable(PyObject *self, PyObject *args, PyObject *kwds)
2204 {
2205     static char *kwlist[] = {"obj", NULL};
2206     PyObject *obj;
2207     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2208                                      "O:is_shareable", kwlist, &obj)) {
2209         return NULL;
2210     }
2211 
2212     if (_PyObject_CheckCrossInterpreterData(obj) == 0) {
2213         Py_RETURN_TRUE;
2214     }
2215     PyErr_Clear();
2216     Py_RETURN_FALSE;
2217 }
2218 
2219 PyDoc_STRVAR(is_shareable_doc,
2220 "is_shareable(obj) -> bool\n\
2221 \n\
2222 Return True if the object's data may be shared between interpreters and\n\
2223 False otherwise.");
2224 
2225 
2226 static PyObject *
interp_is_running(PyObject * self,PyObject * args,PyObject * kwds)2227 interp_is_running(PyObject *self, PyObject *args, PyObject *kwds)
2228 {
2229     static char *kwlist[] = {"id", NULL};
2230     PyObject *id;
2231     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2232                                      "O:is_running", kwlist, &id)) {
2233         return NULL;
2234     }
2235 
2236     PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2237     if (interp == NULL) {
2238         return NULL;
2239     }
2240     int is_running = _is_running(interp);
2241     if (is_running < 0) {
2242         return NULL;
2243     }
2244     if (is_running) {
2245         Py_RETURN_TRUE;
2246     }
2247     Py_RETURN_FALSE;
2248 }
2249 
2250 PyDoc_STRVAR(is_running_doc,
2251 "is_running(id) -> bool\n\
2252 \n\
2253 Return whether or not the identified interpreter is running.");
2254 
2255 static PyObject *
channel_create(PyObject * self,PyObject * Py_UNUSED (ignored))2256 channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
2257 {
2258     int64_t cid = _channel_create(&_globals.channels);
2259     if (cid < 0) {
2260         return NULL;
2261     }
2262     PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, cid, 0,
2263                                             &_globals.channels, 0, 0);
2264     if (id == NULL) {
2265         if (_channel_destroy(&_globals.channels, cid) != 0) {
2266             // XXX issue a warning?
2267         }
2268         return NULL;
2269     }
2270     assert(((channelid *)id)->channels != NULL);
2271     return id;
2272 }
2273 
2274 PyDoc_STRVAR(channel_create_doc,
2275 "channel_create() -> cid\n\
2276 \n\
2277 Create a new cross-interpreter channel and return a unique generated ID.");
2278 
2279 static PyObject *
channel_destroy(PyObject * self,PyObject * args,PyObject * kwds)2280 channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
2281 {
2282     static char *kwlist[] = {"cid", NULL};
2283     int64_t cid;
2284     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
2285                                      channel_id_converter, &cid)) {
2286         return NULL;
2287     }
2288 
2289     if (_channel_destroy(&_globals.channels, cid) != 0) {
2290         return NULL;
2291     }
2292     Py_RETURN_NONE;
2293 }
2294 
2295 PyDoc_STRVAR(channel_destroy_doc,
2296 "channel_destroy(cid)\n\
2297 \n\
2298 Close and finalize the channel.  Afterward attempts to use the channel\n\
2299 will behave as though it never existed.");
2300 
2301 static PyObject *
channel_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))2302 channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
2303 {
2304     int64_t count = 0;
2305     int64_t *cids = _channels_list_all(&_globals.channels, &count);
2306     if (cids == NULL) {
2307         if (count == 0) {
2308             return PyList_New(0);
2309         }
2310         return NULL;
2311     }
2312     PyObject *ids = PyList_New((Py_ssize_t)count);
2313     if (ids == NULL) {
2314         goto finally;
2315     }
2316     int64_t *cur = cids;
2317     for (int64_t i=0; i < count; cur++, i++) {
2318         PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cur, 0,
2319                                                 &_globals.channels, 0, 0);
2320         if (id == NULL) {
2321             Py_DECREF(ids);
2322             ids = NULL;
2323             break;
2324         }
2325         PyList_SET_ITEM(ids, i, id);
2326     }
2327 
2328 finally:
2329     PyMem_Free(cids);
2330     return ids;
2331 }
2332 
2333 PyDoc_STRVAR(channel_list_all_doc,
2334 "channel_list_all() -> [cid]\n\
2335 \n\
2336 Return the list of all IDs for active channels.");
2337 
2338 static PyObject *
channel_list_interpreters(PyObject * self,PyObject * args,PyObject * kwds)2339 channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
2340 {
2341     static char *kwlist[] = {"cid", "send", NULL};
2342     int64_t cid;            /* Channel ID */
2343     int send = 0;           /* Send or receive end? */
2344     int64_t id;
2345     PyObject *ids, *id_obj;
2346     PyInterpreterState *interp;
2347 
2348     if (!PyArg_ParseTupleAndKeywords(
2349             args, kwds, "O&$p:channel_list_interpreters",
2350             kwlist, channel_id_converter, &cid, &send)) {
2351         return NULL;
2352     }
2353 
2354     ids = PyList_New(0);
2355     if (ids == NULL) {
2356         goto except;
2357     }
2358 
2359     interp = PyInterpreterState_Head();
2360     while (interp != NULL) {
2361         id = PyInterpreterState_GetID(interp);
2362         assert(id >= 0);
2363         int res = _channel_is_associated(&_globals.channels, cid, id, send);
2364         if (res < 0) {
2365             goto except;
2366         }
2367         if (res) {
2368             id_obj = _PyInterpreterState_GetIDObject(interp);
2369             if (id_obj == NULL) {
2370                 goto except;
2371             }
2372             res = PyList_Insert(ids, 0, id_obj);
2373             Py_DECREF(id_obj);
2374             if (res < 0) {
2375                 goto except;
2376             }
2377         }
2378         interp = PyInterpreterState_Next(interp);
2379     }
2380 
2381     goto finally;
2382 
2383 except:
2384     Py_XDECREF(ids);
2385     ids = NULL;
2386 
2387 finally:
2388     return ids;
2389 }
2390 
2391 PyDoc_STRVAR(channel_list_interpreters_doc,
2392 "channel_list_interpreters(cid, *, send) -> [id]\n\
2393 \n\
2394 Return the list of all interpreter IDs associated with an end of the channel.\n\
2395 \n\
2396 The 'send' argument should be a boolean indicating whether to use the send or\n\
2397 receive end.");
2398 
2399 
2400 static PyObject *
channel_send(PyObject * self,PyObject * args,PyObject * kwds)2401 channel_send(PyObject *self, PyObject *args, PyObject *kwds)
2402 {
2403     static char *kwlist[] = {"cid", "obj", NULL};
2404     int64_t cid;
2405     PyObject *obj;
2406     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
2407                                      channel_id_converter, &cid, &obj)) {
2408         return NULL;
2409     }
2410 
2411     if (_channel_send(&_globals.channels, cid, obj) != 0) {
2412         return NULL;
2413     }
2414     Py_RETURN_NONE;
2415 }
2416 
2417 PyDoc_STRVAR(channel_send_doc,
2418 "channel_send(cid, obj)\n\
2419 \n\
2420 Add the object's data to the channel's queue.");
2421 
2422 static PyObject *
channel_recv(PyObject * self,PyObject * args,PyObject * kwds)2423 channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
2424 {
2425     static char *kwlist[] = {"cid", "default", NULL};
2426     int64_t cid;
2427     PyObject *dflt = NULL;
2428     if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist,
2429                                      channel_id_converter, &cid, &dflt)) {
2430         return NULL;
2431     }
2432     Py_XINCREF(dflt);
2433 
2434     PyObject *obj = _channel_recv(&_globals.channels, cid);
2435     if (obj != NULL) {
2436         Py_XDECREF(dflt);
2437         return obj;
2438     } else if (PyErr_Occurred()) {
2439         Py_XDECREF(dflt);
2440         return NULL;
2441     } else if (dflt != NULL) {
2442         return dflt;
2443     } else {
2444         PyErr_Format(ChannelEmptyError, "channel %" PRId64 " is empty", cid);
2445         return NULL;
2446     }
2447 }
2448 
2449 PyDoc_STRVAR(channel_recv_doc,
2450 "channel_recv(cid, [default]) -> obj\n\
2451 \n\
2452 Return a new object from the data at the front of the channel's queue.\n\
2453 \n\
2454 If there is nothing to receive then raise ChannelEmptyError, unless\n\
2455 a default value is provided.  In that case return it.");
2456 
2457 static PyObject *
channel_close(PyObject * self,PyObject * args,PyObject * kwds)2458 channel_close(PyObject *self, PyObject *args, PyObject *kwds)
2459 {
2460     static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2461     int64_t cid;
2462     int send = 0;
2463     int recv = 0;
2464     int force = 0;
2465     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2466                                      "O&|$ppp:channel_close", kwlist,
2467                                      channel_id_converter, &cid, &send, &recv, &force)) {
2468         return NULL;
2469     }
2470 
2471     if (_channel_close(&_globals.channels, cid, send-recv, force) != 0) {
2472         return NULL;
2473     }
2474     Py_RETURN_NONE;
2475 }
2476 
2477 PyDoc_STRVAR(channel_close_doc,
2478 "channel_close(cid, *, send=None, recv=None, force=False)\n\
2479 \n\
2480 Close the channel for all interpreters.\n\
2481 \n\
2482 If the channel is empty then the keyword args are ignored and both\n\
2483 ends are immediately closed.  Otherwise, if 'force' is True then\n\
2484 all queued items are released and both ends are immediately\n\
2485 closed.\n\
2486 \n\
2487 If the channel is not empty *and* 'force' is False then following\n\
2488 happens:\n\
2489 \n\
2490  * recv is True (regardless of send):\n\
2491    - raise ChannelNotEmptyError\n\
2492  * recv is None and send is None:\n\
2493    - raise ChannelNotEmptyError\n\
2494  * send is True and recv is not True:\n\
2495    - fully close the 'send' end\n\
2496    - close the 'recv' end to interpreters not already receiving\n\
2497    - fully close it once empty\n\
2498 \n\
2499 Closing an already closed channel results in a ChannelClosedError.\n\
2500 \n\
2501 Once the channel's ID has no more ref counts in any interpreter\n\
2502 the channel will be destroyed.");
2503 
2504 static PyObject *
channel_release(PyObject * self,PyObject * args,PyObject * kwds)2505 channel_release(PyObject *self, PyObject *args, PyObject *kwds)
2506 {
2507     // Note that only the current interpreter is affected.
2508     static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2509     int64_t cid;
2510     int send = 0;
2511     int recv = 0;
2512     int force = 0;
2513     if (!PyArg_ParseTupleAndKeywords(args, kwds,
2514                                      "O&|$ppp:channel_release", kwlist,
2515                                      channel_id_converter, &cid, &send, &recv, &force)) {
2516         return NULL;
2517     }
2518     if (send == 0 && recv == 0) {
2519         send = 1;
2520         recv = 1;
2521     }
2522 
2523     // XXX Handle force is True.
2524     // XXX Fix implicit release.
2525 
2526     if (_channel_drop(&_globals.channels, cid, send, recv) != 0) {
2527         return NULL;
2528     }
2529     Py_RETURN_NONE;
2530 }
2531 
2532 PyDoc_STRVAR(channel_release_doc,
2533 "channel_release(cid, *, send=None, recv=None, force=True)\n\
2534 \n\
2535 Close the channel for the current interpreter.  'send' and 'recv'\n\
2536 (bool) may be used to indicate the ends to close.  By default both\n\
2537 ends are closed.  Closing an already closed end is a noop.");
2538 
2539 static PyObject *
channel__channel_id(PyObject * self,PyObject * args,PyObject * kwds)2540 channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
2541 {
2542     return channelid_new(&ChannelIDtype, args, kwds);
2543 }
2544 
2545 static PyMethodDef module_functions[] = {
2546     {"create",                    _PyCFunction_CAST(interp_create),
2547      METH_VARARGS | METH_KEYWORDS, create_doc},
2548     {"destroy",                   _PyCFunction_CAST(interp_destroy),
2549      METH_VARARGS | METH_KEYWORDS, destroy_doc},
2550     {"list_all",                  interp_list_all,
2551      METH_NOARGS, list_all_doc},
2552     {"get_current",               interp_get_current,
2553      METH_NOARGS, get_current_doc},
2554     {"get_main",                  interp_get_main,
2555      METH_NOARGS, get_main_doc},
2556     {"is_running",                _PyCFunction_CAST(interp_is_running),
2557      METH_VARARGS | METH_KEYWORDS, is_running_doc},
2558     {"run_string",                _PyCFunction_CAST(interp_run_string),
2559      METH_VARARGS | METH_KEYWORDS, run_string_doc},
2560 
2561     {"is_shareable",              _PyCFunction_CAST(object_is_shareable),
2562      METH_VARARGS | METH_KEYWORDS, is_shareable_doc},
2563 
2564     {"channel_create",            channel_create,
2565      METH_NOARGS, channel_create_doc},
2566     {"channel_destroy",           _PyCFunction_CAST(channel_destroy),
2567      METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
2568     {"channel_list_all",          channel_list_all,
2569      METH_NOARGS, channel_list_all_doc},
2570     {"channel_list_interpreters", _PyCFunction_CAST(channel_list_interpreters),
2571      METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
2572     {"channel_send",              _PyCFunction_CAST(channel_send),
2573      METH_VARARGS | METH_KEYWORDS, channel_send_doc},
2574     {"channel_recv",              _PyCFunction_CAST(channel_recv),
2575      METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
2576     {"channel_close",             _PyCFunction_CAST(channel_close),
2577      METH_VARARGS | METH_KEYWORDS, channel_close_doc},
2578     {"channel_release",           _PyCFunction_CAST(channel_release),
2579      METH_VARARGS | METH_KEYWORDS, channel_release_doc},
2580     {"_channel_id",               _PyCFunction_CAST(channel__channel_id),
2581      METH_VARARGS | METH_KEYWORDS, NULL},
2582 
2583     {NULL,                        NULL}           /* sentinel */
2584 };
2585 
2586 
2587 /* initialization function */
2588 
2589 PyDoc_STRVAR(module_doc,
2590 "This module provides primitive operations to manage Python interpreters.\n\
2591 The 'interpreters' module provides a more convenient interface.");
2592 
2593 static struct PyModuleDef interpretersmodule = {
2594     PyModuleDef_HEAD_INIT,
2595     "_xxsubinterpreters",  /* m_name */
2596     module_doc,            /* m_doc */
2597     -1,                    /* m_size */
2598     module_functions,      /* m_methods */
2599     NULL,                  /* m_slots */
2600     NULL,                  /* m_traverse */
2601     NULL,                  /* m_clear */
2602     NULL                   /* m_free */
2603 };
2604 
2605 
2606 PyMODINIT_FUNC
PyInit__xxsubinterpreters(void)2607 PyInit__xxsubinterpreters(void)
2608 {
2609     if (_init_globals() != 0) {
2610         return NULL;
2611     }
2612 
2613     /* Initialize types */
2614     if (PyType_Ready(&ChannelIDtype) != 0) {
2615         return NULL;
2616     }
2617 
2618     /* Create the module */
2619     PyObject *module = PyModule_Create(&interpretersmodule);
2620     if (module == NULL) {
2621         return NULL;
2622     }
2623 
2624     /* Add exception types */
2625     PyObject *ns = PyModule_GetDict(module);  // borrowed
2626     if (interp_exceptions_init(ns) != 0) {
2627         return NULL;
2628     }
2629     if (channel_exceptions_init(ns) != 0) {
2630         return NULL;
2631     }
2632 
2633     /* Add other types */
2634     Py_INCREF(&ChannelIDtype);
2635     if (PyDict_SetItemString(ns, "ChannelID", (PyObject *)&ChannelIDtype) != 0) {
2636         return NULL;
2637     }
2638     Py_INCREF(&_PyInterpreterID_Type);
2639     if (PyDict_SetItemString(ns, "InterpreterID", (PyObject *)&_PyInterpreterID_Type) != 0) {
2640         return NULL;
2641     }
2642 
2643     if (_PyCrossInterpreterData_RegisterClass(&ChannelIDtype, _channelid_shared)) {
2644         return NULL;
2645     }
2646 
2647     return module;
2648 }
2649