1
2 /* interpreters module */
3 /* low-level access to interpreter primitives */
4 #ifndef Py_BUILD_CORE_BUILTIN
5 # define Py_BUILD_CORE_MODULE 1
6 #endif
7
8 #include "Python.h"
9 #include "pycore_frame.h"
10 #include "pycore_pystate.h" // _PyThreadState_GET()
11 #include "pycore_interpreteridobject.h"
12
13
14 static char *
_copy_raw_string(PyObject * strobj)15 _copy_raw_string(PyObject *strobj)
16 {
17 const char *str = PyUnicode_AsUTF8(strobj);
18 if (str == NULL) {
19 return NULL;
20 }
21 char *copied = PyMem_Malloc(strlen(str)+1);
22 if (copied == NULL) {
23 PyErr_NoMemory();
24 return NULL;
25 }
26 strcpy(copied, str);
27 return copied;
28 }
29
30 static PyInterpreterState *
_get_current(void)31 _get_current(void)
32 {
33 // PyInterpreterState_Get() aborts if lookup fails, so don't need
34 // to check the result for NULL.
35 return PyInterpreterState_Get();
36 }
37
38
39 /* data-sharing-specific code ***********************************************/
40
/* One entry of a shared namespace: a variable name paired with the
   cross-interpreter representation of its value. */
struct _sharednsitem {
    // Heap copy of the key made by _copy_raw_string(); released with
    // PyMem_Free() in _sharednsitem_clear().
    char *name;
    // Filled in by _PyObject_GetCrossInterpreterData() and released by
    // _PyCrossInterpreterData_Release().
    _PyCrossInterpreterData data;
};
45
46 static void _sharednsitem_clear(struct _sharednsitem *); // forward
47
48 static int
_sharednsitem_init(struct _sharednsitem * item,PyObject * key,PyObject * value)49 _sharednsitem_init(struct _sharednsitem *item, PyObject *key, PyObject *value)
50 {
51 item->name = _copy_raw_string(key);
52 if (item->name == NULL) {
53 return -1;
54 }
55 if (_PyObject_GetCrossInterpreterData(value, &item->data) != 0) {
56 _sharednsitem_clear(item);
57 return -1;
58 }
59 return 0;
60 }
61
62 static void
_sharednsitem_clear(struct _sharednsitem * item)63 _sharednsitem_clear(struct _sharednsitem *item)
64 {
65 if (item->name != NULL) {
66 PyMem_Free(item->name);
67 item->name = NULL;
68 }
69 _PyCrossInterpreterData_Release(&item->data);
70 }
71
72 static int
_sharednsitem_apply(struct _sharednsitem * item,PyObject * ns)73 _sharednsitem_apply(struct _sharednsitem *item, PyObject *ns)
74 {
75 PyObject *name = PyUnicode_FromString(item->name);
76 if (name == NULL) {
77 return -1;
78 }
79 PyObject *value = _PyCrossInterpreterData_NewObject(&item->data);
80 if (value == NULL) {
81 Py_DECREF(name);
82 return -1;
83 }
84 int res = PyDict_SetItem(ns, name, value);
85 Py_DECREF(name);
86 Py_DECREF(value);
87 return res;
88 }
89
/* A namespace prepared for sharing: an array of len items. */
typedef struct _sharedns {
    Py_ssize_t len;               // number of entries in items
    struct _sharednsitem* items;  // array allocated in _sharedns_new()
} _sharedns;
94
95 static _sharedns *
_sharedns_new(Py_ssize_t len)96 _sharedns_new(Py_ssize_t len)
97 {
98 _sharedns *shared = PyMem_NEW(_sharedns, 1);
99 if (shared == NULL) {
100 PyErr_NoMemory();
101 return NULL;
102 }
103 shared->len = len;
104 shared->items = PyMem_NEW(struct _sharednsitem, len);
105 if (shared->items == NULL) {
106 PyErr_NoMemory();
107 PyMem_Free(shared);
108 return NULL;
109 }
110 return shared;
111 }
112
113 static void
_sharedns_free(_sharedns * shared)114 _sharedns_free(_sharedns *shared)
115 {
116 for (Py_ssize_t i=0; i < shared->len; i++) {
117 _sharednsitem_clear(&shared->items[i]);
118 }
119 PyMem_Free(shared->items);
120 PyMem_Free(shared);
121 }
122
123 static _sharedns *
_get_shared_ns(PyObject * shareable)124 _get_shared_ns(PyObject *shareable)
125 {
126 if (shareable == NULL || shareable == Py_None) {
127 return NULL;
128 }
129 Py_ssize_t len = PyDict_Size(shareable);
130 if (len == 0) {
131 return NULL;
132 }
133
134 _sharedns *shared = _sharedns_new(len);
135 if (shared == NULL) {
136 return NULL;
137 }
138 Py_ssize_t pos = 0;
139 for (Py_ssize_t i=0; i < len; i++) {
140 PyObject *key, *value;
141 if (PyDict_Next(shareable, &pos, &key, &value) == 0) {
142 break;
143 }
144 if (_sharednsitem_init(&shared->items[i], key, value) != 0) {
145 break;
146 }
147 }
148 if (PyErr_Occurred()) {
149 _sharedns_free(shared);
150 return NULL;
151 }
152 return shared;
153 }
154
155 static int
_sharedns_apply(_sharedns * shared,PyObject * ns)156 _sharedns_apply(_sharedns *shared, PyObject *ns)
157 {
158 for (Py_ssize_t i=0; i < shared->len; i++) {
159 if (_sharednsitem_apply(&shared->items[i], ns) != 0) {
160 return -1;
161 }
162 }
163 return 0;
164 }
165
166 // Ultimately we'd like to preserve enough information about the
167 // exception and traceback that we could re-constitute (or at least
168 // simulate, a la traceback.TracebackException), and even chain, a copy
169 // of the exception in the calling interpreter.
170
/* A minimal snapshot of an exception: two C strings.  Both fields start
   out NULL (see _sharedexception_new()) and are released with
   PyMem_Free() in _sharedexception_clear(). */
typedef struct _sharedexception {
    char *name;  // str() of the exception type, or NULL
    char *msg;   // str() of the exception value, or NULL
} _sharedexception;
175
176 static _sharedexception *
_sharedexception_new(void)177 _sharedexception_new(void)
178 {
179 _sharedexception *err = PyMem_NEW(_sharedexception, 1);
180 if (err == NULL) {
181 PyErr_NoMemory();
182 return NULL;
183 }
184 err->name = NULL;
185 err->msg = NULL;
186 return err;
187 }
188
189 static void
_sharedexception_clear(_sharedexception * exc)190 _sharedexception_clear(_sharedexception *exc)
191 {
192 if (exc->name != NULL) {
193 PyMem_Free(exc->name);
194 }
195 if (exc->msg != NULL) {
196 PyMem_Free(exc->msg);
197 }
198 }
199
200 static void
_sharedexception_free(_sharedexception * exc)201 _sharedexception_free(_sharedexception *exc)
202 {
203 _sharedexception_clear(exc);
204 PyMem_Free(exc);
205 }
206
207 static _sharedexception *
_sharedexception_bind(PyObject * exctype,PyObject * exc,PyObject * tb)208 _sharedexception_bind(PyObject *exctype, PyObject *exc, PyObject *tb)
209 {
210 assert(exctype != NULL);
211 char *failure = NULL;
212
213 _sharedexception *err = _sharedexception_new();
214 if (err == NULL) {
215 goto finally;
216 }
217
218 PyObject *name = PyUnicode_FromFormat("%S", exctype);
219 if (name == NULL) {
220 failure = "unable to format exception type name";
221 goto finally;
222 }
223 err->name = _copy_raw_string(name);
224 Py_DECREF(name);
225 if (err->name == NULL) {
226 if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
227 failure = "out of memory copying exception type name";
228 } else {
229 failure = "unable to encode and copy exception type name";
230 }
231 goto finally;
232 }
233
234 if (exc != NULL) {
235 PyObject *msg = PyUnicode_FromFormat("%S", exc);
236 if (msg == NULL) {
237 failure = "unable to format exception message";
238 goto finally;
239 }
240 err->msg = _copy_raw_string(msg);
241 Py_DECREF(msg);
242 if (err->msg == NULL) {
243 if (PyErr_ExceptionMatches(PyExc_MemoryError)) {
244 failure = "out of memory copying exception message";
245 } else {
246 failure = "unable to encode and copy exception message";
247 }
248 goto finally;
249 }
250 }
251
252 finally:
253 if (failure != NULL) {
254 PyErr_Clear();
255 if (err->name != NULL) {
256 PyMem_Free(err->name);
257 err->name = NULL;
258 }
259 err->msg = failure;
260 }
261 return err;
262 }
263
264 static void
_sharedexception_apply(_sharedexception * exc,PyObject * wrapperclass)265 _sharedexception_apply(_sharedexception *exc, PyObject *wrapperclass)
266 {
267 if (exc->name != NULL) {
268 if (exc->msg != NULL) {
269 PyErr_Format(wrapperclass, "%s: %s", exc->name, exc->msg);
270 }
271 else {
272 PyErr_SetString(wrapperclass, exc->name);
273 }
274 }
275 else if (exc->msg != NULL) {
276 PyErr_SetString(wrapperclass, exc->msg);
277 }
278 else {
279 PyErr_SetNone(wrapperclass);
280 }
281 }
282
283
284 /* channel-specific code ****************************************************/
285
// Selects which end(s) of a channel an operation targets.  The sign is
// what matters: _channelends_close_interpreter() treats >= 0 as
// "send or both" and <= 0 as "recv or both".
#define CHANNEL_SEND 1
#define CHANNEL_BOTH 0
#define CHANNEL_RECV -1

// Exception classes, created and published by channel_exceptions_init().
static PyObject *ChannelError;
static PyObject *ChannelNotFoundError;
static PyObject *ChannelClosedError;
static PyObject *ChannelEmptyError;
static PyObject *ChannelNotEmptyError;
295
296 static int
channel_exceptions_init(PyObject * ns)297 channel_exceptions_init(PyObject *ns)
298 {
299 // XXX Move the exceptions into per-module memory?
300
301 // A channel-related operation failed.
302 ChannelError = PyErr_NewException("_xxsubinterpreters.ChannelError",
303 PyExc_RuntimeError, NULL);
304 if (ChannelError == NULL) {
305 return -1;
306 }
307 if (PyDict_SetItemString(ns, "ChannelError", ChannelError) != 0) {
308 return -1;
309 }
310
311 // An operation tried to use a channel that doesn't exist.
312 ChannelNotFoundError = PyErr_NewException(
313 "_xxsubinterpreters.ChannelNotFoundError", ChannelError, NULL);
314 if (ChannelNotFoundError == NULL) {
315 return -1;
316 }
317 if (PyDict_SetItemString(ns, "ChannelNotFoundError", ChannelNotFoundError) != 0) {
318 return -1;
319 }
320
321 // An operation tried to use a closed channel.
322 ChannelClosedError = PyErr_NewException(
323 "_xxsubinterpreters.ChannelClosedError", ChannelError, NULL);
324 if (ChannelClosedError == NULL) {
325 return -1;
326 }
327 if (PyDict_SetItemString(ns, "ChannelClosedError", ChannelClosedError) != 0) {
328 return -1;
329 }
330
331 // An operation tried to pop from an empty channel.
332 ChannelEmptyError = PyErr_NewException(
333 "_xxsubinterpreters.ChannelEmptyError", ChannelError, NULL);
334 if (ChannelEmptyError == NULL) {
335 return -1;
336 }
337 if (PyDict_SetItemString(ns, "ChannelEmptyError", ChannelEmptyError) != 0) {
338 return -1;
339 }
340
341 // An operation tried to close a non-empty channel.
342 ChannelNotEmptyError = PyErr_NewException(
343 "_xxsubinterpreters.ChannelNotEmptyError", ChannelError, NULL);
344 if (ChannelNotEmptyError == NULL) {
345 return -1;
346 }
347 if (PyDict_SetItemString(ns, "ChannelNotEmptyError", ChannelNotEmptyError) != 0) {
348 return -1;
349 }
350
351 return 0;
352 }
353
354 /* the channel queue */
355
struct _channelitem;

