1 /*
2 * Copyright (c) 2015 Carlos Pizano-Uribe [email protected]
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining
5 * a copy of this software and associated documentation files
6 * (the "Software"), to deal in the Software without restriction,
7 * including without limitation the rights to use, copy, modify, merge,
8 * publish, distribute, sublicense, and/or sell copies of the Software,
9 * and to permit persons to whom the Software is furnished to do so,
10 * subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice shall be
13 * included in all copies or substantial portions of the Software.
14 *
15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18 * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19 * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20 * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21 * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22 */
23
24 /**
25 * @file
26 * @brief Port object functions
27 * @defgroup event Events
28 *
29 */
30
31 #include <debug.h>
32 #include <list.h>
33 #include <malloc.h>
34 #include <string.h>
35 #include <pow2.h>
36 #include <err.h>
37 #include <kernel/thread.h>
38 #include <kernel/port.h>
39
40 // write ports can be in two states, open and closed, which have a
41 // different magic number.
42
// Type tags stored in the |magic| field of every port object. Each value is
// the ASCII spelling noted next to it.
#define WRITEPORT_MAGIC_W       (0x70727477)    // 'prtw' - write port, open
#define WRITEPORT_MAGIC_X       (0x70727478)    // 'prtx' - write port, closed
#define READPORT_MAGIC          (0x70727472)    // 'prtr' - read port
#define PORTGROUP_MAGIC         (0x70727467)    // 'prtg' - port group

// Packet capacity of a circular buffer: the regular size, and the size used
// when a port is created with PORT_MODE_BIG_BUFFER.
#define PORT_BUFF_SIZE          8
#define PORT_BUFF_SIZE_BIG      64

// When non-zero, port_write() yields the CPU after it has woken a reader.
#define RESCHEDULE_POLICY       1

