1 /* 2 * Copyright (c) 2006-2018, RT-Thread Development Team 3 * 4 * SPDX-License-Identifier: Apache-2.0 5 * 6 * Change Logs: 7 * Date Author Notes 8 */ 9 10 #include <string.h> 11 #include "mqueue.h" 12 #include "pthread_internal.h" 13 14 static mqd_t posix_mq_list = RT_NULL; 15 static struct rt_semaphore posix_mq_lock; 16 void posix_mq_system_init() 17 { 18 rt_sem_init(&posix_mq_lock, "pmq", 1, RT_IPC_FLAG_FIFO); 19 } 20 21 rt_inline void posix_mq_insert(mqd_t pmq) 22 { 23 pmq->next = posix_mq_list; 24 posix_mq_list = pmq; 25 } 26 27 static void posix_mq_delete(mqd_t pmq) 28 { 29 mqd_t iter; 30 if (posix_mq_list == pmq) 31 { 32 posix_mq_list = pmq->next; 33 34 rt_mq_delete(pmq->mq); 35 rt_free(pmq); 36 37 return; 38 } 39 for (iter = posix_mq_list; iter->next != RT_NULL; iter = iter->next) 40 { 41 if (iter->next == pmq) 42 { 43 /* delete this mq */ 44 if (pmq->next != RT_NULL) 45 iter->next = pmq->next; 46 else 47 iter->next = RT_NULL; 48 49 /* delete RT-Thread mqueue */ 50 rt_mq_delete(pmq->mq); 51 rt_free(pmq); 52 53 return ; 54 } 55 } 56 } 57 58 static mqd_t posix_mq_find(const char* name) 59 { 60 mqd_t iter; 61 rt_object_t object; 62 63 for (iter = posix_mq_list; iter != RT_NULL; iter = iter->next) 64 { 65 object = (rt_object_t)(iter->mq); 66 67 if (strncmp(object->name, name, RT_NAME_MAX) == 0) 68 { 69 return iter; 70 } 71 } 72 73 return RT_NULL; 74 } 75 76 int mq_setattr(mqd_t mqdes, 77 const struct mq_attr *mqstat, 78 struct mq_attr *omqstat) 79 { 80 rt_set_errno(-RT_ERROR); 81 82 return -1; 83 } 84 RTM_EXPORT(mq_setattr); 85 86 int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat) 87 { 88 if ((mqdes == RT_NULL) || mqstat == RT_NULL) 89 { 90 rt_set_errno(EBADF); 91 92 return -1; 93 } 94 95 mqstat->mq_maxmsg = mqdes->mq->max_msgs; 96 mqstat->mq_msgsize = mqdes->mq->msg_size; 97 mqstat->mq_curmsgs = 0; 98 mqstat->mq_flags = 0; 99 100 return 0; 101 } 102 RTM_EXPORT(mq_getattr); 103 104 mqd_t mq_open(const char *name, int oflag, ...) 105 { 106 mqd_t mqdes; 107 va_list arg; 108 mode_t mode; 109 struct mq_attr *attr = RT_NULL; 110 111 /* lock posix mqueue list */ 112 rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER); 113 114 mqdes = RT_NULL; 115 if (oflag & O_CREAT) 116 { 117 va_start(arg, oflag); 118 mode = (mode_t)va_arg(arg, unsigned int); 119 mode = mode; 120 attr = (struct mq_attr *)va_arg(arg, struct mq_attr *); 121 va_end(arg); 122 123 if (oflag & O_EXCL) 124 { 125 if (posix_mq_find(name) != RT_NULL) 126 { 127 rt_set_errno(EEXIST); 128 goto __return; 129 } 130 } 131 mqdes = (mqd_t) rt_malloc (sizeof(struct mqdes)); 132 if (mqdes == RT_NULL) 133 { 134 rt_set_errno(ENFILE); 135 goto __return; 136 } 137 138 /* create RT-Thread message queue */ 139 mqdes->mq = rt_mq_create(name, attr->mq_msgsize, attr->mq_maxmsg, RT_IPC_FLAG_FIFO); 140 if (mqdes->mq == RT_NULL) /* create failed */ 141 { 142 rt_set_errno(ENFILE); 143 goto __return; 144 } 145 /* initialize reference count */ 146 mqdes->refcount = 1; 147 mqdes->unlinked = 0; 148 149 /* insert mq to posix mq list */ 150 posix_mq_insert(mqdes); 151 } 152 else 153 { 154 /* find mqueue */ 155 mqdes = posix_mq_find(name); 156 if (mqdes != RT_NULL) 157 { 158 mqdes->refcount ++; /* increase reference count */ 159 } 160 else 161 { 162 rt_set_errno(ENOENT); 163 goto __return; 164 } 165 } 166 rt_sem_release(&posix_mq_lock); 167 168 return mqdes; 169 170 __return: 171 /* release lock */ 172 rt_sem_release(&posix_mq_lock); 173 174 /* release allocated memory */ 175 if (mqdes != RT_NULL) 176 { 177 if (mqdes->mq != RT_NULL) 178 { 179 /* delete RT-Thread message queue */ 180 rt_mq_delete(mqdes->mq); 181 } 182 rt_free(mqdes); 183 } 184 return RT_NULL; 185 } 186 RTM_EXPORT(mq_open); 187 188 ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio) 189 { 190 rt_err_t result; 191 192 if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL)) 193 { 194 rt_set_errno(EINVAL); 195 196 return -1; 197 } 198 199 result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, RT_WAITING_FOREVER); 200 if (result == RT_EOK) 201 return msg_len; 202 203 rt_set_errno(EBADF); 204 return -1; 205 } 206 RTM_EXPORT(mq_receive); 207 208 int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio) 209 { 210 rt_err_t result; 211 212 if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL)) 213 { 214 rt_set_errno(EINVAL); 215 216 return -1; 217 } 218 219 result = rt_mq_send(mqdes->mq, (void*)msg_ptr, msg_len); 220 if (result == RT_EOK) 221 return 0; 222 223 rt_set_errno(EBADF); 224 225 return -1; 226 } 227 RTM_EXPORT(mq_send); 228 229 ssize_t mq_timedreceive(mqd_t mqdes, 230 char *msg_ptr, 231 size_t msg_len, 232 unsigned *msg_prio, 233 const struct timespec *abs_timeout) 234 { 235 int tick; 236 rt_err_t result; 237 238 /* parameters check */ 239 if ((mqdes == RT_NULL) || (msg_ptr == RT_NULL)) 240 { 241 rt_set_errno(EINVAL); 242 243 return -1; 244 } 245 246 tick = clock_time_to_tick(abs_timeout); 247 248 result = rt_mq_recv(mqdes->mq, msg_ptr, msg_len, tick); 249 if (result == RT_EOK) 250 return msg_len; 251 252 if (result == -RT_ETIMEOUT) 253 rt_set_errno(ETIMEDOUT); 254 else 255 rt_set_errno(EBADMSG); 256 257 return -1; 258 } 259 RTM_EXPORT(mq_timedreceive); 260 261 int mq_timedsend(mqd_t mqdes, 262 const char *msg_ptr, 263 size_t msg_len, 264 unsigned msg_prio, 265 const struct timespec *abs_timeout) 266 { 267 /* RT-Thread does not support timed send */ 268 return mq_send(mqdes, msg_ptr, msg_len, msg_prio); 269 } 270 RTM_EXPORT(mq_timedsend); 271 272 int mq_notify(mqd_t mqdes, const struct sigevent *notification) 273 { 274 rt_set_errno(-RT_ERROR); 275 276 return -1; 277 } 278 RTM_EXPORT(mq_notify); 279 280 int mq_close(mqd_t mqdes) 281 { 282 if (mqdes == RT_NULL) 283 { 284 rt_set_errno(EINVAL); 285 286 return -1; 287 } 288 289 /* lock posix mqueue list */ 290 rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER); 291 mqdes->refcount --; 292 if (mqdes->refcount == 0) 293 { 294 /* delete from posix mqueue list */ 295 if (mqdes->unlinked) 296 posix_mq_delete(mqdes); 297 } 298 rt_sem_release(&posix_mq_lock); 299 300 return 0; 301 } 302 RTM_EXPORT(mq_close); 303 304 int mq_unlink(const char *name) 305 { 306 mqd_t pmq; 307 308 /* lock posix mqueue list */ 309 rt_sem_take(&posix_mq_lock, RT_WAITING_FOREVER); 310 pmq = posix_mq_find(name); 311 if (pmq != RT_NULL) 312 { 313 pmq->unlinked = 1; 314 if (pmq->refcount == 0) 315 { 316 /* remove this mqueue */ 317 posix_mq_delete(pmq); 318 } 319 rt_sem_release(&posix_mq_lock); 320 321 return 0; 322 } 323 rt_sem_release(&posix_mq_lock); 324 325 /* no this entry */ 326 rt_set_errno(ENOENT); 327 328 return -1; 329 } 330 RTM_EXPORT(mq_unlink); 331