/* A node of the channel queue's singly linked list. */
typedef struct _channelitem {
    // Payload; released and freed by _channelitem_clear() unless it
    // was detached first (see _channelitem_popped()).
    _PyCrossInterpreterData *data;
    // Next node toward the tail, or NULL.
    struct _channelitem *next;
} _channelitem;
362
363 static _channelitem *
_channelitem_new(void)364 _channelitem_new(void)
365 {
366 _channelitem *item = PyMem_NEW(_channelitem, 1);
367 if (item == NULL) {
368 PyErr_NoMemory();
369 return NULL;
370 }
371 item->data = NULL;
372 item->next = NULL;
373 return item;
374 }
375
376 static void
_channelitem_clear(_channelitem * item)377 _channelitem_clear(_channelitem *item)
378 {
379 if (item->data != NULL) {
380 _PyCrossInterpreterData_Release(item->data);
381 PyMem_Free(item->data);
382 item->data = NULL;
383 }
384 item->next = NULL;
385 }
386
387 static void
_channelitem_free(_channelitem * item)388 _channelitem_free(_channelitem *item)
389 {
390 _channelitem_clear(item);
391 PyMem_Free(item);
392 }
393
394 static void
_channelitem_free_all(_channelitem * item)395 _channelitem_free_all(_channelitem *item)
396 {
397 while (item != NULL) {
398 _channelitem *last = item;
399 item = item->next;
400 _channelitem_free(last);
401 }
402 }
403
404 static _PyCrossInterpreterData *
_channelitem_popped(_channelitem * item)405 _channelitem_popped(_channelitem *item)
406 {
407 _PyCrossInterpreterData *data = item->data;
408 item->data = NULL;
409 _channelitem_free(item);
410 return data;
411 }
412
/* FIFO of pending items: _channelqueue_put() appends at last and
   _channelqueue_get() removes from first. */
typedef struct _channelqueue {
    int64_t count;        // number of queued items
    _channelitem *first;  // head (next to be returned), or NULL if empty
    _channelitem *last;   // tail (most recently added), or NULL if empty
} _channelqueue;
418
419 static _channelqueue *
_channelqueue_new(void)420 _channelqueue_new(void)
421 {
422 _channelqueue *queue = PyMem_NEW(_channelqueue, 1);
423 if (queue == NULL) {
424 PyErr_NoMemory();
425 return NULL;
426 }
427 queue->count = 0;
428 queue->first = NULL;
429 queue->last = NULL;
430 return queue;
431 }
432
433 static void
_channelqueue_clear(_channelqueue * queue)434 _channelqueue_clear(_channelqueue *queue)
435 {
436 _channelitem_free_all(queue->first);
437 queue->count = 0;
438 queue->first = NULL;
439 queue->last = NULL;
440 }
441
442 static void
_channelqueue_free(_channelqueue * queue)443 _channelqueue_free(_channelqueue *queue)
444 {
445 _channelqueue_clear(queue);
446 PyMem_Free(queue);
447 }
448
449 static int
_channelqueue_put(_channelqueue * queue,_PyCrossInterpreterData * data)450 _channelqueue_put(_channelqueue *queue, _PyCrossInterpreterData *data)
451 {
452 _channelitem *item = _channelitem_new();
453 if (item == NULL) {
454 return -1;
455 }
456 item->data = data;
457
458 queue->count += 1;
459 if (queue->first == NULL) {
460 queue->first = item;
461 }
462 else {
463 queue->last->next = item;
464 }
465 queue->last = item;
466 return 0;
467 }
468
469 static _PyCrossInterpreterData *
_channelqueue_get(_channelqueue * queue)470 _channelqueue_get(_channelqueue *queue)
471 {
472 _channelitem *item = queue->first;
473 if (item == NULL) {
474 return NULL;
475 }
476 queue->first = item->next;
477 if (queue->last == item) {
478 queue->last = NULL;
479 }
480 queue->count -= 1;
481
482 return _channelitem_popped(item);
483 }
484
485 /* channel-interpreter associations */
486
struct _channelend;

/* Records one interpreter's association with one end of a channel.
   Entries form a singly linked list. */
typedef struct _channelend {
    struct _channelend *next;  // next entry in the list, or NULL
    int64_t interp;            // ID of the associated interpreter
    int open;                  // 1 when created; 0 once closed
} _channelend;
494
495 static _channelend *
_channelend_new(int64_t interp)496 _channelend_new(int64_t interp)
497 {
498 _channelend *end = PyMem_NEW(_channelend, 1);
499 if (end == NULL) {
500 PyErr_NoMemory();
501 return NULL;
502 }
503 end->next = NULL;
504 end->interp = interp;
505 end->open = 1;
506 return end;
507 }
508
509 static void
_channelend_free(_channelend * end)510 _channelend_free(_channelend *end)
511 {
512 PyMem_Free(end);
513 }
514
515 static void
_channelend_free_all(_channelend * end)516 _channelend_free_all(_channelend *end)
517 {
518 while (end != NULL) {
519 _channelend *last = end;
520 end = end->next;
521 _channelend_free(last);
522 }
523 }
524
525 static _channelend *
_channelend_find(_channelend * first,int64_t interp,_channelend ** pprev)526 _channelend_find(_channelend *first, int64_t interp, _channelend **pprev)
527 {
528 _channelend *prev = NULL;
529 _channelend *end = first;
530 while (end != NULL) {
531 if (end->interp == interp) {
532 break;
533 }
534 prev = end;
535 end = end->next;
536 }
537 if (pprev != NULL) {
538 *pprev = prev;
539 }
540 return end;
541 }
542
/* The interpreters associated with each end of a channel, together
   with a count of how many of them are still open. */
typedef struct _channelassociations {
    // Note that list entries are never removed for interpreters
    // for which the channel is closed.  This should not be a problem in
    // practice.  Also, a channel isn't automatically closed when an
    // interpreter is destroyed.
    int64_t numsendopen;  // number of open entries in the send list
    int64_t numrecvopen;  // number of open entries in the recv list
    _channelend *send;    // head of the send-side list, or NULL
    _channelend *recv;    // head of the recv-side list, or NULL
} _channelends;
553
554 static _channelends *
_channelends_new(void)555 _channelends_new(void)
556 {
557 _channelends *ends = PyMem_NEW(_channelends, 1);
558 if (ends== NULL) {
559 return NULL;
560 }
561 ends->numsendopen = 0;
562 ends->numrecvopen = 0;
563 ends->send = NULL;
564 ends->recv = NULL;
565 return ends;
566 }
567
568 static void
_channelends_clear(_channelends * ends)569 _channelends_clear(_channelends *ends)
570 {
571 _channelend_free_all(ends->send);
572 ends->send = NULL;
573 ends->numsendopen = 0;
574
575 _channelend_free_all(ends->recv);
576 ends->recv = NULL;
577 ends->numrecvopen = 0;
578 }
579
580 static void
_channelends_free(_channelends * ends)581 _channelends_free(_channelends *ends)
582 {
583 _channelends_clear(ends);
584 PyMem_Free(ends);
585 }
586
587 static _channelend *
_channelends_add(_channelends * ends,_channelend * prev,int64_t interp,int send)588 _channelends_add(_channelends *ends, _channelend *prev, int64_t interp,
589 int send)
590 {
591 _channelend *end = _channelend_new(interp);
592 if (end == NULL) {
593 return NULL;
594 }
595
596 if (prev == NULL) {
597 if (send) {
598 ends->send = end;
599 }
600 else {
601 ends->recv = end;
602 }
603 }
604 else {
605 prev->next = end;
606 }
607 if (send) {
608 ends->numsendopen += 1;
609 }
610 else {
611 ends->numrecvopen += 1;
612 }
613 return end;
614 }
615
616 static int
_channelends_associate(_channelends * ends,int64_t interp,int send)617 _channelends_associate(_channelends *ends, int64_t interp, int send)
618 {
619 _channelend *prev;
620 _channelend *end = _channelend_find(send ? ends->send : ends->recv,
621 interp, &prev);
622 if (end != NULL) {
623 if (!end->open) {
624 PyErr_SetString(ChannelClosedError, "channel already closed");
625 return -1;
626 }
627 // already associated
628 return 0;
629 }
630 if (_channelends_add(ends, prev, interp, send) == NULL) {
631 return -1;
632 }
633 return 0;
634 }
635
636 static int
_channelends_is_open(_channelends * ends)637 _channelends_is_open(_channelends *ends)
638 {
639 if (ends->numsendopen != 0 || ends->numrecvopen != 0) {
640 return 1;
641 }
642 if (ends->send == NULL && ends->recv == NULL) {
643 return 1;
644 }
645 return 0;
646 }
647
648 static void
_channelends_close_end(_channelends * ends,_channelend * end,int send)649 _channelends_close_end(_channelends *ends, _channelend *end, int send)
650 {
651 end->open = 0;
652 if (send) {
653 ends->numsendopen -= 1;
654 }
655 else {
656 ends->numrecvopen -= 1;
657 }
658 }
659
660 static int
_channelends_close_interpreter(_channelends * ends,int64_t interp,int which)661 _channelends_close_interpreter(_channelends *ends, int64_t interp, int which)
662 {
663 _channelend *prev;
664 _channelend *end;
665 if (which >= 0) { // send/both
666 end = _channelend_find(ends->send, interp, &prev);
667 if (end == NULL) {
668 // never associated so add it
669 end = _channelends_add(ends, prev, interp, 1);
670 if (end == NULL) {
671 return -1;
672 }
673 }
674 _channelends_close_end(ends, end, 1);
675 }
676 if (which <= 0) { // recv/both
677 end = _channelend_find(ends->recv, interp, &prev);
678 if (end == NULL) {
679 // never associated so add it
680 end = _channelends_add(ends, prev, interp, 0);
681 if (end == NULL) {
682 return -1;
683 }
684 }
685 _channelends_close_end(ends, end, 0);
686 }
687 return 0;
688 }
689
690 static void
_channelends_close_all(_channelends * ends,int which,int force)691 _channelends_close_all(_channelends *ends, int which, int force)
692 {
693 // XXX Handle the ends.
694 // XXX Handle force is True.
695
696 // Ensure all the "send"-associated interpreters are closed.
697 _channelend *end;
698 for (end = ends->send; end != NULL; end = end->next) {
699 _channelends_close_end(ends, end, 1);
700 }
701
702 // Ensure all the "recv"-associated interpreters are closed.
703 for (end = ends->recv; end != NULL; end = end->next) {
704 _channelends_close_end(ends, end, 0);
705 }
706 }
707
708 /* channels */
709
struct _channel;
struct _channel_closing;
// Defined with the other "closing" helpers further down.
static void _channel_clear_closing(struct _channel *);
static void _channel_finish_closing(struct _channel *);

