1 /*
2  * Copyright (c) 2015 Carlos Pizano-Uribe  [email protected]
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining
5  * a copy of this software and associated documentation files
6  * (the "Software"), to deal in the Software without restriction,
7  * including without limitation the rights to use, copy, modify, merge,
8  * publish, distribute, sublicense, and/or sell copies of the Software,
9  * and to permit persons to whom the Software is furnished to do so,
10  * subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be
13  * included in all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22  */
23 
24 #include <debug.h>
25 #include <err.h>
26 #include <rand.h>
27 #include <string.h>
28 #include <trace.h>
29 
30 #include <kernel/event.h>
31 #include <kernel/port.h>
32 #include <kernel/thread.h>
33 
34 #include <platform.h>
35 
36 #define LOCAL_TRACE 0
37 
/* Arbitrary context cookie: single_thread_basic() passes it to port_open()
 * and checks that it comes back in the ctx field of the read result. */
void *context1 = (void *) 0x53;
39 
dump_port_result(const port_result_t * result)40 static void dump_port_result(const port_result_t *result)
41 {
42     const port_packet_t *p = &result->packet;
43     LTRACEF("[%02x %02x %02x %02x %02x %02x %02x %02x]\n",
44             p->value[0], p->value[1], p->value[2], p->value[3],
45             p->value[4], p->value[5], p->value[6], p->value[7]);
46 }
47 
/*
 * Exercise the basic port API from a single thread: create/open, write/read,
 * buffer-full behaviour, misuse of handles, and close/destroy.
 *
 * Returns 0 on success, or the source line number (__LINE__) of the first
 * check that failed. Ports are not cleaned up on the failure paths.
 */