// Upper bound on the number of read ports in a single port group.
#define MAX_PORT_GROUP_COUNT    256
55
56 typedef struct {
57 uint log2;
58 uint avail;
59 uint head;
60 uint tail;
61 port_packet_t packet[1];
62 } port_buf_t;
63
64 typedef struct {
65 int magic;
66 struct list_node node;
67 port_buf_t *buf;
68 struct list_node rp_list;
69 port_mode_t mode;
70 char name[PORT_NAME_LEN];
71 } write_port_t;
72
73 typedef struct {
74 int magic;
75 wait_queue_t wait;
76 struct list_node rp_list;
77 } port_group_t;
78
79 typedef struct {
80 int magic;
81 struct list_node w_node;
82 struct list_node g_node;
83 port_buf_t *buf;
84 void *ctx;
85 wait_queue_t wait;
86 write_port_t *wport;
87 port_group_t *gport;
88 } read_port_t;
89
90
91 static struct list_node write_port_list;
92
93
make_buf(uint pk_count)94 static port_buf_t *make_buf(uint pk_count)
95 {
96 uint size = sizeof(port_buf_t) + ((pk_count - 1) * sizeof(port_packet_t));
97 port_buf_t *buf = (port_buf_t *) malloc(size);
98 if (!buf)
99 return NULL;
100 buf->log2 = log2_uint(pk_count);
101 buf->head = buf->tail = 0;
102 buf->avail = pk_count;
103 return buf;
104 }
105
// Returns true when no packet is stored in |buf|, that is when the number of
// available slots equals the full capacity valpow2(buf->log2).
static inline bool buf_is_empty(port_buf_t *buf)
{
    const uint capacity = valpow2(buf->log2);
    return capacity == buf->avail;
}
110
buf_write(port_buf_t * buf,const port_packet_t * packets,size_t count)111 static status_t buf_write(port_buf_t *buf, const port_packet_t *packets, size_t count)
112 {
113 if (buf->avail < count)
114 return ERR_NOT_ENOUGH_BUFFER;
115
116 for (size_t ix = 0; ix != count; ix++) {
117 buf->packet[buf->tail] = packets[ix];
118 buf->tail = modpow2(++buf->tail, buf->log2);
119 }
120 buf->avail -= count;
121 return NO_ERROR;
122 }
123
buf_read(port_buf_t * buf,port_result_t * pr)124 static status_t buf_read(port_buf_t *buf, port_result_t *pr)
125 {
126 if (buf_is_empty(buf))
127 return ERR_NO_MSG;
128 pr->packet = buf->packet[buf->head];
129 buf->head = modpow2(++buf->head, buf->log2);
130 ++buf->avail;
131 return NO_ERROR;
132 }
133
134 // must be called before any use of ports.
port_init(void)135 void port_init(void)
136 {
137 list_initialize(&write_port_list);
138 }
139
port_create(const char * name,port_mode_t mode,port_t * port)140 status_t port_create(const char *name, port_mode_t mode, port_t *port)
141 {
142 if (!name || !port)
143 return ERR_INVALID_ARGS;
144
145 // only unicast ports can have a large buffer.
146 if (mode & PORT_MODE_BROADCAST) {
147 if (mode & PORT_MODE_BIG_BUFFER)
148 return ERR_INVALID_ARGS;
149 }
150
151 if (strlen(name) >= PORT_NAME_LEN)
152 return ERR_INVALID_ARGS;
153
154 // lookup for existing port, return that if found.
155 write_port_t *wp = NULL;
156 THREAD_LOCK(state1);
157 list_for_every_entry(&write_port_list, wp, write_port_t, node) {
158 if (strcmp(wp->name, name) == 0) {
159 // can't return closed ports.
160 if (wp->magic == WRITEPORT_MAGIC_X)
161 wp = NULL;
162 THREAD_UNLOCK(state1);
163 if (wp) {
164 *port = (void *) wp;
165 return ERR_ALREADY_EXISTS;
166 } else {
167 return ERR_BUSY;
168 }
169 }
170 }
171 THREAD_UNLOCK(state1);
172
173 // not found, create the write port and the circular buffer.
174 wp = calloc(1, sizeof(write_port_t));
175 if (!wp)
176 return ERR_NO_MEMORY;
177
178 wp->magic = WRITEPORT_MAGIC_W;
179 wp->mode = mode;
180 strlcpy(wp->name, name, sizeof(wp->name));
181 list_initialize(&wp->rp_list);
182
183 uint size = (mode & PORT_MODE_BIG_BUFFER) ? PORT_BUFF_SIZE_BIG : PORT_BUFF_SIZE;
184 wp->buf = make_buf(size);
185 if (!wp->buf) {
186 free(wp);
187 return ERR_NO_MEMORY;
188 }
189
190 // todo: race condtion! a port with the same name could have been created
191 // by another thread at is point.
192 THREAD_LOCK(state2);
193 list_add_tail(&write_port_list, &wp->node);
194 THREAD_UNLOCK(state2);
195
196 *port = (void *)wp;
197 return NO_ERROR;
198 }
199
port_open(const char * name,void * ctx,port_t * port)200 status_t port_open(const char *name, void *ctx, port_t *port)
201 {
202 if (!name || !port)
203 return ERR_INVALID_ARGS;
204
205 // assume success; create the read port and buffer now.
206 read_port_t *rp = calloc(1, sizeof(read_port_t));
207 if (!rp)
208 return ERR_NO_MEMORY;
209
210 rp->magic = READPORT_MAGIC;
211 wait_queue_init(&rp->wait);
212 rp->ctx = ctx;
213
214 // |buf| might not be needed, but we always allocate outside the lock.
215 // this buffer is only needed for broadcast ports, but we don't know
216 // that here.
217 port_buf_t *buf = make_buf(PORT_BUFF_SIZE);
218 if (!buf) {
219 free(rp);
220 return ERR_NO_MEMORY;
221 }
222
223 // find the named write port and associate it with read port.
224 status_t rc = ERR_NOT_FOUND;
225
226 THREAD_LOCK(state);
227 write_port_t *wp = NULL;
228 list_for_every_entry(&write_port_list, wp, write_port_t, node) {
229 if (strcmp(wp->name, name) == 0) {
230 // found; add read port to write port list.
231 rp->wport = wp;
232 if (wp->buf) {
233 // this is the first read port; transfer the circular buffer.
234 list_add_tail(&wp->rp_list, &rp->w_node);
235 rp->buf = wp->buf;
236 wp->buf = NULL;
237 rc = NO_ERROR;
238 } else if (buf) {
239 // not first read port.
240 if (wp->mode & PORT_MODE_UNICAST) {
241 // cannot add a second listener.
242 rc = ERR_NOT_ALLOWED;
243 break;
244 }
245 // use the new (small) circular buffer.
246 list_add_tail(&wp->rp_list, &rp->w_node);
247 rp->buf = buf;
248 buf = NULL;
249 rc = NO_ERROR;
250 } else {
251 // |buf| allocation failed and the buffer was needed.
252 rc = ERR_NO_MEMORY;
253 }
254 break;
255 }
256 }
257 THREAD_UNLOCK(state);
258
259 if (buf)
260 free(buf);
261
262 if (rc == NO_ERROR) {
263 *port = (void *)rp;
264 } else {
265 free(rp);
266 }
267 return rc;
268 }
269
port_group(port_t * ports,size_t count,port_t * group)270 status_t port_group(port_t *ports, size_t count, port_t *group)
271 {
272 if (count > MAX_PORT_GROUP_COUNT)
273 return ERR_TOO_BIG;
274
275 // Allow empty port groups.
276 if (count && !ports)
277 return ERR_INVALID_ARGS;
278
279 if (!group)
280 return ERR_INVALID_ARGS;
281
282 // assume success; create port group now.
283 port_group_t *pg = calloc(1, sizeof(port_group_t));
284 if (!pg)
285 return ERR_NO_MEMORY;
286
287 pg->magic = PORTGROUP_MAGIC;
288 wait_queue_init(&pg->wait);
289 list_initialize(&pg->rp_list);
290
291 status_t rc = NO_ERROR;
292
293 THREAD_LOCK(state);
294 for (size_t ix = 0; ix != count; ix++) {
295 read_port_t *rp = (read_port_t *)ports[ix];
296 if ((rp->magic != READPORT_MAGIC) || rp->gport) {
297 // wrong type of port, or port already part of a group,
298 // in any case, undo the changes to the previous read ports.
299 for (size_t jx = 0; jx != ix; jx++) {
300 ((read_port_t *)ports[jx])->gport = NULL;
301 }
302 rc = ERR_BAD_HANDLE;
303 break;
304 }
305 // link port group and read port.
306 rp->gport = pg;
307 list_add_tail(&pg->rp_list, &rp->g_node);
308 }
309 THREAD_UNLOCK(state);
310
311 if (rc == NO_ERROR) {
312 *group = (port_t *)pg;
313 } else {
314 free(pg);
315 }
316 return rc;
317 }
318
port_group_add(port_t group,port_t port)319 status_t port_group_add(port_t group, port_t port)
320 {
321 if (!port || !group)
322 return ERR_INVALID_ARGS;
323
324 // Make sure the user has actually passed in a port group and a read-port.
325 port_group_t *pg = (port_group_t *)group;
326 if (pg->magic != PORTGROUP_MAGIC)
327 return ERR_INVALID_ARGS;
328
329 read_port_t *rp = (read_port_t *)port;
330 if (rp->magic != READPORT_MAGIC || rp->gport)
331 return ERR_BAD_HANDLE;
332
333 status_t rc = NO_ERROR;
334 THREAD_LOCK(state);
335
336 if (list_length(&pg->rp_list) == MAX_PORT_GROUP_COUNT) {
337 rc = ERR_TOO_BIG;
338 } else {
339 rp->gport = pg;
340 list_add_tail(&pg->rp_list, &rp->g_node);
341
342 // If the new read port being added has messages available, try to wake
343 // any readers that might be present.
344 if (!buf_is_empty(rp->buf)) {
345 wait_queue_wake_one(&pg->wait, false, NO_ERROR);
346 }
347 }
348
349 THREAD_UNLOCK(state);
350
351 return rc;
352 }
353
port_group_remove(port_t group,port_t port)354 status_t port_group_remove(port_t group, port_t port)
355 {
356 if (!port || !group)
357 return ERR_INVALID_ARGS;
358
359 // Make sure the user has actually passed in a port group and a read-port.
360 port_group_t *pg = (port_group_t *)group;
361 if (pg->magic != PORTGROUP_MAGIC)
362 return ERR_INVALID_ARGS;
363
364 read_port_t *rp = (read_port_t *)port;
365 if (rp->magic != READPORT_MAGIC || rp->gport != pg)
366 return ERR_BAD_HANDLE;
367
368 THREAD_LOCK(state);
369
370 bool found = false;
371 read_port_t *current_rp;
372 list_for_every_entry(&pg->rp_list, current_rp, read_port_t, g_node) {
373 if (current_rp == rp) {
374 found = true;
375 }
376 }
377
378 if (!found)
379 return ERR_BAD_HANDLE;
380
381 list_delete(&rp->g_node);
382
383 THREAD_UNLOCK(state);
384
385 return NO_ERROR;
386 }
387
port_write(port_t port,const port_packet_t * pk,size_t count)388 status_t port_write(port_t port, const port_packet_t *pk, size_t count)
389 {
390 if (!port || !pk)
391 return ERR_INVALID_ARGS;
392
393 write_port_t *wp = (write_port_t *)port;
394 THREAD_LOCK(state);
395 if (wp->magic != WRITEPORT_MAGIC_W) {
396 // wrong port type.
397 THREAD_UNLOCK(state);
398 return ERR_BAD_HANDLE;
399 }
400
401 status_t status = NO_ERROR;
402 int awake_count = 0;
403
404 if (wp->buf) {
405 // there are no read ports, just write to the buffer.
406 status = buf_write(wp->buf, pk, count);
407 } else {
408 // there are read ports. for each, write and attempt to wake a thread
409 // from the port group or from the read port itself.
410 read_port_t *rp;
411 list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
412 if (buf_write(rp->buf, pk, count) < 0) {
413 // buffer full.
414 status = ERR_PARTIAL_WRITE;
415 continue;
416 }
417
418 int awaken = 0;
419 if (rp->gport) {
420 awaken = wait_queue_wake_one(&rp->gport->wait, false, NO_ERROR);
421 }
422 if (!awaken) {
423 awaken = wait_queue_wake_one(&rp->wait, false, NO_ERROR);
424 }
425
426 awake_count += awaken;
427 }
428 }
429
430 THREAD_UNLOCK(state);
431
432 #if RESCHEDULE_POLICY
433 if (awake_count)
434 thread_yield();
435 #endif
436
437 return status;
438 }
439
read_no_lock(read_port_t * rp,lk_time_t timeout,port_result_t * result)440 static inline status_t read_no_lock(read_port_t *rp, lk_time_t timeout, port_result_t *result)
441 {
442 status_t status = buf_read(rp->buf, result);
443 result->ctx = rp->ctx;
444
445 if (status != ERR_NO_MSG)
446 return status;
447
448 // early return allows compiler to elide the rest for the group read case.
449 if (!timeout)
450 return ERR_TIMED_OUT;
451
452 status_t wr = wait_queue_block(&rp->wait, timeout);
453 if (wr != NO_ERROR)
454 return wr;
455 // recursive tail call is usually optimized away with a goto.
456 return read_no_lock(rp, timeout, result);
457 }
458
port_read(port_t port,lk_time_t timeout,port_result_t * result)459 status_t port_read(port_t port, lk_time_t timeout, port_result_t *result)
460 {
461 if (!port || !result)
462 return ERR_INVALID_ARGS;
463
464 status_t rc = ERR_GENERIC;
465 read_port_t *rp = (read_port_t *)port;
466
467 THREAD_LOCK(state);
468 if (rp->magic == READPORT_MAGIC) {
469 // dealing with a single port.
470 rc = read_no_lock(rp, timeout, result);
471 } else if (rp->magic == PORTGROUP_MAGIC) {
472 // dealing with a port group.
473 port_group_t *pg = (port_group_t *)port;
474 do {
475 // read each port with no timeout.
476 // todo: this order is fixed, probably a bad thing.
477 list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
478 rc = read_no_lock(rp, 0, result);
479 if (rc != ERR_TIMED_OUT)
480 goto read_exit;
481 }
482 // no data, block on the group waitqueue.
483 rc = wait_queue_block(&pg->wait, timeout);
484 } while (rc == NO_ERROR);
485 } else {
486 // wrong port type.
487 rc = ERR_BAD_HANDLE;
488 }
489
490 read_exit:
491 THREAD_UNLOCK(state);
492 return rc;
493 }
494
port_destroy(port_t port)495 status_t port_destroy(port_t port)
496 {
497 if (!port)
498 return ERR_INVALID_ARGS;
499
500 write_port_t *wp = (write_port_t *) port;
501 port_buf_t *buf = NULL;
502
503 THREAD_LOCK(state);
504 if (wp->magic != WRITEPORT_MAGIC_X) {
505 // wrong port type.
506 THREAD_UNLOCK(state);
507 return ERR_BAD_HANDLE;
508 }
509 // remove self from global named ports list.
510 list_delete(&wp->node);
511
512 if (wp->buf) {
513 // we have no readers.
514 buf = wp->buf;
515 } else {
516 // for each reader:
517 read_port_t *rp;
518 list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
519 // wake the read and group ports.
520 wait_queue_wake_all(&rp->wait, false, ERR_CANCELLED);
521 if (rp->gport) {
522 wait_queue_wake_all(&rp->gport->wait, false, ERR_CANCELLED);
523 }
524 // remove self from reader ports.
525 rp->wport = NULL;
526 }
527 }
528
529 wp->magic = 0;
530 THREAD_UNLOCK(state);
531
532 free(buf);
533 free(wp);
534 return NO_ERROR;
535 }
536
port_close(port_t port)537 status_t port_close(port_t port)
538 {
539 if (!port)
540 return ERR_INVALID_ARGS;
541
542 read_port_t *rp = (read_port_t *) port;
543 port_buf_t *buf = NULL;
544
545 THREAD_LOCK(state);
546 if (rp->magic == READPORT_MAGIC) {
547 // dealing with a read port.
548 if (rp->wport) {
549 // remove self from write port list and reassign the bufer if last.
550 list_delete(&rp->w_node);
551 if (list_is_empty(&rp->wport->rp_list)) {
552 rp->wport->buf = rp->buf;
553 rp->buf = NULL;
554 } else {
555 buf = rp->buf;
556 }
557 }
558 if (rp->gport) {
559 // remove self from port group list.
560 list_delete(&rp->g_node);
561 }
562 // wake up waiters, the return code is ERR_OBJECT_DESTROYED.
563 wait_queue_destroy(&rp->wait, true);
564 rp->magic = 0;
565
566 } else if (rp->magic == PORTGROUP_MAGIC) {
567 // dealing with a port group.
568 port_group_t *pg = (port_group_t *) port;
569 // wake up waiters.
570 wait_queue_destroy(&pg->wait, true);
571 // remove self from reader ports.
572 rp = NULL;
573 list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
574 rp->gport = NULL;
575 }
576 pg->magic = 0;
577
578 } else if (rp->magic == WRITEPORT_MAGIC_W) {
579 // dealing with a write port.
580 write_port_t *wp = (write_port_t *) port;
581 // mark it as closed. Now it can be read but not written to.
582 wp->magic = WRITEPORT_MAGIC_X;
583 THREAD_UNLOCK(state);
584 return NO_ERROR;
585
586 } else {
587 THREAD_UNLOCK(state);
588 return ERR_BAD_HANDLE;
589 }
590
591 THREAD_UNLOCK(state);
592
593 free(buf);
594 free(port);
595 return NO_ERROR;
596 }
597
598