/* The state of a single channel. */
typedef struct _channel {
    // Held by the _channel_*() functions while they touch the fields
    // below.
    PyThread_type_lock mutex;
    // Pending items.
    _channelqueue *queue;
    // Interpreters associated with each end.
    _channelends *ends;
    // 1 after creation; set to 0 once the channel is closed.
    int open;
    // Set by _channel_set_closing() while a deferred close is pending;
    // NULL otherwise.
    struct _channel_closing *closing;
} _PyChannelState;
722
723 static _PyChannelState *
_channel_new(void)724 _channel_new(void)
725 {
726 _PyChannelState *chan = PyMem_NEW(_PyChannelState, 1);
727 if (chan == NULL) {
728 return NULL;
729 }
730 chan->mutex = PyThread_allocate_lock();
731 if (chan->mutex == NULL) {
732 PyMem_Free(chan);
733 PyErr_SetString(ChannelError,
734 "can't initialize mutex for new channel");
735 return NULL;
736 }
737 chan->queue = _channelqueue_new();
738 if (chan->queue == NULL) {
739 PyMem_Free(chan);
740 return NULL;
741 }
742 chan->ends = _channelends_new();
743 if (chan->ends == NULL) {
744 _channelqueue_free(chan->queue);
745 PyMem_Free(chan);
746 return NULL;
747 }
748 chan->open = 1;
749 chan->closing = NULL;
750 return chan;
751 }
752
753 static void
_channel_free(_PyChannelState * chan)754 _channel_free(_PyChannelState *chan)
755 {
756 _channel_clear_closing(chan);
757 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
758 _channelqueue_free(chan->queue);
759 _channelends_free(chan->ends);
760 PyThread_release_lock(chan->mutex);
761
762 PyThread_free_lock(chan->mutex);
763 PyMem_Free(chan);
764 }
765
766 static int
_channel_add(_PyChannelState * chan,int64_t interp,_PyCrossInterpreterData * data)767 _channel_add(_PyChannelState *chan, int64_t interp,
768 _PyCrossInterpreterData *data)
769 {
770 int res = -1;
771 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
772
773 if (!chan->open) {
774 PyErr_SetString(ChannelClosedError, "channel closed");
775 goto done;
776 }
777 if (_channelends_associate(chan->ends, interp, 1) != 0) {
778 goto done;
779 }
780
781 if (_channelqueue_put(chan->queue, data) != 0) {
782 goto done;
783 }
784
785 res = 0;
786 done:
787 PyThread_release_lock(chan->mutex);
788 return res;
789 }
790
791 static _PyCrossInterpreterData *
_channel_next(_PyChannelState * chan,int64_t interp)792 _channel_next(_PyChannelState *chan, int64_t interp)
793 {
794 _PyCrossInterpreterData *data = NULL;
795 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
796
797 if (!chan->open) {
798 PyErr_SetString(ChannelClosedError, "channel closed");
799 goto done;
800 }
801 if (_channelends_associate(chan->ends, interp, 0) != 0) {
802 goto done;
803 }
804
805 data = _channelqueue_get(chan->queue);
806 if (data == NULL && !PyErr_Occurred() && chan->closing != NULL) {
807 chan->open = 0;
808 }
809
810 done:
811 PyThread_release_lock(chan->mutex);
812 if (chan->queue->count == 0) {
813 _channel_finish_closing(chan);
814 }
815 return data;
816 }
817
818 static int
_channel_close_interpreter(_PyChannelState * chan,int64_t interp,int end)819 _channel_close_interpreter(_PyChannelState *chan, int64_t interp, int end)
820 {
821 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
822
823 int res = -1;
824 if (!chan->open) {
825 PyErr_SetString(ChannelClosedError, "channel already closed");
826 goto done;
827 }
828
829 if (_channelends_close_interpreter(chan->ends, interp, end) != 0) {
830 goto done;
831 }
832 chan->open = _channelends_is_open(chan->ends);
833
834 res = 0;
835 done:
836 PyThread_release_lock(chan->mutex);
837 return res;
838 }
839
840 static int
_channel_close_all(_PyChannelState * chan,int end,int force)841 _channel_close_all(_PyChannelState *chan, int end, int force)
842 {
843 int res = -1;
844 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
845
846 if (!chan->open) {
847 PyErr_SetString(ChannelClosedError, "channel already closed");
848 goto done;
849 }
850
851 if (!force && chan->queue->count > 0) {
852 PyErr_SetString(ChannelNotEmptyError,
853 "may not be closed if not empty (try force=True)");
854 goto done;
855 }
856
857 chan->open = 0;
858
859 // We *could* also just leave these in place, since we've marked
860 // the channel as closed already.
861 _channelends_close_all(chan->ends, end, force);
862
863 res = 0;
864 done:
865 PyThread_release_lock(chan->mutex);
866 return res;
867 }
868
869 /* the set of channels */
870
struct _channelref;

/* Entry in the global list of channels; maps a channel ID to its
   state. */
typedef struct _channelref {
    int64_t id;                // channel ID, from _channels_next_id()
    _PyChannelState *chan;     // set to NULL once the channel is closed
    struct _channelref *next;  // next entry in the list, or NULL
    // Adjusted by _channels_add_id_object() and
    // _channels_drop_id_object(); the entry is removed when it drops
    // to 0.
    Py_ssize_t objcount;
} _channelref;
879
880 static _channelref *
_channelref_new(int64_t id,_PyChannelState * chan)881 _channelref_new(int64_t id, _PyChannelState *chan)
882 {
883 _channelref *ref = PyMem_NEW(_channelref, 1);
884 if (ref == NULL) {
885 return NULL;
886 }
887 ref->id = id;
888 ref->chan = chan;
889 ref->next = NULL;
890 ref->objcount = 0;
891 return ref;
892 }
893
894 //static void
895 //_channelref_clear(_channelref *ref)
896 //{
897 // ref->id = -1;
898 // ref->chan = NULL;
899 // ref->next = NULL;
900 // ref->objcount = 0;
901 //}
902
903 static void
_channelref_free(_channelref * ref)904 _channelref_free(_channelref *ref)
905 {
906 if (ref->chan != NULL) {
907 _channel_clear_closing(ref->chan);
908 }
909 //_channelref_clear(ref);
910 PyMem_Free(ref);
911 }
912
913 static _channelref *
_channelref_find(_channelref * first,int64_t id,_channelref ** pprev)914 _channelref_find(_channelref *first, int64_t id, _channelref **pprev)
915 {
916 _channelref *prev = NULL;
917 _channelref *ref = first;
918 while (ref != NULL) {
919 if (ref->id == id) {
920 break;
921 }
922 prev = ref;
923 ref = ref->next;
924 }
925 if (pprev != NULL) {
926 *pprev = prev;
927 }
928 return ref;
929 }
930
/* The set of all channels. */
typedef struct _channels {
    PyThread_type_lock mutex;  // held by the _channels_*() functions
    _channelref *head;         // first entry of the list, or NULL
    int64_t numopen;           // number of entries in the list
    int64_t next_id;           // ID to hand out next
} _channels;
937
938 static int
_channels_init(_channels * channels)939 _channels_init(_channels *channels)
940 {
941 if (channels->mutex == NULL) {
942 channels->mutex = PyThread_allocate_lock();
943 if (channels->mutex == NULL) {
944 PyErr_SetString(ChannelError,
945 "can't initialize mutex for channel management");
946 return -1;
947 }
948 }
949 channels->head = NULL;
950 channels->numopen = 0;
951 channels->next_id = 0;
952 return 0;
953 }
954
955 static int64_t
_channels_next_id(_channels * channels)956 _channels_next_id(_channels *channels) // needs lock
957 {
958 int64_t id = channels->next_id;
959 if (id < 0) {
960 /* overflow */
961 PyErr_SetString(ChannelError,
962 "failed to get a channel ID");
963 return -1;
964 }
965 channels->next_id += 1;
966 return id;
967 }
968
969 static _PyChannelState *
_channels_lookup(_channels * channels,int64_t id,PyThread_type_lock * pmutex)970 _channels_lookup(_channels *channels, int64_t id, PyThread_type_lock *pmutex)
971 {
972 _PyChannelState *chan = NULL;
973 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
974 if (pmutex != NULL) {
975 *pmutex = NULL;
976 }
977
978 _channelref *ref = _channelref_find(channels->head, id, NULL);
979 if (ref == NULL) {
980 PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
981 goto done;
982 }
983 if (ref->chan == NULL || !ref->chan->open) {
984 PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
985 goto done;
986 }
987
988 if (pmutex != NULL) {
989 // The mutex will be closed by the caller.
990 *pmutex = channels->mutex;
991 }
992
993 chan = ref->chan;
994 done:
995 if (pmutex == NULL || *pmutex == NULL) {
996 PyThread_release_lock(channels->mutex);
997 }
998 return chan;
999 }
1000
1001 static int64_t
_channels_add(_channels * channels,_PyChannelState * chan)1002 _channels_add(_channels *channels, _PyChannelState *chan)
1003 {
1004 int64_t cid = -1;
1005 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1006
1007 // Create a new ref.
1008 int64_t id = _channels_next_id(channels);
1009 if (id < 0) {
1010 goto done;
1011 }
1012 _channelref *ref = _channelref_new(id, chan);
1013 if (ref == NULL) {
1014 goto done;
1015 }
1016
1017 // Add it to the list.
1018 // We assume that the channel is a new one (not already in the list).
1019 ref->next = channels->head;
1020 channels->head = ref;
1021 channels->numopen += 1;
1022
1023 cid = id;
1024 done:
1025 PyThread_release_lock(channels->mutex);
1026 return cid;
1027 }
1028
1029 /* forward */
1030 static int _channel_set_closing(struct _channelref *, PyThread_type_lock);
1031
1032 static int
_channels_close(_channels * channels,int64_t cid,_PyChannelState ** pchan,int end,int force)1033 _channels_close(_channels *channels, int64_t cid, _PyChannelState **pchan,
1034 int end, int force)
1035 {
1036 int res = -1;
1037 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1038 if (pchan != NULL) {
1039 *pchan = NULL;
1040 }
1041
1042 _channelref *ref = _channelref_find(channels->head, cid, NULL);
1043 if (ref == NULL) {
1044 PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", cid);
1045 goto done;
1046 }
1047
1048 if (ref->chan == NULL) {
1049 PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
1050 goto done;
1051 }
1052 else if (!force && end == CHANNEL_SEND && ref->chan->closing != NULL) {
1053 PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
1054 goto done;
1055 }
1056 else {
1057 if (_channel_close_all(ref->chan, end, force) != 0) {
1058 if (end == CHANNEL_SEND &&
1059 PyErr_ExceptionMatches(ChannelNotEmptyError)) {
1060 if (ref->chan->closing != NULL) {
1061 PyErr_Format(ChannelClosedError,
1062 "channel %" PRId64 " closed", cid);
1063 goto done;
1064 }
1065 // Mark the channel as closing and return. The channel
1066 // will be cleaned up in _channel_next().
1067 PyErr_Clear();
1068 if (_channel_set_closing(ref, channels->mutex) != 0) {
1069 goto done;
1070 }
1071 if (pchan != NULL) {
1072 *pchan = ref->chan;
1073 }
1074 res = 0;
1075 }
1076 goto done;
1077 }
1078 if (pchan != NULL) {
1079 *pchan = ref->chan;
1080 }
1081 else {
1082 _channel_free(ref->chan);
1083 }
1084 ref->chan = NULL;
1085 }
1086
1087 res = 0;
1088 done:
1089 PyThread_release_lock(channels->mutex);
1090 return res;
1091 }
1092
1093 static void
_channels_remove_ref(_channels * channels,_channelref * ref,_channelref * prev,_PyChannelState ** pchan)1094 _channels_remove_ref(_channels *channels, _channelref *ref, _channelref *prev,
1095 _PyChannelState **pchan)
1096 {
1097 if (ref == channels->head) {
1098 channels->head = ref->next;
1099 }
1100 else {
1101 prev->next = ref->next;
1102 }
1103 channels->numopen -= 1;
1104
1105 if (pchan != NULL) {
1106 *pchan = ref->chan;
1107 }
1108 _channelref_free(ref);
1109 }
1110
1111 static int
_channels_remove(_channels * channels,int64_t id,_PyChannelState ** pchan)1112 _channels_remove(_channels *channels, int64_t id, _PyChannelState **pchan)
1113 {
1114 int res = -1;
1115 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1116
1117 if (pchan != NULL) {
1118 *pchan = NULL;
1119 }
1120
1121 _channelref *prev = NULL;
1122 _channelref *ref = _channelref_find(channels->head, id, &prev);
1123 if (ref == NULL) {
1124 PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
1125 goto done;
1126 }
1127
1128 _channels_remove_ref(channels, ref, prev, pchan);
1129
1130 res = 0;
1131 done:
1132 PyThread_release_lock(channels->mutex);
1133 return res;
1134 }
1135
1136 static int
_channels_add_id_object(_channels * channels,int64_t id)1137 _channels_add_id_object(_channels *channels, int64_t id)
1138 {
1139 int res = -1;
1140 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1141
1142 _channelref *ref = _channelref_find(channels->head, id, NULL);
1143 if (ref == NULL) {
1144 PyErr_Format(ChannelNotFoundError, "channel %" PRId64 " not found", id);
1145 goto done;
1146 }
1147 ref->objcount += 1;
1148
1149 res = 0;
1150 done:
1151 PyThread_release_lock(channels->mutex);
1152 return res;
1153 }
1154
1155 static void
_channels_drop_id_object(_channels * channels,int64_t id)1156 _channels_drop_id_object(_channels *channels, int64_t id)
1157 {
1158 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1159
1160 _channelref *prev = NULL;
1161 _channelref *ref = _channelref_find(channels->head, id, &prev);
1162 if (ref == NULL) {
1163 // Already destroyed.
1164 goto done;
1165 }
1166 ref->objcount -= 1;
1167
1168 // Destroy if no longer used.
1169 if (ref->objcount == 0) {
1170 _PyChannelState *chan = NULL;
1171 _channels_remove_ref(channels, ref, prev, &chan);
1172 if (chan != NULL) {
1173 _channel_free(chan);
1174 }
1175 }
1176
1177 done:
1178 PyThread_release_lock(channels->mutex);
1179 }
1180
1181 static int64_t *
_channels_list_all(_channels * channels,int64_t * count)1182 _channels_list_all(_channels *channels, int64_t *count)
1183 {
1184 int64_t *cids = NULL;
1185 PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
1186 int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
1187 if (ids == NULL) {
1188 goto done;
1189 }
1190 _channelref *ref = channels->head;
1191 for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
1192 ids[i] = ref->id;
1193 }
1194 *count = channels->numopen;
1195
1196 cids = ids;
1197 done:
1198 PyThread_release_lock(channels->mutex);
1199 return cids;
1200 }
1201
1202 /* support for closing non-empty channels */
1203
/* Attached to a channel (chan->closing) while a deferred close is
   pending. */