static int single_thread_basic(void)
{
    port_t w_port;
    status_t st = port_create("sh_prt1", PORT_MODE_UNICAST, &w_port);
    if (st < 0) {
        printf("could not create port, status = %d\n", st);
        return __LINE__;
    }

    // opening a name that was never created must fail with ERR_NOT_FOUND.
    port_t r_port;
    st = port_open("sh_prt0", context1, &r_port);
    if (st != ERR_NOT_FOUND) {
        printf("expected not to find port, status = %d\n", st);
        return __LINE__;
    }

    st = port_open("sh_prt1", context1, &r_port);
    if (st < 0) {
        printf("could not open port, status = %d\n", st);
        return __LINE__;
    }

    port_packet_t packet[3] = {
        {{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}},
        {{0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11}},
        {{0x33, 0x66, 0x99, 0xcc, 0x33, 0x66, 0x99, 0xcc}},
    };

    st = port_write(w_port, &packet[0], 1);
    if (st < 0) {
        printf("could not write port, status = %d\n", st);
        return __LINE__;
    }

    printf("reading from port:\n");

    port_result_t res = {0};

    // the single packet written above must come back tagged with the
    // context that was supplied to port_open().
    st = port_read(r_port, 0, &res);
    if (st < 0) {
        printf("could not read port, status = %d\n", st);
        return __LINE__;
    }
    if (res.ctx != context1) {
        printf("bad context! = %p\n", res.ctx);
        return __LINE__;
    }

    // nothing left to read: a zero-timeout read is expected to time out.
    st = port_read(r_port, 0, &res);
    if (st != ERR_TIMED_OUT) {
        printf("expected timeout, status = %d\n", st);
        return __LINE__;
    }

    // queue three packets, one write call each.
    st = port_write(w_port, &packet[1], 1);
    if (st < 0) {
        printf("could not write port, status = %d\n", st);
        return __LINE__;
    }

    st = port_write(w_port, &packet[0], 1);
    if (st < 0) {
        printf("could not write port, status = %d\n", st);
        return __LINE__;
    }

    st = port_write(w_port, &packet[2], 1);
    if (st < 0) {
        printf("could not write port, status = %d\n", st);
        return __LINE__;
    }

    // drain the port; exactly three reads must succeed before the first
    // failing read ends the loop.
    int expected_count = 3;
    while (true) {
        st = port_read(r_port, 0, &res);
        if (st < 0)
            break;
        dump_port_result(&res);
        --expected_count;
    }

    if (expected_count != 0) {
        printf("invalid read count = %d\n", expected_count);
        return __LINE__;
    }

    printf("\n");

    // port should be empty. should be able to write 8 packets.
    // the loop only ends when a write fails, so st holds that failure
    // status for the check further below.
    expected_count = 8;
    while (true) {
        st = port_write(w_port, &packet[1], 1);
        if (st < 0)
            break;
        --expected_count;
        st = port_write(w_port, &packet[2], 1);
        if (st < 0)
            break;
        --expected_count;
    }

    if (expected_count != 0) {
        printf("invalid write count = %d\n", expected_count);
        return __LINE__;
    }

    // todo(cpu) fix this possibly wrong error.
    if (st != ERR_PARTIAL_WRITE) {
        printf("expected buffer error, status =%d\n", st);
        return __LINE__;
    }

    // read 3 packets.
    for (int ix = 0; ix != 3; ++ix) {
        st = port_read(r_port, 0, &res);
        if (st < 0) {
            printf("could not read port, status = %d\n", st);
            return __LINE__;
        }
    }

    // there are 5 packets, now we add another 3.
    st = port_write(w_port, packet, 3);
    if (st < 0) {
        printf("could not write port, status = %d\n", st);
        return __LINE__;
    }

    // all 8 queued packets must be readable again.
    expected_count = 8;
    while (true) {
        st = port_read(r_port, 0, &res);
        if (st < 0)
            break;
        dump_port_result(&res);
        --expected_count;
    }

    if (expected_count != 0) {
        printf("invalid read count = %d\n", expected_count);
        return __LINE__;
    }

    // attempt to use the wrong port.
    st = port_write(r_port, &packet[1], 1);
    if (st !=  ERR_BAD_HANDLE) {
        printf("expected bad handle error, status = %d\n", st);
        return __LINE__;
    }

    st = port_read(w_port, 0, &res);
    if (st !=  ERR_BAD_HANDLE) {
        printf("expected bad handle error, status = %d\n", st);
        return __LINE__;
    }

    st = port_close(r_port);
    if (st < 0) {
        printf("could not close read port, status = %d\n", st);
        return __LINE__;
    }

    st = port_close(w_port);
    if (st < 0) {
        printf("could not close write port, status = %d\n", st);
        return __LINE__;
    }

    // closing an already closed handle must be rejected, for both ends.
    st = port_close(r_port);
    if (st != ERR_BAD_HANDLE) {
        printf("expected bad handle error, status = %d\n", st);
        return __LINE__;
    }

    st = port_close(w_port);
    if (st != ERR_BAD_HANDLE) {
        printf("expected bad handle error, status = %d\n", st);
        return __LINE__;
    }

    // the closed write port is still expected to be destroyable.
    st = port_destroy(w_port);
    if (st < 0) {
        printf("could not destroy port, status = %d\n", st);
        return __LINE__;
    }

    printf("single_thread_basic : ok\n");
    return 0;
}
236 
ping_pong_thread(void * arg)237 static int ping_pong_thread(void *arg)
238 {
239     port_t r_port;
240     status_t st = port_open("ping_port", NULL, &r_port);
241     if (st < 0) {
242         printf("thread: could not open port, status = %d\n", st);
243         return __LINE__;
244     }
245 
246     bool should_dispose_pong_port = true;
247     port_t w_port;
248     st = port_create("pong_port", PORT_MODE_UNICAST, &w_port);
249     if (st == ERR_ALREADY_EXISTS) {
250         // won the race to create the port.
251         should_dispose_pong_port = false;
252     } else if (st < 0) {
253         printf("thread: could not open port, status = %d\n", st);
254         return __LINE__;
255     }
256 
257     port_result_t pr;
258 
259     // the loop is read-mutate-write until the write port
260     // is closed by the master thread.
261     while (true) {
262         st = port_read(r_port, INFINITE_TIME, &pr);
263 
264         if (st == ERR_CANCELLED) {
265             break;
266         } else if (st < 0) {
267             printf("thread: could not read port, status = %d\n", st);
268             return __LINE__;
269         }
270 
271         pr.packet.value[0]++;
272         pr.packet.value[5]--;
273 
274         st = port_write(w_port, &pr.packet, 1);
275         if (st < 0) {
276             printf("thread: could not write port, status = %d\n", st);
277             return __LINE__;
278         }
279     }
280 
281     port_close(r_port);
282 
283     if (should_dispose_pong_port) {
284         port_close(w_port);
285         port_destroy(w_port);
286     }
287 
288     return 0;
289 
290 bail:
291     return __LINE__;
292 }
293 
294 
two_threads_basic(void)295 int two_threads_basic(void)
296 {
297     port_t w_port;
298     status_t st = port_create("ping_port", PORT_MODE_BROADCAST, &w_port);
299     if (st < 0) {
300         printf("could not create port, status = %d\n", st);
301         return __LINE__;
302     }
303 
304     thread_t *t1 = thread_create(
305                        "worker1", &ping_pong_thread, NULL, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE);
306     thread_t *t2 = thread_create(
307                        "worker2", &ping_pong_thread, NULL, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE);
308     thread_resume(t1);
309     thread_resume(t2);
310 
311     // wait for the pong port to be created, the two threads race to do it.
312     port_t r_port;
313     while (true) {
314         status_t st = port_open("pong_port", NULL, &r_port);
315         if (st == NO_ERROR) {
316             break;
317         } else if (st == ERR_NOT_FOUND) {
318             thread_sleep(100);
319         } else {
320             printf("could not open port, status = %d\n", st);
321             return __LINE__;
322         }
323     }
324 
325     // We have two threads listening to the ping port. Which both reply
326     // on the pong port, so we get two packets in per packet out.
327     const int passes = 256;
328     printf("two_threads_basic test, %d passes\n", passes);
329 
330     port_packet_t packet_out = {{0xaf, 0x77, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05}};
331 
332     port_result_t pr;
333     for (int ix = 0; ix != passes; ++ix) {
334         const size_t count = 1 + ((unsigned int)rand() % 3);
335 
336         for (size_t jx = 0; jx != count; ++jx) {
337             st = port_write(w_port, &packet_out, 1);
338             if (st < 0) {
339                 printf("could not write port, status = %d\n", st);
340                 return __LINE__;
341             }
342         }
343 
344         packet_out.value[0]++;
345         packet_out.value[5]--;
346 
347         for (size_t jx = 0; jx != count * 2; ++jx) {
348             st = port_read(r_port, INFINITE_TIME, &pr);
349             if (st < 0) {
350                 printf("could not read port, status = %d\n", st);
351                 return __LINE__;
352             }
353 
354             if ((pr.packet.value[0] != packet_out.value[0]) ||
355                     (pr.packet.value[5] != packet_out.value[5])) {
356                 printf("unexpected data in packet, loop %d", ix);
357                 return __LINE__;
358             }
359         }
360     }
361 
362     thread_sleep(100);
363 
364     // there should be no more packets to read.
365     st = port_read(r_port, 0, &pr);
366     if (st != ERR_TIMED_OUT) {
367         printf("unexpected packet, status = %d\n", st);
368         return __LINE__;
369     }
370 
371     printf("two_threads_basic master shutdown\n");
372 
373     st = port_close(r_port);
374     if (st < 0) {
375         printf("could not close port, status = %d\n", st);
376         return __LINE__;
377     }
378 
379     st = port_close(w_port);
380     if (st < 0) {
381         printf("could not close port, status = %d\n", st);
382         return __LINE__;
383     }
384 
385     st = port_destroy(w_port);
386     if (st < 0) {
387         printf("could not destroy port, status = %d\n", st);
388         return __LINE__;
389     }
390 
391     int retcode = -1;
392     thread_join(t1, &retcode, INFINITE_TIME);
393     if (retcode)
394         goto fail;
395 
396     thread_join(t2,  &retcode, INFINITE_TIME);
397     if (retcode)
398         goto fail;
399 
400     return 0;
401 
402 fail:
403     printf("child thread exited with %d\n", retcode);
404     return __LINE__;
405 }
406 
/* Context cookies used by the group tests to tell which read port a result
 * came from: the "grp_ctrl" command port and the two "tst_port" test ports. */
