1 /*
2  * Copyright (c) 2015 Carlos Pizano-Uribe  [email protected]
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining
5  * a copy of this software and associated documentation files
6  * (the "Software"), to deal in the Software without restriction,
7  * including without limitation the rights to use, copy, modify, merge,
8  * publish, distribute, sublicense, and/or sell copies of the Software,
9  * and to permit persons to whom the Software is furnished to do so,
10  * subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice shall be
13  * included in all copies or substantial portions of the Software.
14  *
15  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
16  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
17  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
18  * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
19  * CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
20  * TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
21  * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22  */
23 
/**
 * @file
 * @brief  Port object functions
 * @defgroup port Ports
 *
 */
30 
31 #include <debug.h>
32 #include <list.h>
33 #include <malloc.h>
34 #include <string.h>
35 #include <pow2.h>
36 #include <err.h>
37 #include <kernel/thread.h>
38 #include <kernel/port.h>
39 
// A write port is tagged with one of two magic numbers depending on
// whether it is still open (W) or has been closed (X).

#define WRITEPORT_MAGIC_W    0x70727477  // 'prtw', open write port
#define WRITEPORT_MAGIC_X    0x70727478  // 'prtx', closed write port

#define READPORT_MAGIC       0x70727472  // 'prtr'
#define PORTGROUP_MAGIC      0x70727467  // 'prtg'

// packet capacity of a circular buffer: default and PORT_MODE_BIG_BUFFER.
#define PORT_BUFF_SIZE       8
#define PORT_BUFF_SIZE_BIG   64

// when non-zero, port_write() yields after waking at least one reader.
#define RESCHEDULE_POLICY    1

// largest number of read ports accepted in a single port group.
#define MAX_PORT_GROUP_COUNT 256
55 
56 typedef struct {
57     uint log2;
58     uint avail;
59     uint head;
60     uint tail;
61     port_packet_t packet[1];
62 } port_buf_t;
63 
64 typedef struct {
65     int magic;
66     struct list_node node;
67     port_buf_t *buf;
68     struct list_node rp_list;
69     port_mode_t mode;
70     char name[PORT_NAME_LEN];
71 } write_port_t;
72 
73 typedef struct {
74     int magic;
75     wait_queue_t wait;
76     struct list_node rp_list;
77 } port_group_t;
78 
79 typedef struct {
80     int magic;
81     struct list_node w_node;
82     struct list_node g_node;
83     port_buf_t *buf;
84     void *ctx;
85     wait_queue_t wait;
86     write_port_t *wport;
87     port_group_t *gport;
88 } read_port_t;
89 
90 
91 static struct list_node write_port_list;
92 
93 
make_buf(uint pk_count)94 static port_buf_t *make_buf(uint pk_count)
95 {
96     uint size = sizeof(port_buf_t) + ((pk_count - 1) * sizeof(port_packet_t));
97     port_buf_t *buf = (port_buf_t *) malloc(size);
98     if (!buf)
99         return NULL;
100     buf->log2 = log2_uint(pk_count);
101     buf->head = buf->tail = 0;
102     buf->avail = pk_count;
103     return buf;
104 }
105 
/**
 * Tell whether |buf| holds no packets, i.e. every slot is available.
 */