struct _channel_closing {
    // List entry of the channel; _channel_finish_closing() uses it to
    // detach the channel (ref->chan = NULL) before freeing it.
    struct _channelref *ref;
};
1207
1208 static int
_channel_set_closing(struct _channelref * ref,PyThread_type_lock mutex)1209 _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
1210 struct _channel *chan = ref->chan;
1211 if (chan == NULL) {
1212 // already closed
1213 return 0;
1214 }
1215 int res = -1;
1216 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1217 if (chan->closing != NULL) {
1218 PyErr_SetString(ChannelClosedError, "channel closed");
1219 goto done;
1220 }
1221 chan->closing = PyMem_NEW(struct _channel_closing, 1);
1222 if (chan->closing == NULL) {
1223 goto done;
1224 }
1225 chan->closing->ref = ref;
1226
1227 res = 0;
1228 done:
1229 PyThread_release_lock(chan->mutex);
1230 return res;
1231 }
1232
1233 static void
_channel_clear_closing(struct _channel * chan)1234 _channel_clear_closing(struct _channel *chan) {
1235 PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
1236 if (chan->closing != NULL) {
1237 PyMem_Free(chan->closing);
1238 chan->closing = NULL;
1239 }
1240 PyThread_release_lock(chan->mutex);
1241 }
1242
1243 static void
_channel_finish_closing(struct _channel * chan)1244 _channel_finish_closing(struct _channel *chan) {
1245 struct _channel_closing *closing = chan->closing;
1246 if (closing == NULL) {
1247 return;
1248 }
1249 _channelref *ref = closing->ref;
1250 _channel_clear_closing(chan);
1251 // Do the things that would have been done in _channels_close().
1252 ref->chan = NULL;
1253 _channel_free(chan);
1254 }
1255
1256 /* "high"-level channel-related functions */
1257
1258 static int64_t
_channel_create(_channels * channels)1259 _channel_create(_channels *channels)
1260 {
1261 _PyChannelState *chan = _channel_new();
1262 if (chan == NULL) {
1263 return -1;
1264 }
1265 int64_t id = _channels_add(channels, chan);
1266 if (id < 0) {
1267 _channel_free(chan);
1268 return -1;
1269 }
1270 return id;
1271 }
1272
1273 static int
_channel_destroy(_channels * channels,int64_t id)1274 _channel_destroy(_channels *channels, int64_t id)
1275 {
1276 _PyChannelState *chan = NULL;
1277 if (_channels_remove(channels, id, &chan) != 0) {
1278 return -1;
1279 }
1280 if (chan != NULL) {
1281 _channel_free(chan);
1282 }
1283 return 0;
1284 }
1285
1286 static int
_channel_send(_channels * channels,int64_t id,PyObject * obj)1287 _channel_send(_channels *channels, int64_t id, PyObject *obj)
1288 {
1289 PyInterpreterState *interp = _get_current();
1290 if (interp == NULL) {
1291 return -1;
1292 }
1293
1294 // Look up the channel.
1295 PyThread_type_lock mutex = NULL;
1296 _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1297 if (chan == NULL) {
1298 return -1;
1299 }
1300 // Past this point we are responsible for releasing the mutex.
1301
1302 if (chan->closing != NULL) {
1303 PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", id);
1304 PyThread_release_lock(mutex);
1305 return -1;
1306 }
1307
1308 // Convert the object to cross-interpreter data.
1309 _PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1);
1310 if (data == NULL) {
1311 PyThread_release_lock(mutex);
1312 return -1;
1313 }
1314 if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
1315 PyThread_release_lock(mutex);
1316 PyMem_Free(data);
1317 return -1;
1318 }
1319
1320 // Add the data to the channel.
1321 int res = _channel_add(chan, PyInterpreterState_GetID(interp), data);
1322 PyThread_release_lock(mutex);
1323 if (res != 0) {
1324 _PyCrossInterpreterData_Release(data);
1325 PyMem_Free(data);
1326 return -1;
1327 }
1328
1329 return 0;
1330 }
1331
1332 static PyObject *
_channel_recv(_channels * channels,int64_t id)1333 _channel_recv(_channels *channels, int64_t id)
1334 {
1335 PyInterpreterState *interp = _get_current();
1336 if (interp == NULL) {
1337 return NULL;
1338 }
1339
1340 // Look up the channel.
1341 PyThread_type_lock mutex = NULL;
1342 _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1343 if (chan == NULL) {
1344 return NULL;
1345 }
1346 // Past this point we are responsible for releasing the mutex.
1347
1348 // Pop off the next item from the channel.
1349 _PyCrossInterpreterData *data = _channel_next(chan, PyInterpreterState_GetID(interp));
1350 PyThread_release_lock(mutex);
1351 if (data == NULL) {
1352 return NULL;
1353 }
1354
1355 // Convert the data back to an object.
1356 PyObject *obj = _PyCrossInterpreterData_NewObject(data);
1357 _PyCrossInterpreterData_Release(data);
1358 PyMem_Free(data);
1359 if (obj == NULL) {
1360 return NULL;
1361 }
1362
1363 return obj;
1364 }
1365
1366 static int
_channel_drop(_channels * channels,int64_t id,int send,int recv)1367 _channel_drop(_channels *channels, int64_t id, int send, int recv)
1368 {
1369 PyInterpreterState *interp = _get_current();
1370 if (interp == NULL) {
1371 return -1;
1372 }
1373
1374 // Look up the channel.
1375 PyThread_type_lock mutex = NULL;
1376 _PyChannelState *chan = _channels_lookup(channels, id, &mutex);
1377 if (chan == NULL) {
1378 return -1;
1379 }
1380 // Past this point we are responsible for releasing the mutex.
1381
1382 // Close one or both of the two ends.
1383 int res = _channel_close_interpreter(chan, PyInterpreterState_GetID(interp), send-recv);
1384 PyThread_release_lock(mutex);
1385 return res;
1386 }
1387
1388 static int
_channel_close(_channels * channels,int64_t id,int end,int force)1389 _channel_close(_channels *channels, int64_t id, int end, int force)
1390 {
1391 return _channels_close(channels, id, NULL, end, force);
1392 }
1393
1394 static int
_channel_is_associated(_channels * channels,int64_t cid,int64_t interp,int send)1395 _channel_is_associated(_channels *channels, int64_t cid, int64_t interp,
1396 int send)
1397 {
1398 _PyChannelState *chan = _channels_lookup(channels, cid, NULL);
1399 if (chan == NULL) {
1400 return -1;
1401 } else if (send && chan->closing != NULL) {
1402 PyErr_Format(ChannelClosedError, "channel %" PRId64 " closed", cid);
1403 return -1;
1404 }
1405
1406 _channelend *end = _channelend_find(send ? chan->ends->send : chan->ends->recv,
1407 interp, NULL);
1408
1409 return (end != NULL && end->open);
1410 }
1411
1412 /* ChannelID class */
1413
1414 static PyTypeObject ChannelIDtype;
1415
1416 typedef struct channelid {
1417 PyObject_HEAD
1418 int64_t id;
1419 int end;
1420 int resolve;
1421 _channels *channels;
1422 } channelid;
1423
1424 static int
channel_id_converter(PyObject * arg,void * ptr)1425 channel_id_converter(PyObject *arg, void *ptr)
1426 {
1427 int64_t cid;
1428 if (PyObject_TypeCheck(arg, &ChannelIDtype)) {
1429 cid = ((channelid *)arg)->id;
1430 }
1431 else if (PyIndex_Check(arg)) {
1432 cid = PyLong_AsLongLong(arg);
1433 if (cid == -1 && PyErr_Occurred()) {
1434 return 0;
1435 }
1436 if (cid < 0) {
1437 PyErr_Format(PyExc_ValueError,
1438 "channel ID must be a non-negative int, got %R", arg);
1439 return 0;
1440 }
1441 }
1442 else {
1443 PyErr_Format(PyExc_TypeError,
1444 "channel ID must be an int, got %.100s",
1445 Py_TYPE(arg)->tp_name);
1446 return 0;
1447 }
1448 *(int64_t *)ptr = cid;
1449 return 1;
1450 }
1451
1452 static channelid *
newchannelid(PyTypeObject * cls,int64_t cid,int end,_channels * channels,int force,int resolve)1453 newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels,
1454 int force, int resolve)
1455 {
1456 channelid *self = PyObject_New(channelid, cls);
1457 if (self == NULL) {
1458 return NULL;
1459 }
1460 self->id = cid;
1461 self->end = end;
1462 self->resolve = resolve;
1463 self->channels = channels;
1464
1465 if (_channels_add_id_object(channels, cid) != 0) {
1466 if (force && PyErr_ExceptionMatches(ChannelNotFoundError)) {
1467 PyErr_Clear();
1468 }
1469 else {
1470 Py_DECREF((PyObject *)self);
1471 return NULL;
1472 }
1473 }
1474
1475 return self;
1476 }
1477
1478 static _channels * _global_channels(void);
1479
1480 static PyObject *
channelid_new(PyTypeObject * cls,PyObject * args,PyObject * kwds)1481 channelid_new(PyTypeObject *cls, PyObject *args, PyObject *kwds)
1482 {
1483 static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL};
1484 int64_t cid;
1485 int send = -1;
1486 int recv = -1;
1487 int force = 0;
1488 int resolve = 0;
1489 if (!PyArg_ParseTupleAndKeywords(args, kwds,
1490 "O&|$pppp:ChannelID.__new__", kwlist,
1491 channel_id_converter, &cid, &send, &recv, &force, &resolve))
1492 return NULL;
1493
1494 // Handle "send" and "recv".
1495 if (send == 0 && recv == 0) {
1496 PyErr_SetString(PyExc_ValueError,
1497 "'send' and 'recv' cannot both be False");
1498 return NULL;
1499 }
1500
1501 int end = 0;
1502 if (send == 1) {
1503 if (recv == 0 || recv == -1) {
1504 end = CHANNEL_SEND;
1505 }
1506 }
1507 else if (recv == 1) {
1508 end = CHANNEL_RECV;
1509 }
1510
1511 return (PyObject *)newchannelid(cls, cid, end, _global_channels(),
1512 force, resolve);
1513 }
1514
1515 static void
channelid_dealloc(PyObject * v)1516 channelid_dealloc(PyObject *v)
1517 {
1518 int64_t cid = ((channelid *)v)->id;
1519 _channels *channels = ((channelid *)v)->channels;
1520 Py_TYPE(v)->tp_free(v);
1521
1522 _channels_drop_id_object(channels, cid);
1523 }
1524
1525 static PyObject *
channelid_repr(PyObject * self)1526 channelid_repr(PyObject *self)
1527 {
1528 PyTypeObject *type = Py_TYPE(self);
1529 const char *name = _PyType_Name(type);
1530
1531 channelid *cid = (channelid *)self;
1532 const char *fmt;
1533 if (cid->end == CHANNEL_SEND) {
1534 fmt = "%s(%" PRId64 ", send=True)";
1535 }
1536 else if (cid->end == CHANNEL_RECV) {
1537 fmt = "%s(%" PRId64 ", recv=True)";
1538 }
1539 else {
1540 fmt = "%s(%" PRId64 ")";
1541 }
1542 return PyUnicode_FromFormat(fmt, name, cid->id);
1543 }
1544
1545 static PyObject *
channelid_str(PyObject * self)1546 channelid_str(PyObject *self)
1547 {
1548 channelid *cid = (channelid *)self;
1549 return PyUnicode_FromFormat("%" PRId64 "", cid->id);
1550 }
1551
1552 static PyObject *
channelid_int(PyObject * self)1553 channelid_int(PyObject *self)
1554 {
1555 channelid *cid = (channelid *)self;
1556 return PyLong_FromLongLong(cid->id);
1557 }
1558
1559 static PyNumberMethods channelid_as_number = {
1560 0, /* nb_add */
1561 0, /* nb_subtract */
1562 0, /* nb_multiply */
1563 0, /* nb_remainder */
1564 0, /* nb_divmod */
1565 0, /* nb_power */
1566 0, /* nb_negative */
1567 0, /* nb_positive */
1568 0, /* nb_absolute */
1569 0, /* nb_bool */
1570 0, /* nb_invert */
1571 0, /* nb_lshift */
1572 0, /* nb_rshift */
1573 0, /* nb_and */
1574 0, /* nb_xor */
1575 0, /* nb_or */
1576 (unaryfunc)channelid_int, /* nb_int */
1577 0, /* nb_reserved */
1578 0, /* nb_float */
1579
1580 0, /* nb_inplace_add */
1581 0, /* nb_inplace_subtract */
1582 0, /* nb_inplace_multiply */
1583 0, /* nb_inplace_remainder */
1584 0, /* nb_inplace_power */
1585 0, /* nb_inplace_lshift */
1586 0, /* nb_inplace_rshift */
1587 0, /* nb_inplace_and */
1588 0, /* nb_inplace_xor */
1589 0, /* nb_inplace_or */
1590
1591 0, /* nb_floor_divide */
1592 0, /* nb_true_divide */
1593 0, /* nb_inplace_floor_divide */
1594 0, /* nb_inplace_true_divide */
1595
1596 (unaryfunc)channelid_int, /* nb_index */
1597 };
1598
1599 static Py_hash_t
channelid_hash(PyObject * self)1600 channelid_hash(PyObject *self)
1601 {
1602 channelid *cid = (channelid *)self;
1603 PyObject *id = PyLong_FromLongLong(cid->id);
1604 if (id == NULL) {
1605 return -1;
1606 }
1607 Py_hash_t hash = PyObject_Hash(id);
1608 Py_DECREF(id);
1609 return hash;
1610 }
1611
1612 static PyObject *
channelid_richcompare(PyObject * self,PyObject * other,int op)1613 channelid_richcompare(PyObject *self, PyObject *other, int op)
1614 {
1615 if (op != Py_EQ && op != Py_NE) {
1616 Py_RETURN_NOTIMPLEMENTED;
1617 }
1618
1619 if (!PyObject_TypeCheck(self, &ChannelIDtype)) {
1620 Py_RETURN_NOTIMPLEMENTED;
1621 }
1622
1623 channelid *cid = (channelid *)self;
1624 int equal;
1625 if (PyObject_TypeCheck(other, &ChannelIDtype)) {
1626 channelid *othercid = (channelid *)other;
1627 equal = (cid->end == othercid->end) && (cid->id == othercid->id);
1628 }
1629 else if (PyLong_Check(other)) {
1630 /* Fast path */
1631 int overflow;
1632 long long othercid = PyLong_AsLongLongAndOverflow(other, &overflow);
1633 if (othercid == -1 && PyErr_Occurred()) {
1634 return NULL;
1635 }
1636 equal = !overflow && (othercid >= 0) && (cid->id == othercid);
1637 }
1638 else if (PyNumber_Check(other)) {
1639 PyObject *pyid = PyLong_FromLongLong(cid->id);
1640 if (pyid == NULL) {
1641 return NULL;
1642 }
1643 PyObject *res = PyObject_RichCompare(pyid, other, op);
1644 Py_DECREF(pyid);
1645 return res;
1646 }
1647 else {
1648 Py_RETURN_NOTIMPLEMENTED;
1649 }
1650
1651 if ((op == Py_EQ && equal) || (op == Py_NE && !equal)) {
1652 Py_RETURN_TRUE;
1653 }
1654 Py_RETURN_FALSE;
1655 }
1656
1657 static PyObject *
_channel_from_cid(PyObject * cid,int end)1658 _channel_from_cid(PyObject *cid, int end)
1659 {
1660 PyObject *highlevel = PyImport_ImportModule("interpreters");
1661 if (highlevel == NULL) {
1662 PyErr_Clear();
1663 highlevel = PyImport_ImportModule("test.support.interpreters");
1664 if (highlevel == NULL) {
1665 return NULL;
1666 }
1667 }
1668 const char *clsname = (end == CHANNEL_RECV) ? "RecvChannel" :
1669 "SendChannel";
1670 PyObject *cls = PyObject_GetAttrString(highlevel, clsname);
1671 Py_DECREF(highlevel);
1672 if (cls == NULL) {
1673 return NULL;
1674 }
1675 PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL);
1676 Py_DECREF(cls);
1677 if (chan == NULL) {
1678 return NULL;
1679 }
1680 return chan;
1681 }
1682
/* Cross-interpreter payload for a ChannelID: a plain copy of the fields
   needed to rebuild the object on the receiving side. */