#define CMD_PORT_CTX ((void*) 0x77)
#define TS1_PORT_CTX ((void*) 0x11)
#define TS2_PORT_CTX ((void*) 0x12)

/* Commands understood by group_watcher_thread(). */
typedef enum {
    ADD_PORT,   /* add the accompanying read port to the watched set */
    QUIT        /* leave the watch loop and shut down */
} action_t;

/* Command record carried in the payload of a packet on the command port. */
typedef struct {
    action_t what;
    port_t port;    /* read port to add; send QUIT passes 0 here */
} watcher_cmd;
420 
send_watcher_cmd(port_t cmd_port,action_t action,port_t port)421 status_t send_watcher_cmd(port_t cmd_port, action_t action, port_t port)
422 {
423     watcher_cmd cmd  = {action, port};
424     return port_write(cmd_port, ((port_packet_t *) &cmd), 1);;
425 }
426 
group_watcher_thread(void * arg)427 static int group_watcher_thread(void *arg)
428 {
429     port_t watched[8] = {0};
430     status_t st = port_open("grp_ctrl", CMD_PORT_CTX, &watched[0]);
431     if (st < 0) {
432         printf("could not open port, status = %d\n", st);
433         return __LINE__;
434     }
435 
436     size_t count = 1;
437     port_t group;
438     int ctx_count = -1;
439 
440     while (true) {
441         st = port_group(watched, count, &group);
442         if (st < 0) {
443             printf("could not make group, status = %d\n", st);
444             return __LINE__;
445         }
446 
447         port_result_t pr;
448         while (true) {
449             st = port_read(group, INFINITE_TIME, &pr);
450             if (st < 0) {
451                 printf("could not read port, status = %d\n", st);
452                 return __LINE__;
453             }
454 
455             if (pr.ctx == CMD_PORT_CTX) {
456                 break;
457             } else if (pr.ctx == TS1_PORT_CTX) {
458                 ctx_count += 1;
459             } else if (pr.ctx == TS2_PORT_CTX) {
460                 ctx_count += 2;
461             } else {
462                 printf("unknown context %p\n", pr.ctx);
463                 return __LINE__;
464             }
465         }
466 
467         // Either adding a port or exiting; either way close the
468         // existing group port and create a new one if needed
469         // at the top of the loop.
470 
471         port_close(group);
472         watcher_cmd *wc = (watcher_cmd *) &pr.packet;
473 
474         if (wc->what == ADD_PORT) {
475             watched[count++] = wc->port;
476         }  else if (wc->what == QUIT) {
477             break;
478         } else {
479             printf("unknown command %d\n", wc->what);
480             return __LINE__;
481         }
482     }
483 
484     if (ctx_count !=  2) {
485         printf("unexpected context count %d", ctx_count);
486         return __LINE__;
487     }
488 
489     printf("group watcher shutdown\n");
490 
491     for (size_t ix = 0; ix != count; ++ix) {
492         st = port_close(watched[ix]);
493         if (st < 0) {
494             printf("failed to close read port, status = %d\n", st);
495             return __LINE__;
496         }
497     }
498 
499     return 0;
500 }
501 
make_port_pair(const char * name,void * ctx,port_t * write,port_t * read)502 static status_t make_port_pair(const char *name, void *ctx, port_t *write, port_t *read)
503 {
504     status_t st = port_create(name, PORT_MODE_UNICAST, write);
505     if (st < 0)
506         return st;
507     return port_open(name,ctx, read);
508 }
509 
/*
 * Port group test driven through a watcher thread.
 *
 * Creates the "grp_ctrl" command port, starts group_watcher_thread(), hands
 * it two read ports via ADD_PORT commands, writes one packet to each test
 * port, then sends QUIT and checks the watcher's exit code.
 *
 * Returns 0 on success, or the source line number (__LINE__) of the first
 * check that failed. Nothing is cleaned up on the failure paths.
 */