static inline bool buf_is_empty(port_buf_t *buf)
{
    uint capacity = valpow2(buf->log2);

    return capacity == buf->avail;
}
110 
buf_write(port_buf_t * buf,const port_packet_t * packets,size_t count)111 static status_t buf_write(port_buf_t *buf, const port_packet_t *packets, size_t count)
112 {
113     if (buf->avail < count)
114         return ERR_NOT_ENOUGH_BUFFER;
115 
116     for (size_t ix = 0; ix != count; ix++) {
117         buf->packet[buf->tail] = packets[ix];
118         buf->tail = modpow2(++buf->tail, buf->log2);
119     }
120     buf->avail -= count;
121     return NO_ERROR;
122 }
123 
buf_read(port_buf_t * buf,port_result_t * pr)124 static status_t buf_read(port_buf_t *buf, port_result_t *pr)
125 {
126     if (buf_is_empty(buf))
127         return ERR_NO_MSG;
128     pr->packet = buf->packet[buf->head];
129     buf->head = modpow2(++buf->head, buf->log2);
130     ++buf->avail;
131     return NO_ERROR;
132 }
133 
134 // must be called before any use of ports.
port_init(void)135 void port_init(void)
136 {
137     list_initialize(&write_port_list);
138 }
139 
port_create(const char * name,port_mode_t mode,port_t * port)140 status_t port_create(const char *name, port_mode_t mode, port_t *port)
141 {
142     if (!name || !port)
143         return ERR_INVALID_ARGS;
144 
145     // only unicast ports can have a large buffer.
146     if (mode & PORT_MODE_BROADCAST) {
147         if (mode & PORT_MODE_BIG_BUFFER)
148             return ERR_INVALID_ARGS;
149     }
150 
151     if (strlen(name) >= PORT_NAME_LEN)
152         return ERR_INVALID_ARGS;
153 
154     // lookup for existing port, return that if found.
155     write_port_t *wp = NULL;
156     THREAD_LOCK(state1);
157     list_for_every_entry(&write_port_list, wp, write_port_t, node) {
158         if (strcmp(wp->name, name) == 0) {
159             // can't return closed ports.
160             if (wp->magic == WRITEPORT_MAGIC_X)
161                 wp = NULL;
162             THREAD_UNLOCK(state1);
163             if (wp) {
164                 *port = (void *) wp;
165                 return ERR_ALREADY_EXISTS;
166             } else {
167                 return ERR_BUSY;
168             }
169         }
170     }
171     THREAD_UNLOCK(state1);
172 
173     // not found, create the write port and the circular buffer.
174     wp = calloc(1, sizeof(write_port_t));
175     if (!wp)
176         return ERR_NO_MEMORY;
177 
178     wp->magic = WRITEPORT_MAGIC_W;
179     wp->mode = mode;
180     strlcpy(wp->name, name, sizeof(wp->name));
181     list_initialize(&wp->rp_list);
182 
183     uint size = (mode & PORT_MODE_BIG_BUFFER) ?  PORT_BUFF_SIZE_BIG : PORT_BUFF_SIZE;
184     wp->buf = make_buf(size);
185     if (!wp->buf) {
186         free(wp);
187         return ERR_NO_MEMORY;
188     }
189 
190     // todo: race condtion! a port with the same name could have been created
191     // by another thread at is point.
192     THREAD_LOCK(state2);
193     list_add_tail(&write_port_list, &wp->node);
194     THREAD_UNLOCK(state2);
195 
196     *port = (void *)wp;
197     return NO_ERROR;
198 }
199 
port_open(const char * name,void * ctx,port_t * port)200 status_t port_open(const char *name, void *ctx, port_t *port)
201 {
202     if (!name || !port)
203         return ERR_INVALID_ARGS;
204 
205     // assume success; create the read port and buffer now.
206     read_port_t *rp = calloc(1, sizeof(read_port_t));
207     if (!rp)
208         return ERR_NO_MEMORY;
209 
210     rp->magic = READPORT_MAGIC;
211     wait_queue_init(&rp->wait);
212     rp->ctx = ctx;
213 
214     // |buf| might not be needed, but we always allocate outside the lock.
215     // this buffer is only needed for broadcast ports, but we don't know
216     // that here.
217     port_buf_t *buf = make_buf(PORT_BUFF_SIZE);
218     if (!buf) {
219         free(rp);
220         return ERR_NO_MEMORY;
221     }
222 
223     // find the named write port and associate it with read port.
224     status_t rc = ERR_NOT_FOUND;
225 
226     THREAD_LOCK(state);
227     write_port_t *wp = NULL;
228     list_for_every_entry(&write_port_list, wp, write_port_t, node) {
229         if (strcmp(wp->name, name) == 0) {
230             // found; add read port to write port list.
231             rp->wport = wp;
232             if (wp->buf) {
233                 // this is the first read port; transfer the circular buffer.
234                 list_add_tail(&wp->rp_list, &rp->w_node);
235                 rp->buf = wp->buf;
236                 wp->buf = NULL;
237                 rc = NO_ERROR;
238             } else if (buf) {
239                 // not first read port.
240                 if (wp->mode & PORT_MODE_UNICAST) {
241                     // cannot add a second listener.
242                     rc = ERR_NOT_ALLOWED;
243                     break;
244                 }
245                 // use the new (small) circular buffer.
246                 list_add_tail(&wp->rp_list, &rp->w_node);
247                 rp->buf = buf;
248                 buf = NULL;
249                 rc = NO_ERROR;
250             } else {
251                 // |buf| allocation failed and the buffer was needed.
252                 rc = ERR_NO_MEMORY;
253             }
254             break;
255         }
256     }
257     THREAD_UNLOCK(state);
258 
259     if (buf)
260         free(buf);
261 
262     if (rc == NO_ERROR) {
263         *port = (void *)rp;
264     } else {
265         free(rp);
266     }
267     return rc;
268 }
269 
port_group(port_t * ports,size_t count,port_t * group)270 status_t port_group(port_t *ports, size_t count, port_t *group)
271 {
272     if (count > MAX_PORT_GROUP_COUNT)
273         return ERR_TOO_BIG;
274 
275     // Allow empty port groups.
276     if (count && !ports)
277         return ERR_INVALID_ARGS;
278 
279     if (!group)
280         return ERR_INVALID_ARGS;
281 
282     // assume success; create port group now.
283     port_group_t *pg = calloc(1, sizeof(port_group_t));
284     if (!pg)
285         return ERR_NO_MEMORY;
286 
287     pg->magic = PORTGROUP_MAGIC;
288     wait_queue_init(&pg->wait);
289     list_initialize(&pg->rp_list);
290 
291     status_t rc = NO_ERROR;
292 
293     THREAD_LOCK(state);
294     for (size_t ix = 0; ix != count; ix++) {
295         read_port_t *rp = (read_port_t *)ports[ix];
296         if ((rp->magic != READPORT_MAGIC) || rp->gport) {
297             // wrong type of port, or port already part of a group,
298             // in any case, undo the changes to the previous read ports.
299             for (size_t jx = 0; jx != ix; jx++) {
300                 ((read_port_t *)ports[jx])->gport = NULL;
301             }
302             rc = ERR_BAD_HANDLE;
303             break;
304         }
305         // link port group and read port.
306         rp->gport = pg;
307         list_add_tail(&pg->rp_list, &rp->g_node);
308     }
309     THREAD_UNLOCK(state);
310 
311     if (rc == NO_ERROR) {
312         *group = (port_t *)pg;
313     } else {
314         free(pg);
315     }
316     return rc;
317 }
318 
port_group_add(port_t group,port_t port)319 status_t port_group_add(port_t group, port_t port)
320 {
321     if (!port || !group)
322         return ERR_INVALID_ARGS;
323 
324     // Make sure the user has actually passed in a port group and a read-port.
325     port_group_t *pg = (port_group_t *)group;
326     if (pg->magic != PORTGROUP_MAGIC)
327         return ERR_INVALID_ARGS;
328 
329     read_port_t *rp = (read_port_t *)port;
330     if (rp->magic != READPORT_MAGIC || rp->gport)
331         return ERR_BAD_HANDLE;
332 
333     status_t rc = NO_ERROR;
334     THREAD_LOCK(state);
335 
336     if (list_length(&pg->rp_list) == MAX_PORT_GROUP_COUNT) {
337         rc = ERR_TOO_BIG;
338     } else {
339         rp->gport = pg;
340         list_add_tail(&pg->rp_list, &rp->g_node);
341 
342         // If the new read port being added has messages available, try to wake
343         // any readers that might be present.
344         if (!buf_is_empty(rp->buf)) {
345             wait_queue_wake_one(&pg->wait, false, NO_ERROR);
346         }
347     }
348 
349     THREAD_UNLOCK(state);
350 
351     return rc;
352 }
353 
port_group_remove(port_t group,port_t port)354 status_t port_group_remove(port_t group, port_t port)
355 {
356     if (!port || !group)
357         return ERR_INVALID_ARGS;
358 
359     // Make sure the user has actually passed in a port group and a read-port.
360     port_group_t *pg = (port_group_t *)group;
361     if (pg->magic != PORTGROUP_MAGIC)
362         return ERR_INVALID_ARGS;
363 
364     read_port_t *rp = (read_port_t *)port;
365     if (rp->magic != READPORT_MAGIC || rp->gport != pg)
366         return ERR_BAD_HANDLE;
367 
368     THREAD_LOCK(state);
369 
370     bool found = false;
371     read_port_t *current_rp;
372     list_for_every_entry(&pg->rp_list, current_rp, read_port_t, g_node) {
373         if (current_rp == rp) {
374             found = true;
375         }
376     }
377 
378     if (!found)
379         return ERR_BAD_HANDLE;
380 
381     list_delete(&rp->g_node);
382 
383     THREAD_UNLOCK(state);
384 
385     return NO_ERROR;
386 }
387 
port_write(port_t port,const port_packet_t * pk,size_t count)388 status_t port_write(port_t port, const port_packet_t *pk, size_t count)
389 {
390     if (!port || !pk)
391         return ERR_INVALID_ARGS;
392 
393     write_port_t *wp = (write_port_t *)port;
394     THREAD_LOCK(state);
395     if (wp->magic != WRITEPORT_MAGIC_W) {
396         // wrong port type.
397         THREAD_UNLOCK(state);
398         return ERR_BAD_HANDLE;
399     }
400 
401     status_t status = NO_ERROR;
402     int awake_count = 0;
403 
404     if (wp->buf) {
405         // there are no read ports, just write to the buffer.
406         status = buf_write(wp->buf, pk, count);
407     } else {
408         // there are read ports. for each, write and attempt to wake a thread
409         // from the port group or from the read port itself.
410         read_port_t *rp;
411         list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
412             if (buf_write(rp->buf, pk, count) < 0) {
413                 // buffer full.
414                 status = ERR_PARTIAL_WRITE;
415                 continue;
416             }
417 
418             int awaken = 0;
419             if (rp->gport) {
420                 awaken = wait_queue_wake_one(&rp->gport->wait, false, NO_ERROR);
421             }
422             if (!awaken) {
423                 awaken = wait_queue_wake_one(&rp->wait, false, NO_ERROR);
424             }
425 
426             awake_count += awaken;
427         }
428     }
429 
430     THREAD_UNLOCK(state);
431 
432 #if RESCHEDULE_POLICY
433     if (awake_count)
434         thread_yield();
435 #endif
436 
437     return status;
438 }
439 
read_no_lock(read_port_t * rp,lk_time_t timeout,port_result_t * result)440 static inline status_t read_no_lock(read_port_t *rp, lk_time_t timeout, port_result_t *result)
441 {
442     status_t status = buf_read(rp->buf, result);
443     result->ctx = rp->ctx;
444 
445     if (status != ERR_NO_MSG)
446         return status;
447 
448     // early return allows compiler to elide the rest for the group read case.
449     if (!timeout)
450         return ERR_TIMED_OUT;
451 
452     status_t wr = wait_queue_block(&rp->wait, timeout);
453     if (wr != NO_ERROR)
454         return wr;
455     // recursive tail call is usually optimized away with a goto.
456     return read_no_lock(rp, timeout, result);
457 }
458 
port_read(port_t port,lk_time_t timeout,port_result_t * result)459 status_t port_read(port_t port, lk_time_t timeout, port_result_t *result)
460 {
461     if (!port || !result)
462         return ERR_INVALID_ARGS;
463 
464     status_t rc = ERR_GENERIC;
465     read_port_t *rp = (read_port_t *)port;
466 
467     THREAD_LOCK(state);
468     if (rp->magic == READPORT_MAGIC) {
469         // dealing with a single port.
470         rc = read_no_lock(rp, timeout, result);
471     } else if (rp->magic == PORTGROUP_MAGIC) {
472         // dealing with a port group.
473         port_group_t *pg = (port_group_t *)port;
474         do {
475             // read each port with no timeout.
476             // todo: this order is fixed, probably a bad thing.
477             list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
478                 rc = read_no_lock(rp, 0, result);
479                 if (rc != ERR_TIMED_OUT)
480                     goto read_exit;
481             }
482             // no data, block on the group waitqueue.
483             rc = wait_queue_block(&pg->wait, timeout);
484         } while (rc == NO_ERROR);
485     } else {
486         // wrong port type.
487         rc = ERR_BAD_HANDLE;
488     }
489 
490 read_exit:
491     THREAD_UNLOCK(state);
492     return rc;
493 }
494 
port_destroy(port_t port)495 status_t port_destroy(port_t port)
496 {
497     if (!port)
498         return ERR_INVALID_ARGS;
499 
500     write_port_t *wp = (write_port_t *) port;
501     port_buf_t *buf = NULL;
502 
503     THREAD_LOCK(state);
504     if (wp->magic != WRITEPORT_MAGIC_X) {
505         // wrong port type.
506         THREAD_UNLOCK(state);
507         return ERR_BAD_HANDLE;
508     }
509     // remove self from global named ports list.
510     list_delete(&wp->node);
511 
512     if (wp->buf) {
513         // we have no readers.
514         buf = wp->buf;
515     } else {
516         // for each reader:
517         read_port_t *rp;
518         list_for_every_entry(&wp->rp_list, rp, read_port_t, w_node) {
519             // wake the read and group ports.
520             wait_queue_wake_all(&rp->wait, false, ERR_CANCELLED);
521             if (rp->gport) {
522                 wait_queue_wake_all(&rp->gport->wait, false, ERR_CANCELLED);
523             }
524             // remove self from reader ports.
525             rp->wport = NULL;
526         }
527     }
528 
529     wp->magic = 0;
530     THREAD_UNLOCK(state);
531 
532     free(buf);
533     free(wp);
534     return NO_ERROR;
535 }
536 
port_close(port_t port)537 status_t port_close(port_t port)
538 {
539     if (!port)
540         return ERR_INVALID_ARGS;
541 
542     read_port_t *rp = (read_port_t *) port;
543     port_buf_t *buf = NULL;
544 
545     THREAD_LOCK(state);
546     if (rp->magic == READPORT_MAGIC) {
547         // dealing with a read port.
548         if (rp->wport) {
549             // remove self from write port list and reassign the bufer if last.
550             list_delete(&rp->w_node);
551             if (list_is_empty(&rp->wport->rp_list)) {
552                 rp->wport->buf = rp->buf;
553                 rp->buf = NULL;
554             } else {
555                 buf = rp->buf;
556             }
557         }
558         if (rp->gport) {
559             // remove self from port group list.
560             list_delete(&rp->g_node);
561         }
562         // wake up waiters, the return code is ERR_OBJECT_DESTROYED.
563         wait_queue_destroy(&rp->wait, true);
564         rp->magic = 0;
565 
566     } else if (rp->magic == PORTGROUP_MAGIC) {
567         // dealing with a port group.
568         port_group_t *pg = (port_group_t *) port;
569         // wake up waiters.
570         wait_queue_destroy(&pg->wait, true);
571         // remove self from reader ports.
572         rp = NULL;
573         list_for_every_entry(&pg->rp_list, rp, read_port_t, g_node) {
574             rp->gport = NULL;
575         }
576         pg->magic = 0;
577 
578     } else if (rp->magic == WRITEPORT_MAGIC_W) {
579         // dealing with a write port.
580         write_port_t *wp = (write_port_t *) port;
581         // mark it as closed. Now it can be read but not written to.
582         wp->magic = WRITEPORT_MAGIC_X;
583         THREAD_UNLOCK(state);
584         return NO_ERROR;
585 
586     } else {
587         THREAD_UNLOCK(state);
588         return ERR_BAD_HANDLE;
589     }
590 
591     THREAD_UNLOCK(state);
592 
593     free(buf);
594     free(port);
595     return NO_ERROR;
596 }
597 
598