1 /*
2 * Copyright (c) 2015 Carlos Pizano-Uribe [email protected]
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining
5 * a copy of this software and associated documentation files
6 * (the "Software"), to deal in the Software without restriction,
7 * including without limitation the rights to use, copy, modify, merge,
8 * publish, distribute, sublicense, and/or sell copies of the Software,
9 * and to permit persons to whom the Software is furnished to do so,
10 * subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be
13 * included in all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22 */
23
24 #include <debug.h>
25 #include <err.h>
26 #include <rand.h>
27 #include <string.h>
28 #include <trace.h>
29
30 #include <kernel/event.h>
31 #include <kernel/port.h>
32 #include <kernel/thread.h>
33
34 #include <platform.h>
35
36 #define LOCAL_TRACE 0
37
38 void *context1 = (void *) 0x53;
39
dump_port_result(const port_result_t * result)40 static void dump_port_result(const port_result_t *result)
41 {
42 const port_packet_t *p = &result->packet;
43 LTRACEF("[%02x %02x %02x %02x %02x %02x %02x %02x]\n",
44 p->value[0], p->value[1], p->value[2], p->value[3],
45 p->value[4], p->value[5], p->value[6], p->value[7]);
46 }
47
single_thread_basic(void)48 static int single_thread_basic(void)
49 {
50 port_t w_port;
51 status_t st = port_create("sh_prt1", PORT_MODE_UNICAST, &w_port);
52 if (st < 0) {
53 printf("could not create port, status = %d\n", st);
54 return __LINE__;
55 }
56
57 port_t r_port;
58 st = port_open("sh_prt0", context1, &r_port);
59 if (st != ERR_NOT_FOUND) {
60 printf("expected not to find port, status = %d\n", st);
61 return __LINE__;
62 }
63
64 st = port_open("sh_prt1", context1, &r_port);
65 if (st < 0) {
66 printf("could not open port, status = %d\n", st);
67 return __LINE__;
68 }
69
70 port_packet_t packet[3] = {
71 {{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}},
72 {{0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 0x10, 0x11}},
73 {{0x33, 0x66, 0x99, 0xcc, 0x33, 0x66, 0x99, 0xcc}},
74 };
75
76 st = port_write(w_port, &packet[0], 1);
77 if (st < 0) {
78 printf("could not write port, status = %d\n", st);
79 return __LINE__;
80 }
81
82 printf("reading from port:\n");
83
84 port_result_t res = {0};
85
86 st = port_read(r_port, 0, &res);
87 if (st < 0) {
88 printf("could not read port, status = %d\n", st);
89 return __LINE__;
90 }
91 if (res.ctx != context1) {
92 printf("bad context! = %p\n", res.ctx);
93 return __LINE__;
94 }
95
96 st = port_read(r_port, 0, &res);
97 if (st != ERR_TIMED_OUT) {
98 printf("expected timeout, status = %d\n", st);
99 return __LINE__;
100 }
101
102 st = port_write(w_port, &packet[1], 1);
103 if (st < 0) {
104 printf("could not write port, status = %d\n", st);
105 return __LINE__;
106 }
107
108 st = port_write(w_port, &packet[0], 1);
109 if (st < 0) {
110 printf("could not write port, status = %d\n", st);
111 return __LINE__;
112 }
113
114 st = port_write(w_port, &packet[2], 1);
115 if (st < 0) {
116 printf("could not write port, status = %d\n", st);
117 return __LINE__;
118 }
119
120 int expected_count = 3;
121 while (true) {
122 st = port_read(r_port, 0, &res);
123 if (st < 0)
124 break;
125 dump_port_result(&res);
126 --expected_count;
127 }
128
129 if (expected_count != 0) {
130 printf("invalid read count = %d\n", expected_count);
131 return __LINE__;
132 }
133
134 printf("\n");
135
136 // port should be empty. should be able to write 8 packets.
137 expected_count = 8;
138 while (true) {
139 st = port_write(w_port, &packet[1], 1);
140 if (st < 0)
141 break;
142 --expected_count;
143 st = port_write(w_port, &packet[2], 1);
144 if (st < 0)
145 break;
146 --expected_count;
147 }
148
149 if (expected_count != 0) {
150 printf("invalid write count = %d\n", expected_count);
151 return __LINE__;
152 }
153
154 // tod(cpu) fix this possibly wrong error.
155 if (st != ERR_PARTIAL_WRITE) {
156 printf("expected buffer error, status =%d\n", st);
157 return __LINE__;
158 }
159
160 // read 3 packets.
161 for (int ix = 0; ix != 3; ++ix) {
162 st = port_read(r_port, 0, &res);
163 if (st < 0) {
164 printf("could not read port, status = %d\n", st);
165 return __LINE__;
166 }
167 }
168
169 // there are 5 packets, now we add another 3.
170 st = port_write(w_port, packet, 3);
171 if (st < 0) {
172 printf("could not write port, status = %d\n", st);
173 return __LINE__;
174 }
175
176 expected_count = 8;
177 while (true) {
178 st = port_read(r_port, 0, &res);
179 if (st < 0)
180 break;
181 dump_port_result(&res);
182 --expected_count;
183 }
184
185 if (expected_count != 0) {
186 printf("invalid read count = %d\n", expected_count);
187 return __LINE__;
188 }
189
190 // attempt to use the wrong port.
191 st = port_write(r_port, &packet[1], 1);
192 if (st != ERR_BAD_HANDLE) {
193 printf("expected bad handle error, status = %d\n", st);
194 return __LINE__;
195 }
196
197 st = port_read(w_port, 0, &res);
198 if (st != ERR_BAD_HANDLE) {
199 printf("expected bad handle error, status = %d\n", st);
200 return __LINE__;
201 }
202
203 st = port_close(r_port);
204 if (st < 0) {
205 printf("could not close read port, status = %d\n", st);
206 return __LINE__;
207 }
208
209 st = port_close(w_port);
210 if (st < 0) {
211 printf("could not close write port, status = %d\n", st);
212 return __LINE__;
213 }
214
215 st = port_close(r_port);
216 if (st != ERR_BAD_HANDLE) {
217 printf("expected bad handle error, status = %d\n", st);
218 return __LINE__;
219 }
220
221 st = port_close(w_port);
222 if (st != ERR_BAD_HANDLE) {
223 printf("expected bad handle error, status = %d\n", st);
224 return __LINE__;
225 }
226
227 st = port_destroy(w_port);
228 if (st < 0) {
229 printf("could not destroy port, status = %d\n", st);
230 return __LINE__;
231 }
232
233 printf("single_thread_basic : ok\n");
234 return 0;
235 }
236
ping_pong_thread(void * arg)237 static int ping_pong_thread(void *arg)
238 {
239 port_t r_port;
240 status_t st = port_open("ping_port", NULL, &r_port);
241 if (st < 0) {
242 printf("thread: could not open port, status = %d\n", st);
243 return __LINE__;
244 }
245
246 bool should_dispose_pong_port = true;
247 port_t w_port;
248 st = port_create("pong_port", PORT_MODE_UNICAST, &w_port);
249 if (st == ERR_ALREADY_EXISTS) {
250 // won the race to create the port.
251 should_dispose_pong_port = false;
252 } else if (st < 0) {
253 printf("thread: could not open port, status = %d\n", st);
254 return __LINE__;
255 }
256
257 port_result_t pr;
258
259 // the loop is read-mutate-write until the write port
260 // is closed by the master thread.
261 while (true) {
262 st = port_read(r_port, INFINITE_TIME, &pr);
263
264 if (st == ERR_CANCELLED) {
265 break;
266 } else if (st < 0) {
267 printf("thread: could not read port, status = %d\n", st);
268 return __LINE__;
269 }
270
271 pr.packet.value[0]++;
272 pr.packet.value[5]--;
273
274 st = port_write(w_port, &pr.packet, 1);
275 if (st < 0) {
276 printf("thread: could not write port, status = %d\n", st);
277 return __LINE__;
278 }
279 }
280
281 port_close(r_port);
282
283 if (should_dispose_pong_port) {
284 port_close(w_port);
285 port_destroy(w_port);
286 }
287
288 return 0;
289
290 bail:
291 return __LINE__;
292 }
293
294
two_threads_basic(void)295 int two_threads_basic(void)
296 {
297 port_t w_port;
298 status_t st = port_create("ping_port", PORT_MODE_BROADCAST, &w_port);
299 if (st < 0) {
300 printf("could not create port, status = %d\n", st);
301 return __LINE__;
302 }
303
304 thread_t *t1 = thread_create(
305 "worker1", &ping_pong_thread, NULL, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE);
306 thread_t *t2 = thread_create(
307 "worker2", &ping_pong_thread, NULL, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE);
308 thread_resume(t1);
309 thread_resume(t2);
310
311 // wait for the pong port to be created, the two threads race to do it.
312 port_t r_port;
313 while (true) {
314 status_t st = port_open("pong_port", NULL, &r_port);
315 if (st == NO_ERROR) {
316 break;
317 } else if (st == ERR_NOT_FOUND) {
318 thread_sleep(100);
319 } else {
320 printf("could not open port, status = %d\n", st);
321 return __LINE__;
322 }
323 }
324
325 // We have two threads listening to the ping port. Which both reply
326 // on the pong port, so we get two packets in per packet out.
327 const int passes = 256;
328 printf("two_threads_basic test, %d passes\n", passes);
329
330 port_packet_t packet_out = {{0xaf, 0x77, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05}};
331
332 port_result_t pr;
333 for (int ix = 0; ix != passes; ++ix) {
334 const size_t count = 1 + ((unsigned int)rand() % 3);
335
336 for (size_t jx = 0; jx != count; ++jx) {
337 st = port_write(w_port, &packet_out, 1);
338 if (st < 0) {
339 printf("could not write port, status = %d\n", st);
340 return __LINE__;
341 }
342 }
343
344 packet_out.value[0]++;
345 packet_out.value[5]--;
346
347 for (size_t jx = 0; jx != count * 2; ++jx) {
348 st = port_read(r_port, INFINITE_TIME, &pr);
349 if (st < 0) {
350 printf("could not read port, status = %d\n", st);
351 return __LINE__;
352 }
353
354 if ((pr.packet.value[0] != packet_out.value[0]) ||
355 (pr.packet.value[5] != packet_out.value[5])) {
356 printf("unexpected data in packet, loop %d", ix);
357 return __LINE__;
358 }
359 }
360 }
361
362 thread_sleep(100);
363
364 // there should be no more packets to read.
365 st = port_read(r_port, 0, &pr);
366 if (st != ERR_TIMED_OUT) {
367 printf("unexpected packet, status = %d\n", st);
368 return __LINE__;
369 }
370
371 printf("two_threads_basic master shutdown\n");
372
373 st = port_close(r_port);
374 if (st < 0) {
375 printf("could not close port, status = %d\n", st);
376 return __LINE__;
377 }
378
379 st = port_close(w_port);
380 if (st < 0) {
381 printf("could not close port, status = %d\n", st);
382 return __LINE__;
383 }
384
385 st = port_destroy(w_port);
386 if (st < 0) {
387 printf("could not destroy port, status = %d\n", st);
388 return __LINE__;
389 }
390
391 int retcode = -1;
392 thread_join(t1, &retcode, INFINITE_TIME);
393 if (retcode)
394 goto fail;
395
396 thread_join(t2, &retcode, INFINITE_TIME);
397 if (retcode)
398 goto fail;
399
400 return 0;
401
402 fail:
403 printf("child thread exited with %d\n", retcode);
404 return __LINE__;
405 }
406
407 #define CMD_PORT_CTX ((void*) 0x77)
408 #define TS1_PORT_CTX ((void*) 0x11)
409 #define TS2_PORT_CTX ((void*) 0x12)
410
411 typedef enum {
412 ADD_PORT,
413 QUIT
414 } action_t;
415
416 typedef struct {
417 action_t what;
418 port_t port;
419 } watcher_cmd;
420
send_watcher_cmd(port_t cmd_port,action_t action,port_t port)421 status_t send_watcher_cmd(port_t cmd_port, action_t action, port_t port)
422 {
423 watcher_cmd cmd = {action, port};
424 return port_write(cmd_port, ((port_packet_t *) &cmd), 1);;
425 }
426
group_watcher_thread(void * arg)427 static int group_watcher_thread(void *arg)
428 {
429 port_t watched[8] = {0};
430 status_t st = port_open("grp_ctrl", CMD_PORT_CTX, &watched[0]);
431 if (st < 0) {
432 printf("could not open port, status = %d\n", st);
433 return __LINE__;
434 }
435
436 size_t count = 1;
437 port_t group;
438 int ctx_count = -1;
439
440 while (true) {
441 st = port_group(watched, count, &group);
442 if (st < 0) {
443 printf("could not make group, status = %d\n", st);
444 return __LINE__;
445 }
446
447 port_result_t pr;
448 while (true) {
449 st = port_read(group, INFINITE_TIME, &pr);
450 if (st < 0) {
451 printf("could not read port, status = %d\n", st);
452 return __LINE__;
453 }
454
455 if (pr.ctx == CMD_PORT_CTX) {
456 break;
457 } else if (pr.ctx == TS1_PORT_CTX) {
458 ctx_count += 1;
459 } else if (pr.ctx == TS2_PORT_CTX) {
460 ctx_count += 2;
461 } else {
462 printf("unknown context %p\n", pr.ctx);
463 return __LINE__;
464 }
465 }
466
467 // Either adding a port or exiting; either way close the
468 // existing group port and create a new one if needed
469 // at the top of the loop.
470
471 port_close(group);
472 watcher_cmd *wc = (watcher_cmd *) &pr.packet;
473
474 if (wc->what == ADD_PORT) {
475 watched[count++] = wc->port;
476 } else if (wc->what == QUIT) {
477 break;
478 } else {
479 printf("unknown command %d\n", wc->what);
480 return __LINE__;
481 }
482 }
483
484 if (ctx_count != 2) {
485 printf("unexpected context count %d", ctx_count);
486 return __LINE__;
487 }
488
489 printf("group watcher shutdown\n");
490
491 for (size_t ix = 0; ix != count; ++ix) {
492 st = port_close(watched[ix]);
493 if (st < 0) {
494 printf("failed to close read port, status = %d\n", st);
495 return __LINE__;
496 }
497 }
498
499 return 0;
500 }
501
make_port_pair(const char * name,void * ctx,port_t * write,port_t * read)502 static status_t make_port_pair(const char *name, void *ctx, port_t *write, port_t *read)
503 {
504 status_t st = port_create(name, PORT_MODE_UNICAST, write);
505 if (st < 0)
506 return st;
507 return port_open(name,ctx, read);
508 }
509
group_basic(void)510 int group_basic(void)
511 {
512 // we spin a thread that connects to a well known port, then we
513 // send two ports that it will add to a group port.
514 port_t cmd_port;
515 status_t st = port_create("grp_ctrl", PORT_MODE_UNICAST, &cmd_port);
516 if (st < 0 ) {
517 printf("could not create port, status = %d\n", st);
518 return __LINE__;
519 }
520
521 thread_t *wt = thread_create(
522 "g_watcher", &group_watcher_thread, NULL, DEFAULT_PRIORITY, DEFAULT_STACK_SIZE);
523 thread_resume(wt);
524
525 port_t w_test_port1, r_test_port1;
526 st = make_port_pair("tst_port1", TS1_PORT_CTX, &w_test_port1, &r_test_port1);
527 if (st < 0)
528 return __LINE__;
529
530 port_t w_test_port2, r_test_port2;
531 st = make_port_pair("tst_port2", TS2_PORT_CTX, &w_test_port2, &r_test_port2);
532 if (st < 0)
533 return __LINE__;
534
535 st = send_watcher_cmd(cmd_port, ADD_PORT, r_test_port1);
536 if (st < 0)
537 return __LINE__;
538
539 st = send_watcher_cmd(cmd_port, ADD_PORT, r_test_port2);
540 if (st < 0)
541 return __LINE__;
542
543 thread_sleep(50);
544
545 port_packet_t pp = {{0}};
546 st = port_write(w_test_port1, &pp, 1);
547 if (st < 0)
548 return __LINE__;
549
550 st = port_write(w_test_port2, &pp, 1);
551 if (st < 0)
552 return __LINE__;
553
554 st = send_watcher_cmd(cmd_port, QUIT, 0);
555 if (st < 0)
556 return __LINE__;
557
558 int retcode = -1;
559 thread_join(wt, &retcode, INFINITE_TIME);
560 if (retcode) {
561 printf("child thread exited with %d\n", retcode);
562 return __LINE__;
563 }
564
565 st = port_close(w_test_port1);
566 if (st < 0)
567 return __LINE__;
568 st = port_close(w_test_port2);
569 if (st < 0)
570 return __LINE__;
571 st = port_close(cmd_port);
572 if (st < 0)
573 return __LINE__;
574 st = port_destroy(w_test_port1);
575 if (st < 0)
576 return __LINE__;
577 st = port_destroy(w_test_port2);
578 if (st < 0)
579 return __LINE__;
580 st = port_destroy(cmd_port);
581 if (st < 0)
582 return __LINE__;
583
584 return 0;
585 }
586
group_dynamic(void)587 int group_dynamic(void)
588 {
589 status_t st;
590
591 port_t w_test_port1, r_test_port1;
592 st = make_port_pair("tst_port1", TS1_PORT_CTX, &w_test_port1, &r_test_port1);
593 if (st < 0)
594 return __LINE__;
595
596 port_t w_test_port2, r_test_port2;
597 st = make_port_pair("tst_port2", TS2_PORT_CTX, &w_test_port2, &r_test_port2);
598 if (st < 0)
599 return __LINE__;
600
601 port_t pg;
602 st = port_group(&r_test_port1, 1, &pg);
603 if (st < 0)
604 return __LINE__;
605
606 port_packet_t pkt = { { 0 } };
607 st = port_write(w_test_port2, &pkt, 1);
608 if (st < 0)
609 return __LINE__;
610
611 port_result_t rslt;
612 st = port_read(pg, 0, &rslt);
613 if (st != ERR_TIMED_OUT)
614 return __LINE__;
615
616 // Attach the port that has been written to to the port group and ensure
617 // that we can read from it.
618 st = port_group_add(pg, r_test_port2);
619 if (st < 0)
620 return __LINE__;
621
622 st = port_read(pg, 0, &rslt);
623 if (st < 0)
624 return __LINE__;
625
626 // Write some data to a port then remove it from the port group and ensure
627 // that we can't read from it.
628 st = port_write(w_test_port1, &pkt, 1);
629 if (st < 0)
630 return __LINE__;
631
632 st = port_group_remove(pg, r_test_port1);
633 if (st < 0)
634 return __LINE__;
635
636 st = port_read(pg, 0, &rslt);
637 if (st != ERR_TIMED_OUT)
638 return __LINE__;
639
640 st = port_close(w_test_port1);
641 if (st < 0)
642 return __LINE__;
643 st = port_close(w_test_port2);
644 if (st < 0)
645 return __LINE__;
646 st = port_destroy(w_test_port1);
647 if (st < 0)
648 return __LINE__;
649 st = port_destroy(w_test_port2);
650 if (st < 0)
651 return __LINE__;
652
653 return 0;
654 }
655
656 event_t group_waiting_sync_evt;
657
receive_thread(void * arg)658 static int receive_thread(void *arg)
659 {
660 port_t pg = (port_t)arg;
661
662 // Try to read from an empty port group. When the other thread adds a port
663 // to this port group, we should wake up and
664 port_result_t rslt;
665 status_t st = port_read(pg, 500, &rslt);
666 if (st == ERR_TIMED_OUT)
667 return __LINE__;
668
669 event_signal(&group_waiting_sync_evt, true);
670
671 return 0;
672 }
673
674 /* Test the edge case where a read port with data available is added to a port
675 * group that has a read-blocked receiver.
676 */
group_waiting(void)677 int group_waiting(void)
678 {
679 status_t st;
680
681 event_init(&group_waiting_sync_evt, false, EVENT_FLAG_AUTOUNSIGNAL);
682
683 port_t w_test_port1, r_test_port1;
684 st = make_port_pair("tst_port1", TS1_PORT_CTX, &w_test_port1, &r_test_port1);
685 if (st < 0)
686 return __LINE__;
687
688 // Write something to this port group that currently has no receivers.
689 port_packet_t pkt = { { 0 } };
690 st = port_write(w_test_port1, &pkt, 1);
691 if (st < 0)
692 return __LINE__;
693
694 // Create an empty port group.
695 port_t pg;
696 st = port_group(NULL, 0, &pg);
697 if (st < 0)
698 return __LINE__;
699
700
701 thread_t *t1 = thread_create(
702 "receiver",
703 &receive_thread,
704 (void *)pg,
705 DEFAULT_PRIORITY,
706 DEFAULT_STACK_SIZE
707 );
708
709 thread_resume(t1);
710
711 // Wait for the other thread to block on the read.
712 thread_sleep(20);
713
714 // Adding a port that has data available to the port group should wake any
715 // threads waiting on that port group.
716 port_group_add(pg, r_test_port1);
717
718 if (event_wait_timeout(&group_waiting_sync_evt, 500) != NO_ERROR)
719 return __LINE__;
720
721 st = port_close(w_test_port1);
722 if (st < 0)
723 return __LINE__;
724
725 st = port_destroy(w_test_port1);
726 if (st < 0)
727 return __LINE__;
728
729 return 0;
730 }
731
732 #define RUN_TEST(t) result = t(); if (result) goto fail
733
port_tests(void)734 int port_tests(void)
735 {
736 int result;
737 int count = 3;
738 while (count--) {
739 RUN_TEST(single_thread_basic);
740 RUN_TEST(two_threads_basic);
741 RUN_TEST(group_basic);
742 RUN_TEST(group_dynamic);
743 }
744
745 printf("all tests passed\n");
746 return 0;
747 fail:
748 printf("test failed at line %d\n", result);
749 return 1;
750 }
751
752 #undef RUN_TEST
753