int group_basic(void)
{
    // we spin a thread that connects to a well known port, then we
    // send two ports that it will add to a group port.
    port_t cmd_port;
    status_t st = port_create("grp_ctrl", PORT_MODE_UNICAST, &cmd_port);
    if (st < 0 ) {
        printf("could not create port, status = %d\n", st);
        return __LINE__;
    }

    thread_t *wt = thread_create(
                       "g_watcher", &group_watcher_thread, NULL, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE);
    thread_resume(wt);

    port_t w_test_port1, r_test_port1;
    st = make_port_pair("tst_port1", TS1_PORT_CTX, &w_test_port1, &r_test_port1);
    if (st < 0)
        return __LINE__;

    port_t w_test_port2, r_test_port2;
    st = make_port_pair("tst_port2", TS2_PORT_CTX, &w_test_port2, &r_test_port2);
    if (st < 0)
        return __LINE__;

    // hand both read ends to the watcher; it closes them when it quits.
    st = send_watcher_cmd(cmd_port, ADD_PORT, r_test_port1);
    if (st < 0)
        return __LINE__;

    st = send_watcher_cmd(cmd_port, ADD_PORT, r_test_port2);
    if (st < 0)
        return __LINE__;

    // presumably gives the watcher time to process both ADD_PORT commands
    // before data is written -- the sleep is the only synchronization here.
    thread_sleep(50);

    // one packet per test port; the watcher tallies them by context.
    port_packet_t pp = {{0}};
    st = port_write(w_test_port1, &pp, 1);
    if (st < 0)
        return __LINE__;

    st = port_write(w_test_port2, &pp, 1);
    if (st < 0)
        return __LINE__;

    st = send_watcher_cmd(cmd_port, QUIT, 0);
    if (st < 0)
        return __LINE__;

    // a non-zero exit code means one of the watcher's own checks failed.
    int retcode = -1;
    thread_join(wt, &retcode, INFINITE_TIME);
    if (retcode) {
        printf("child thread exited with %d\n", retcode);
        return __LINE__;
    }

    // close every write end first, then destroy the ports.
    st = port_close(w_test_port1);
    if (st < 0)
        return __LINE__;
    st = port_close(w_test_port2);
    if (st < 0)
        return __LINE__;
    st = port_close(cmd_port);
    if (st < 0)
        return __LINE__;
    st = port_destroy(w_test_port1);
    if (st < 0)
        return __LINE__;
    st = port_destroy(w_test_port2);
    if (st < 0)
        return __LINE__;
    st = port_destroy(cmd_port);
    if (st < 0)
        return __LINE__;

    return 0;
}
586 
group_dynamic(void)587 int group_dynamic(void)
588 {
589     status_t st;
590 
591     port_t w_test_port1, r_test_port1;
592     st = make_port_pair("tst_port1", TS1_PORT_CTX, &w_test_port1, &r_test_port1);
593     if (st < 0)
594         return __LINE__;
595 
596     port_t w_test_port2, r_test_port2;
597     st = make_port_pair("tst_port2", TS2_PORT_CTX, &w_test_port2, &r_test_port2);
598     if (st < 0)
599         return __LINE__;
600 
601     port_t pg;
602     st = port_group(&r_test_port1, 1, &pg);
603     if (st < 0)
604         return __LINE__;
605 
606     port_packet_t pkt = { { 0 } };
607     st = port_write(w_test_port2, &pkt, 1);
608     if (st < 0)
609         return __LINE__;
610 
611     port_result_t rslt;
612     st = port_read(pg, 0, &rslt);
613     if (st != ERR_TIMED_OUT)
614         return __LINE__;
615 
616     // Attach the port that has been written to to the port group and ensure
617     // that we can read from it.
618     st = port_group_add(pg, r_test_port2);
619     if (st < 0)
620         return __LINE__;
621 
622     st = port_read(pg, 0, &rslt);
623     if (st < 0)
624         return __LINE__;
625 
626     // Write some data to a port then remove it from the port group and ensure
627     // that we can't read from it.
628     st = port_write(w_test_port1, &pkt, 1);
629     if (st < 0)
630         return __LINE__;
631 
632     st = port_group_remove(pg, r_test_port1);
633     if (st < 0)
634         return __LINE__;
635 
636     st = port_read(pg, 0, &rslt);
637     if (st != ERR_TIMED_OUT)
638         return __LINE__;
639 
640     st = port_close(w_test_port1);
641     if (st < 0)
642         return __LINE__;
643     st = port_close(w_test_port2);
644     if (st < 0)
645         return __LINE__;
646     st = port_destroy(w_test_port1);
647     if (st < 0)
648         return __LINE__;
649     st = port_destroy(w_test_port2);
650     if (st < 0)
651         return __LINE__;
652 
653     return 0;
654 }
655 
/* Handshake for the group_waiting() test: initialized and waited on by
 * group_waiting(), signalled by receive_thread() after its read from the
 * port group returns. */