struct _channelid_xid {
    int64_t id;     // copied from channelid.id
    int end;        // copied from channelid.end
    int resolve;    // copied from channelid.resolve
};
1688
1689 static PyObject *
_channelid_from_xid(_PyCrossInterpreterData * data)1690 _channelid_from_xid(_PyCrossInterpreterData *data)
1691 {
1692 struct _channelid_xid *xid = (struct _channelid_xid *)data->data;
1693 // Note that we do not preserve the "resolve" flag.
1694 PyObject *cid = (PyObject *)newchannelid(&ChannelIDtype, xid->id, xid->end,
1695 _global_channels(), 0, 0);
1696 if (xid->end == 0) {
1697 return cid;
1698 }
1699 if (!xid->resolve) {
1700 return cid;
1701 }
1702
1703 /* Try returning a high-level channel end but fall back to the ID. */
1704 PyObject *chan = _channel_from_cid(cid, xid->end);
1705 if (chan == NULL) {
1706 PyErr_Clear();
1707 return cid;
1708 }
1709 Py_DECREF(cid);
1710 return chan;
1711 }
1712
1713 static int
_channelid_shared(PyObject * obj,_PyCrossInterpreterData * data)1714 _channelid_shared(PyObject *obj, _PyCrossInterpreterData *data)
1715 {
1716 struct _channelid_xid *xid = PyMem_NEW(struct _channelid_xid, 1);
1717 if (xid == NULL) {
1718 return -1;
1719 }
1720 xid->id = ((channelid *)obj)->id;
1721 xid->end = ((channelid *)obj)->end;
1722 xid->resolve = ((channelid *)obj)->resolve;
1723
1724 data->data = xid;
1725 Py_INCREF(obj);
1726 data->obj = obj;
1727 data->new_object = _channelid_from_xid;
1728 data->free = PyMem_Free;
1729 return 0;
1730 }
1731
1732 static PyObject *
channelid_end(PyObject * self,void * end)1733 channelid_end(PyObject *self, void *end)
1734 {
1735 int force = 1;
1736 channelid *cid = (channelid *)self;
1737 if (end != NULL) {
1738 return (PyObject *)newchannelid(Py_TYPE(self), cid->id, *(int *)end,
1739 cid->channels, force, cid->resolve);
1740 }
1741
1742 if (cid->end == CHANNEL_SEND) {
1743 return PyUnicode_InternFromString("send");
1744 }
1745 if (cid->end == CHANNEL_RECV) {
1746 return PyUnicode_InternFromString("recv");
1747 }
1748 return PyUnicode_InternFromString("both");
1749 }
1750
1751 static int _channelid_end_send = CHANNEL_SEND;
1752 static int _channelid_end_recv = CHANNEL_RECV;
1753
1754 static PyGetSetDef channelid_getsets[] = {
1755 {"end", (getter)channelid_end, NULL,
1756 PyDoc_STR("'send', 'recv', or 'both'")},
1757 {"send", (getter)channelid_end, NULL,
1758 PyDoc_STR("the 'send' end of the channel"), &_channelid_end_send},
1759 {"recv", (getter)channelid_end, NULL,
1760 PyDoc_STR("the 'recv' end of the channel"), &_channelid_end_recv},
1761 {NULL}
1762 };
1763
1764 PyDoc_STRVAR(channelid_doc,
1765 "A channel ID identifies a channel and may be used as an int.");
1766
1767 static PyTypeObject ChannelIDtype = {
1768 PyVarObject_HEAD_INIT(&PyType_Type, 0)
1769 "_xxsubinterpreters.ChannelID", /* tp_name */
1770 sizeof(channelid), /* tp_basicsize */
1771 0, /* tp_itemsize */
1772 (destructor)channelid_dealloc, /* tp_dealloc */
1773 0, /* tp_vectorcall_offset */
1774 0, /* tp_getattr */
1775 0, /* tp_setattr */
1776 0, /* tp_as_async */
1777 (reprfunc)channelid_repr, /* tp_repr */
1778 &channelid_as_number, /* tp_as_number */
1779 0, /* tp_as_sequence */
1780 0, /* tp_as_mapping */
1781 channelid_hash, /* tp_hash */
1782 0, /* tp_call */
1783 (reprfunc)channelid_str, /* tp_str */
1784 0, /* tp_getattro */
1785 0, /* tp_setattro */
1786 0, /* tp_as_buffer */
1787 // Use Py_TPFLAGS_DISALLOW_INSTANTIATION so the type cannot be instantiated
1788 // from Python code. We do this because there is a strong relationship
1789 // between channel IDs and the channel lifecycle, so this limitation avoids
1790 // related complications. Use the _channel_id() function instead.
1791 Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE
1792 | Py_TPFLAGS_DISALLOW_INSTANTIATION, /* tp_flags */
1793 channelid_doc, /* tp_doc */
1794 0, /* tp_traverse */
1795 0, /* tp_clear */
1796 channelid_richcompare, /* tp_richcompare */
1797 0, /* tp_weaklistoffset */
1798 0, /* tp_iter */
1799 0, /* tp_iternext */
1800 0, /* tp_methods */
1801 0, /* tp_members */
1802 channelid_getsets, /* tp_getset */
1803 };
1804
1805
1806 /* interpreter-specific code ************************************************/
1807
1808 static PyObject * RunFailedError = NULL;
1809
1810 static int
interp_exceptions_init(PyObject * ns)1811 interp_exceptions_init(PyObject *ns)
1812 {
1813 // XXX Move the exceptions into per-module memory?
1814
1815 if (RunFailedError == NULL) {
1816 // An uncaught exception came out of interp_run_string().
1817 RunFailedError = PyErr_NewException("_xxsubinterpreters.RunFailedError",
1818 PyExc_RuntimeError, NULL);
1819 if (RunFailedError == NULL) {
1820 return -1;
1821 }
1822 if (PyDict_SetItemString(ns, "RunFailedError", RunFailedError) != 0) {
1823 return -1;
1824 }
1825 }
1826
1827 return 0;
1828 }
1829
1830 static int
_is_running(PyInterpreterState * interp)1831 _is_running(PyInterpreterState *interp)
1832 {
1833 PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
1834 if (PyThreadState_Next(tstate) != NULL) {
1835 PyErr_SetString(PyExc_RuntimeError,
1836 "interpreter has more than one thread");
1837 return -1;
1838 }
1839
1840 assert(!PyErr_Occurred());
1841 _PyInterpreterFrame *frame = tstate->cframe->current_frame;
1842 if (frame == NULL) {
1843 return 0;
1844 }
1845 return 1;
1846 }
1847
1848 static int
_ensure_not_running(PyInterpreterState * interp)1849 _ensure_not_running(PyInterpreterState *interp)
1850 {
1851 int is_running = _is_running(interp);
1852 if (is_running < 0) {
1853 return -1;
1854 }
1855 if (is_running) {
1856 PyErr_Format(PyExc_RuntimeError, "interpreter already running");
1857 return -1;
1858 }
1859 return 0;
1860 }
1861
1862 static int
_run_script(PyInterpreterState * interp,const char * codestr,_sharedns * shared,_sharedexception ** exc)1863 _run_script(PyInterpreterState *interp, const char *codestr,
1864 _sharedns *shared, _sharedexception **exc)
1865 {
1866 PyObject *exctype = NULL;
1867 PyObject *excval = NULL;
1868 PyObject *tb = NULL;
1869
1870 PyObject *main_mod = _PyInterpreterState_GetMainModule(interp);
1871 if (main_mod == NULL) {
1872 goto error;
1873 }
1874 PyObject *ns = PyModule_GetDict(main_mod); // borrowed
1875 Py_DECREF(main_mod);
1876 if (ns == NULL) {
1877 goto error;
1878 }
1879 Py_INCREF(ns);
1880
1881 // Apply the cross-interpreter data.
1882 if (shared != NULL) {
1883 if (_sharedns_apply(shared, ns) != 0) {
1884 Py_DECREF(ns);
1885 goto error;
1886 }
1887 }
1888
1889 // Run the string (see PyRun_SimpleStringFlags).
1890 PyObject *result = PyRun_StringFlags(codestr, Py_file_input, ns, ns, NULL);
1891 Py_DECREF(ns);
1892 if (result == NULL) {
1893 goto error;
1894 }
1895 else {
1896 Py_DECREF(result); // We throw away the result.
1897 }
1898
1899 *exc = NULL;
1900 return 0;
1901
1902 error:
1903 PyErr_Fetch(&exctype, &excval, &tb);
1904
1905 _sharedexception *sharedexc = _sharedexception_bind(exctype, excval, tb);
1906 Py_XDECREF(exctype);
1907 Py_XDECREF(excval);
1908 Py_XDECREF(tb);
1909 if (sharedexc == NULL) {
1910 fprintf(stderr, "RunFailedError: script raised an uncaught exception");
1911 PyErr_Clear();
1912 sharedexc = NULL;
1913 }
1914 else {
1915 assert(!PyErr_Occurred());
1916 }
1917 *exc = sharedexc;
1918 return -1;
1919 }
1920
1921 static int
_run_script_in_interpreter(PyInterpreterState * interp,const char * codestr,PyObject * shareables)1922 _run_script_in_interpreter(PyInterpreterState *interp, const char *codestr,
1923 PyObject *shareables)
1924 {
1925 if (_ensure_not_running(interp) < 0) {
1926 return -1;
1927 }
1928
1929 _sharedns *shared = _get_shared_ns(shareables);
1930 if (shared == NULL && PyErr_Occurred()) {
1931 return -1;
1932 }
1933
1934 // Switch to interpreter.
1935 PyThreadState *save_tstate = NULL;
1936 if (interp != PyInterpreterState_Get()) {
1937 // XXX Using the "head" thread isn't strictly correct.
1938 PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
1939 // XXX Possible GILState issues?
1940 save_tstate = PyThreadState_Swap(tstate);
1941 }
1942
1943 // Run the script.
1944 _sharedexception *exc = NULL;
1945 int result = _run_script(interp, codestr, shared, &exc);
1946
1947 // Switch back.
1948 if (save_tstate != NULL) {
1949 PyThreadState_Swap(save_tstate);
1950 }
1951
1952 // Propagate any exception out to the caller.
1953 if (exc != NULL) {
1954 _sharedexception_apply(exc, RunFailedError);
1955 _sharedexception_free(exc);
1956 }
1957 else if (result != 0) {
1958 // We were unable to allocate a shared exception.
1959 PyErr_NoMemory();
1960 }
1961
1962 if (shared != NULL) {
1963 _sharedns_free(shared);
1964 }
1965
1966 return result;
1967 }
1968
1969
1970 /* module level code ********************************************************/
1971
1972 /* globals is the process-global state for the module. It holds all
1973 the data that we need to share between interpreters, so it cannot
1974 hold PyObject values. */
1975 static struct globals {
1976 _channels channels;
1977 } _globals = {{0}};
1978
1979 static int
_init_globals(void)1980 _init_globals(void)
1981 {
1982 if (_channels_init(&_globals.channels) != 0) {
1983 return -1;
1984 }
1985 return 0;
1986 }
1987
1988 static _channels *
_global_channels(void)1989 _global_channels(void) {
1990 return &_globals.channels;
1991 }
1992
1993 static PyObject *
interp_create(PyObject * self,PyObject * args,PyObject * kwds)1994 interp_create(PyObject *self, PyObject *args, PyObject *kwds)
1995 {
1996
1997 static char *kwlist[] = {"isolated", NULL};
1998 int isolated = 1;
1999 if (!PyArg_ParseTupleAndKeywords(args, kwds, "|$i:create", kwlist,
2000 &isolated)) {
2001 return NULL;
2002 }
2003
2004 // Create and initialize the new interpreter.
2005 PyThreadState *save_tstate = _PyThreadState_GET();
2006 // XXX Possible GILState issues?
2007 PyThreadState *tstate = _Py_NewInterpreter(isolated);
2008 PyThreadState_Swap(save_tstate);
2009 if (tstate == NULL) {
2010 /* Since no new thread state was created, there is no exception to
2011 propagate; raise a fresh one after swapping in the old thread
2012 state. */
2013 PyErr_SetString(PyExc_RuntimeError, "interpreter creation failed");
2014 return NULL;
2015 }
2016 PyInterpreterState *interp = PyThreadState_GetInterpreter(tstate);
2017 PyObject *idobj = _PyInterpreterState_GetIDObject(interp);
2018 if (idobj == NULL) {
2019 // XXX Possible GILState issues?
2020 save_tstate = PyThreadState_Swap(tstate);
2021 Py_EndInterpreter(tstate);
2022 PyThreadState_Swap(save_tstate);
2023 return NULL;
2024 }
2025 _PyInterpreterState_RequireIDRef(interp, 1);
2026 return idobj;
2027 }
2028
2029 PyDoc_STRVAR(create_doc,
2030 "create() -> ID\n\
2031 \n\
2032 Create a new interpreter and return a unique generated ID.");
2033
2034
2035 static PyObject *
interp_destroy(PyObject * self,PyObject * args,PyObject * kwds)2036 interp_destroy(PyObject *self, PyObject *args, PyObject *kwds)
2037 {
2038 static char *kwlist[] = {"id", NULL};
2039 PyObject *id;
2040 // XXX Use "L" for id?
2041 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2042 "O:destroy", kwlist, &id)) {
2043 return NULL;
2044 }
2045
2046 // Look up the interpreter.
2047 PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2048 if (interp == NULL) {
2049 return NULL;
2050 }
2051
2052 // Ensure we don't try to destroy the current interpreter.
2053 PyInterpreterState *current = _get_current();
2054 if (current == NULL) {
2055 return NULL;
2056 }
2057 if (interp == current) {
2058 PyErr_SetString(PyExc_RuntimeError,
2059 "cannot destroy the current interpreter");
2060 return NULL;
2061 }
2062
2063 // Ensure the interpreter isn't running.
2064 /* XXX We *could* support destroying a running interpreter but
2065 aren't going to worry about it for now. */
2066 if (_ensure_not_running(interp) < 0) {
2067 return NULL;
2068 }
2069
2070 // Destroy the interpreter.
2071 PyThreadState *tstate = PyInterpreterState_ThreadHead(interp);
2072 // XXX Possible GILState issues?
2073 PyThreadState *save_tstate = PyThreadState_Swap(tstate);
2074 Py_EndInterpreter(tstate);
2075 PyThreadState_Swap(save_tstate);
2076
2077 Py_RETURN_NONE;
2078 }
2079
2080 PyDoc_STRVAR(destroy_doc,
2081 "destroy(id)\n\
2082 \n\
2083 Destroy the identified interpreter.\n\
2084 \n\
2085 Attempting to destroy the current interpreter results in a RuntimeError.\n\
2086 So does an unrecognized ID.");
2087
2088
2089 static PyObject *
interp_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))2090 interp_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
2091 {
2092 PyObject *ids, *id;
2093 PyInterpreterState *interp;
2094
2095 ids = PyList_New(0);
2096 if (ids == NULL) {
2097 return NULL;
2098 }
2099
2100 interp = PyInterpreterState_Head();
2101 while (interp != NULL) {
2102 id = _PyInterpreterState_GetIDObject(interp);
2103 if (id == NULL) {
2104 Py_DECREF(ids);
2105 return NULL;
2106 }
2107 // insert at front of list
2108 int res = PyList_Insert(ids, 0, id);
2109 Py_DECREF(id);
2110 if (res < 0) {
2111 Py_DECREF(ids);
2112 return NULL;
2113 }
2114
2115 interp = PyInterpreterState_Next(interp);
2116 }
2117
2118 return ids;
2119 }
2120
2121 PyDoc_STRVAR(list_all_doc,
2122 "list_all() -> [ID]\n\
2123 \n\
2124 Return a list containing the ID of every existing interpreter.");
2125
2126
2127 static PyObject *
interp_get_current(PyObject * self,PyObject * Py_UNUSED (ignored))2128 interp_get_current(PyObject *self, PyObject *Py_UNUSED(ignored))
2129 {
2130 PyInterpreterState *interp =_get_current();
2131 if (interp == NULL) {
2132 return NULL;
2133 }
2134 return _PyInterpreterState_GetIDObject(interp);
2135 }
2136
2137 PyDoc_STRVAR(get_current_doc,
2138 "get_current() -> ID\n\
2139 \n\
2140 Return the ID of current interpreter.");
2141
2142
2143 static PyObject *
interp_get_main(PyObject * self,PyObject * Py_UNUSED (ignored))2144 interp_get_main(PyObject *self, PyObject *Py_UNUSED(ignored))
2145 {
2146 // Currently, 0 is always the main interpreter.
2147 int64_t id = 0;
2148 return _PyInterpreterID_New(id);
2149 }
2150
2151 PyDoc_STRVAR(get_main_doc,
2152 "get_main() -> ID\n\
2153 \n\
2154 Return the ID of main interpreter.");
2155
2156
2157 static PyObject *
interp_run_string(PyObject * self,PyObject * args,PyObject * kwds)2158 interp_run_string(PyObject *self, PyObject *args, PyObject *kwds)
2159 {
2160 static char *kwlist[] = {"id", "script", "shared", NULL};
2161 PyObject *id, *code;
2162 PyObject *shared = NULL;
2163 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2164 "OU|O:run_string", kwlist,
2165 &id, &code, &shared)) {
2166 return NULL;
2167 }
2168
2169 // Look up the interpreter.
2170 PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2171 if (interp == NULL) {
2172 return NULL;
2173 }
2174
2175 // Extract code.
2176 Py_ssize_t size;
2177 const char *codestr = PyUnicode_AsUTF8AndSize(code, &size);
2178 if (codestr == NULL) {
2179 return NULL;
2180 }
2181 if (strlen(codestr) != (size_t)size) {
2182 PyErr_SetString(PyExc_ValueError,
2183 "source code string cannot contain null bytes");
2184 return NULL;
2185 }
2186
2187 // Run the code in the interpreter.
2188 if (_run_script_in_interpreter(interp, codestr, shared) != 0) {
2189 return NULL;
2190 }
2191 Py_RETURN_NONE;
2192 }
2193
2194 PyDoc_STRVAR(run_string_doc,
2195 "run_string(id, script, shared)\n\
2196 \n\
2197 Execute the provided string in the identified interpreter.\n\
2198 \n\
2199 See PyRun_SimpleStrings.");
2200
2201
2202 static PyObject *
object_is_shareable(PyObject * self,PyObject * args,PyObject * kwds)2203 object_is_shareable(PyObject *self, PyObject *args, PyObject *kwds)
2204 {
2205 static char *kwlist[] = {"obj", NULL};
2206 PyObject *obj;
2207 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2208 "O:is_shareable", kwlist, &obj)) {
2209 return NULL;
2210 }
2211
2212 if (_PyObject_CheckCrossInterpreterData(obj) == 0) {
2213 Py_RETURN_TRUE;
2214 }
2215 PyErr_Clear();
2216 Py_RETURN_FALSE;
2217 }
2218
2219 PyDoc_STRVAR(is_shareable_doc,
2220 "is_shareable(obj) -> bool\n\
2221 \n\
2222 Return True if the object's data may be shared between interpreters and\n\
2223 False otherwise.");
2224
2225
2226 static PyObject *
interp_is_running(PyObject * self,PyObject * args,PyObject * kwds)2227 interp_is_running(PyObject *self, PyObject *args, PyObject *kwds)
2228 {
2229 static char *kwlist[] = {"id", NULL};
2230 PyObject *id;
2231 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2232 "O:is_running", kwlist, &id)) {
2233 return NULL;
2234 }
2235
2236 PyInterpreterState *interp = _PyInterpreterID_LookUp(id);
2237 if (interp == NULL) {
2238 return NULL;
2239 }
2240 int is_running = _is_running(interp);
2241 if (is_running < 0) {
2242 return NULL;
2243 }
2244 if (is_running) {
2245 Py_RETURN_TRUE;
2246 }
2247 Py_RETURN_FALSE;
2248 }
2249
2250 PyDoc_STRVAR(is_running_doc,
2251 "is_running(id) -> bool\n\
2252 \n\
2253 Return whether or not the identified interpreter is running.");
2254
2255 static PyObject *
channel_create(PyObject * self,PyObject * Py_UNUSED (ignored))2256 channel_create(PyObject *self, PyObject *Py_UNUSED(ignored))
2257 {
2258 int64_t cid = _channel_create(&_globals.channels);
2259 if (cid < 0) {
2260 return NULL;
2261 }
2262 PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, cid, 0,
2263 &_globals.channels, 0, 0);
2264 if (id == NULL) {
2265 if (_channel_destroy(&_globals.channels, cid) != 0) {
2266 // XXX issue a warning?
2267 }
2268 return NULL;
2269 }
2270 assert(((channelid *)id)->channels != NULL);
2271 return id;
2272 }
2273
2274 PyDoc_STRVAR(channel_create_doc,
2275 "channel_create() -> cid\n\
2276 \n\
2277 Create a new cross-interpreter channel and return a unique generated ID.");
2278
2279 static PyObject *
channel_destroy(PyObject * self,PyObject * args,PyObject * kwds)2280 channel_destroy(PyObject *self, PyObject *args, PyObject *kwds)
2281 {
2282 static char *kwlist[] = {"cid", NULL};
2283 int64_t cid;
2284 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:channel_destroy", kwlist,
2285 channel_id_converter, &cid)) {
2286 return NULL;
2287 }
2288
2289 if (_channel_destroy(&_globals.channels, cid) != 0) {
2290 return NULL;
2291 }
2292 Py_RETURN_NONE;
2293 }
2294
2295 PyDoc_STRVAR(channel_destroy_doc,
2296 "channel_destroy(cid)\n\
2297 \n\
2298 Close and finalize the channel. Afterward attempts to use the channel\n\
2299 will behave as though it never existed.");
2300
2301 static PyObject *
channel_list_all(PyObject * self,PyObject * Py_UNUSED (ignored))2302 channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
2303 {
2304 int64_t count = 0;
2305 int64_t *cids = _channels_list_all(&_globals.channels, &count);
2306 if (cids == NULL) {
2307 if (count == 0) {
2308 return PyList_New(0);
2309 }
2310 return NULL;
2311 }
2312 PyObject *ids = PyList_New((Py_ssize_t)count);
2313 if (ids == NULL) {
2314 goto finally;
2315 }
2316 int64_t *cur = cids;
2317 for (int64_t i=0; i < count; cur++, i++) {
2318 PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cur, 0,
2319 &_globals.channels, 0, 0);
2320 if (id == NULL) {
2321 Py_DECREF(ids);
2322 ids = NULL;
2323 break;
2324 }
2325 PyList_SET_ITEM(ids, i, id);
2326 }
2327
2328 finally:
2329 PyMem_Free(cids);
2330 return ids;
2331 }
2332
2333 PyDoc_STRVAR(channel_list_all_doc,
2334 "channel_list_all() -> [cid]\n\
2335 \n\
2336 Return the list of all IDs for active channels.");
2337
2338 static PyObject *
channel_list_interpreters(PyObject * self,PyObject * args,PyObject * kwds)2339 channel_list_interpreters(PyObject *self, PyObject *args, PyObject *kwds)
2340 {
2341 static char *kwlist[] = {"cid", "send", NULL};
2342 int64_t cid; /* Channel ID */
2343 int send = 0; /* Send or receive end? */
2344 int64_t id;
2345 PyObject *ids, *id_obj;
2346 PyInterpreterState *interp;
2347
2348 if (!PyArg_ParseTupleAndKeywords(
2349 args, kwds, "O&$p:channel_list_interpreters",
2350 kwlist, channel_id_converter, &cid, &send)) {
2351 return NULL;
2352 }
2353
2354 ids = PyList_New(0);
2355 if (ids == NULL) {
2356 goto except;
2357 }
2358
2359 interp = PyInterpreterState_Head();
2360 while (interp != NULL) {
2361 id = PyInterpreterState_GetID(interp);
2362 assert(id >= 0);
2363 int res = _channel_is_associated(&_globals.channels, cid, id, send);
2364 if (res < 0) {
2365 goto except;
2366 }
2367 if (res) {
2368 id_obj = _PyInterpreterState_GetIDObject(interp);
2369 if (id_obj == NULL) {
2370 goto except;
2371 }
2372 res = PyList_Insert(ids, 0, id_obj);
2373 Py_DECREF(id_obj);
2374 if (res < 0) {
2375 goto except;
2376 }
2377 }
2378 interp = PyInterpreterState_Next(interp);
2379 }
2380
2381 goto finally;
2382
2383 except:
2384 Py_XDECREF(ids);
2385 ids = NULL;
2386
2387 finally:
2388 return ids;
2389 }
2390
2391 PyDoc_STRVAR(channel_list_interpreters_doc,
2392 "channel_list_interpreters(cid, *, send) -> [id]\n\
2393 \n\
2394 Return the list of all interpreter IDs associated with an end of the channel.\n\
2395 \n\
2396 The 'send' argument should be a boolean indicating whether to use the send or\n\
2397 receive end.");
2398
2399
2400 static PyObject *
channel_send(PyObject * self,PyObject * args,PyObject * kwds)2401 channel_send(PyObject *self, PyObject *args, PyObject *kwds)
2402 {
2403 static char *kwlist[] = {"cid", "obj", NULL};
2404 int64_t cid;
2405 PyObject *obj;
2406 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:channel_send", kwlist,
2407 channel_id_converter, &cid, &obj)) {
2408 return NULL;
2409 }
2410
2411 if (_channel_send(&_globals.channels, cid, obj) != 0) {
2412 return NULL;
2413 }
2414 Py_RETURN_NONE;
2415 }
2416
2417 PyDoc_STRVAR(channel_send_doc,
2418 "channel_send(cid, obj)\n\
2419 \n\
2420 Add the object's data to the channel's queue.");
2421
2422 static PyObject *
channel_recv(PyObject * self,PyObject * args,PyObject * kwds)2423 channel_recv(PyObject *self, PyObject *args, PyObject *kwds)
2424 {
2425 static char *kwlist[] = {"cid", "default", NULL};
2426 int64_t cid;
2427 PyObject *dflt = NULL;
2428 if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:channel_recv", kwlist,
2429 channel_id_converter, &cid, &dflt)) {
2430 return NULL;
2431 }
2432 Py_XINCREF(dflt);
2433
2434 PyObject *obj = _channel_recv(&_globals.channels, cid);
2435 if (obj != NULL) {
2436 Py_XDECREF(dflt);
2437 return obj;
2438 } else if (PyErr_Occurred()) {
2439 Py_XDECREF(dflt);
2440 return NULL;
2441 } else if (dflt != NULL) {
2442 return dflt;
2443 } else {
2444 PyErr_Format(ChannelEmptyError, "channel %" PRId64 " is empty", cid);
2445 return NULL;
2446 }
2447 }
2448
2449 PyDoc_STRVAR(channel_recv_doc,
2450 "channel_recv(cid, [default]) -> obj\n\
2451 \n\
2452 Return a new object from the data at the front of the channel's queue.\n\
2453 \n\
2454 If there is nothing to receive then raise ChannelEmptyError, unless\n\
2455 a default value is provided. In that case return it.");
2456
2457 static PyObject *
channel_close(PyObject * self,PyObject * args,PyObject * kwds)2458 channel_close(PyObject *self, PyObject *args, PyObject *kwds)
2459 {
2460 static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2461 int64_t cid;
2462 int send = 0;
2463 int recv = 0;
2464 int force = 0;
2465 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2466 "O&|$ppp:channel_close", kwlist,
2467 channel_id_converter, &cid, &send, &recv, &force)) {
2468 return NULL;
2469 }
2470
2471 if (_channel_close(&_globals.channels, cid, send-recv, force) != 0) {
2472 return NULL;
2473 }
2474 Py_RETURN_NONE;
2475 }
2476
2477 PyDoc_STRVAR(channel_close_doc,
2478 "channel_close(cid, *, send=None, recv=None, force=False)\n\
2479 \n\
2480 Close the channel for all interpreters.\n\
2481 \n\
2482 If the channel is empty then the keyword args are ignored and both\n\
2483 ends are immediately closed. Otherwise, if 'force' is True then\n\
2484 all queued items are released and both ends are immediately\n\
2485 closed.\n\
2486 \n\
2487 If the channel is not empty *and* 'force' is False then following\n\
2488 happens:\n\
2489 \n\
2490 * recv is True (regardless of send):\n\
2491 - raise ChannelNotEmptyError\n\
2492 * recv is None and send is None:\n\
2493 - raise ChannelNotEmptyError\n\
2494 * send is True and recv is not True:\n\
2495 - fully close the 'send' end\n\
2496 - close the 'recv' end to interpreters not already receiving\n\
2497 - fully close it once empty\n\
2498 \n\
2499 Closing an already closed channel results in a ChannelClosedError.\n\
2500 \n\
2501 Once the channel's ID has no more ref counts in any interpreter\n\
2502 the channel will be destroyed.");
2503
2504 static PyObject *
channel_release(PyObject * self,PyObject * args,PyObject * kwds)2505 channel_release(PyObject *self, PyObject *args, PyObject *kwds)
2506 {
2507 // Note that only the current interpreter is affected.
2508 static char *kwlist[] = {"cid", "send", "recv", "force", NULL};
2509 int64_t cid;
2510 int send = 0;
2511 int recv = 0;
2512 int force = 0;
2513 if (!PyArg_ParseTupleAndKeywords(args, kwds,
2514 "O&|$ppp:channel_release", kwlist,
2515 channel_id_converter, &cid, &send, &recv, &force)) {
2516 return NULL;
2517 }
2518 if (send == 0 && recv == 0) {
2519 send = 1;
2520 recv = 1;
2521 }
2522
2523 // XXX Handle force is True.
2524 // XXX Fix implicit release.
2525
2526 if (_channel_drop(&_globals.channels, cid, send, recv) != 0) {
2527 return NULL;
2528 }
2529 Py_RETURN_NONE;
2530 }
2531
2532 PyDoc_STRVAR(channel_release_doc,
2533 "channel_release(cid, *, send=None, recv=None, force=True)\n\
2534 \n\
2535 Close the channel for the current interpreter. 'send' and 'recv'\n\
2536 (bool) may be used to indicate the ends to close. By default both\n\
2537 ends are closed. Closing an already closed end is a noop.");
2538
2539 static PyObject *
channel__channel_id(PyObject * self,PyObject * args,PyObject * kwds)2540 channel__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
2541 {
2542 return channelid_new(&ChannelIDtype, args, kwds);
2543 }
2544
2545 static PyMethodDef module_functions[] = {
2546 {"create", _PyCFunction_CAST(interp_create),
2547 METH_VARARGS | METH_KEYWORDS, create_doc},
2548 {"destroy", _PyCFunction_CAST(interp_destroy),
2549 METH_VARARGS | METH_KEYWORDS, destroy_doc},
2550 {"list_all", interp_list_all,
2551 METH_NOARGS, list_all_doc},
2552 {"get_current", interp_get_current,
2553 METH_NOARGS, get_current_doc},
2554 {"get_main", interp_get_main,
2555 METH_NOARGS, get_main_doc},
2556 {"is_running", _PyCFunction_CAST(interp_is_running),
2557 METH_VARARGS | METH_KEYWORDS, is_running_doc},
2558 {"run_string", _PyCFunction_CAST(interp_run_string),
2559 METH_VARARGS | METH_KEYWORDS, run_string_doc},
2560
2561 {"is_shareable", _PyCFunction_CAST(object_is_shareable),
2562 METH_VARARGS | METH_KEYWORDS, is_shareable_doc},
2563
2564 {"channel_create", channel_create,
2565 METH_NOARGS, channel_create_doc},
2566 {"channel_destroy", _PyCFunction_CAST(channel_destroy),
2567 METH_VARARGS | METH_KEYWORDS, channel_destroy_doc},
2568 {"channel_list_all", channel_list_all,
2569 METH_NOARGS, channel_list_all_doc},
2570 {"channel_list_interpreters", _PyCFunction_CAST(channel_list_interpreters),
2571 METH_VARARGS | METH_KEYWORDS, channel_list_interpreters_doc},
2572 {"channel_send", _PyCFunction_CAST(channel_send),
2573 METH_VARARGS | METH_KEYWORDS, channel_send_doc},
2574 {"channel_recv", _PyCFunction_CAST(channel_recv),
2575 METH_VARARGS | METH_KEYWORDS, channel_recv_doc},
2576 {"channel_close", _PyCFunction_CAST(channel_close),
2577 METH_VARARGS | METH_KEYWORDS, channel_close_doc},
2578 {"channel_release", _PyCFunction_CAST(channel_release),
2579 METH_VARARGS | METH_KEYWORDS, channel_release_doc},
2580 {"_channel_id", _PyCFunction_CAST(channel__channel_id),
2581 METH_VARARGS | METH_KEYWORDS, NULL},
2582
2583 {NULL, NULL} /* sentinel */
2584 };
2585
2586
2587 /* initialization function */
2588
2589 PyDoc_STRVAR(module_doc,
2590 "This module provides primitive operations to manage Python interpreters.\n\
2591 The 'interpreters' module provides a more convenient interface.");
2592
2593 static struct PyModuleDef interpretersmodule = {
2594 PyModuleDef_HEAD_INIT,
2595 "_xxsubinterpreters", /* m_name */
2596 module_doc, /* m_doc */
2597 -1, /* m_size */
2598 module_functions, /* m_methods */
2599 NULL, /* m_slots */
2600 NULL, /* m_traverse */
2601 NULL, /* m_clear */
2602 NULL /* m_free */
2603 };
2604
2605
2606 PyMODINIT_FUNC
PyInit__xxsubinterpreters(void)2607 PyInit__xxsubinterpreters(void)
2608 {
2609 if (_init_globals() != 0) {
2610 return NULL;
2611 }
2612
2613 /* Initialize types */
2614 if (PyType_Ready(&ChannelIDtype) != 0) {
2615 return NULL;
2616 }
2617
2618 /* Create the module */
2619 PyObject *module = PyModule_Create(&interpretersmodule);
2620 if (module == NULL) {
2621 return NULL;
2622 }
2623
2624 /* Add exception types */
2625 PyObject *ns = PyModule_GetDict(module); // borrowed
2626 if (interp_exceptions_init(ns) != 0) {
2627 return NULL;
2628 }
2629 if (channel_exceptions_init(ns) != 0) {
2630 return NULL;
2631 }
2632
2633 /* Add other types */
2634 Py_INCREF(&ChannelIDtype);
2635 if (PyDict_SetItemString(ns, "ChannelID", (PyObject *)&ChannelIDtype) != 0) {
2636 return NULL;
2637 }
2638 Py_INCREF(&_PyInterpreterID_Type);
2639 if (PyDict_SetItemString(ns, "InterpreterID", (PyObject *)&_PyInterpreterID_Type) != 0) {
2640 return NULL;
2641 }
2642
2643 if (_PyCrossInterpreterData_RegisterClass(&ChannelIDtype, _channelid_shared)) {
2644 return NULL;
2645 }
2646
2647 return module;
2648 }
2649