event_t group_waiting_sync_evt;
657 
receive_thread(void * arg)658 static int receive_thread(void *arg)
659 {
660     port_t pg = (port_t)arg;
661 
662     // Try to read from an empty port group. When the other thread adds a port
663     // to this port group, we should wake up and
664     port_result_t rslt;
665     status_t st = port_read(pg, 500, &rslt);
666     if (st == ERR_TIMED_OUT)
667         return __LINE__;
668 
669     event_signal(&group_waiting_sync_evt, true);
670 
671     return 0;
672 }
673 
674 /* Test the edge case where a read port with data available is added to a port
675  * group that has a read-blocked receiver.
676  */
group_waiting(void)677 int group_waiting(void)
678 {
679     status_t st;
680 
681     event_init(&group_waiting_sync_evt, false, EVENT_FLAG_AUTOUNSIGNAL);
682 
683     port_t w_test_port1, r_test_port1;
684     st = make_port_pair("tst_port1", TS1_PORT_CTX, &w_test_port1, &r_test_port1);
685     if (st < 0)
686         return __LINE__;
687 
688     // Write something to this port group that currently has no receivers.
689     port_packet_t pkt = { { 0 } };
690     st = port_write(w_test_port1, &pkt, 1);
691     if (st < 0)
692         return __LINE__;
693 
694     // Create an empty port group.
695     port_t pg;
696     st = port_group(NULL, 0, &pg);
697     if (st < 0)
698         return __LINE__;
699 
700 
701     thread_t *t1 = thread_create(
702         "receiver",
703         &receive_thread,
704         (void *)pg,
705         DEFAULT_PRIORITY,
706         DEFAULT_STACK_SIZE
707     );
708 
709     thread_resume(t1);
710 
711     // Wait for the other thread to block on the read.
712     thread_sleep(20);
713 
714     // Adding a port that has data available to the port group should wake any
715     // threads waiting on that port group.
716     port_group_add(pg, r_test_port1);
717 
718     if (event_wait_timeout(&group_waiting_sync_evt, 500) != NO_ERROR)
719         return __LINE__;
720 
721     st = port_close(w_test_port1);
722     if (st < 0)
723         return __LINE__;
724 
725     st = port_destroy(w_test_port1);
726     if (st < 0)
727         return __LINE__;
728 
729     return 0;
730 }
731 
/* Run test function t, store its return value in the caller's `result`
 * variable and jump to the caller's `fail` label if it is non-zero.
 * Wrapped in do { } while (0) so the macro expands to a single statement
 * and stays correct under an unbraced if/else. */
#define RUN_TEST(t)  do { result = t(); if (result) goto fail; } while (0)
733 
port_tests(void)734 int port_tests(void)
735 {
736     int result;
737     int count = 3;
738     while (count--) {
739         RUN_TEST(single_thread_basic);
740         RUN_TEST(two_threads_basic);
741         RUN_TEST(group_basic);
742         RUN_TEST(group_dynamic);
743     }
744 
745     printf("all tests passed\n");
746     return 0;
747 fail:
748     printf("test failed at line %d\n", result);
749     return 1;
750 }
751 
752 #undef RUN_TEST
753