Lines matching full:wq — whole-word matches for the identifier "wq" in the io-wq worker-pool source
22 #include "io-wq.h"
37 IO_WQ_BIT_EXIT = 0, /* wq exiting */
45 * One for each thread in a wq pool
54 struct io_wq *wq; member
138 static bool create_io_worker(struct io_wq *wq, int index);
140 static bool io_acct_cancel_pending_work(struct io_wq *wq,
144 static void io_wq_cancel_tw_create(struct io_wq *wq);
157 static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound) in io_get_acct() argument
159 return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND]; in io_get_acct()
162 static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq, in io_work_get_acct() argument
165 return io_get_acct(wq, !(work_flags & IO_WQ_WORK_UNBOUND)); in io_work_get_acct()
170 return io_get_acct(worker->wq, test_bit(IO_WORKER_F_BOUND, &worker->flags)); in io_wq_get_acct()
173 static void io_worker_ref_put(struct io_wq *wq) in io_worker_ref_put() argument
175 if (atomic_dec_and_test(&wq->worker_refs)) in io_worker_ref_put()
176 complete(&wq->worker_done); in io_worker_ref_put()
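The two matches above are the core of the pool's shutdown accounting: every worker (and every pending creation request) holds a reference in wq->worker_refs, and the final put completes wq->worker_done so the exit path can stop waiting. Below is a minimal userspace sketch of the same dec-and-test-then-complete pattern, assuming C11 atomics and pthreads; all names in it are invented for the example, not io-wq symbols.

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

struct pool_refs {
	atomic_int	refs;		/* like wq->worker_refs; starts at 1 for the creator */
	bool		done;		/* stands in for wq->worker_done */
	pthread_mutex_t	lock;
	pthread_cond_t	cond;
};

static void pool_ref_put(struct pool_refs *p)
{
	/* last reference gone: signal whoever is waiting for teardown */
	if (atomic_fetch_sub(&p->refs, 1) == 1) {
		pthread_mutex_lock(&p->lock);
		p->done = true;
		pthread_cond_broadcast(&p->cond);
		pthread_mutex_unlock(&p->lock);
	}
}

static void pool_wait_for_workers(struct pool_refs *p)
{
	pool_ref_put(p);		/* drop the creator's initial reference */
	pthread_mutex_lock(&p->lock);
	while (!p->done)
		pthread_cond_wait(&p->cond, &p->lock);
	pthread_mutex_unlock(&p->lock);
}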
186 return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state); in io_wq_worker_stopped()
192 struct io_wq *wq = worker->wq; in io_worker_cancel_cb() local
195 raw_spin_lock(&wq->lock); in io_worker_cancel_cb()
197 raw_spin_unlock(&wq->lock); in io_worker_cancel_cb()
198 io_worker_ref_put(wq); in io_worker_cancel_cb()
215 struct io_wq *wq = worker->wq; in io_worker_exit() local
218 struct callback_head *cb = task_work_cancel_match(wq->task, in io_worker_exit()
229 raw_spin_lock(&wq->lock); in io_worker_exit()
233 raw_spin_unlock(&wq->lock); in io_worker_exit()
243 io_worker_ref_put(wq); in io_worker_exit()
272 static bool io_wq_activate_free_worker(struct io_wq *wq, in io_wq_activate_free_worker() argument
284 hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) { in io_wq_activate_free_worker()
308 static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct) in io_wq_create_worker() argument
315 pr_warn_once("io-wq is not configured for unbound workers"); in io_wq_create_worker()
317 raw_spin_lock(&wq->lock); in io_wq_create_worker()
319 raw_spin_unlock(&wq->lock); in io_wq_create_worker()
323 raw_spin_unlock(&wq->lock); in io_wq_create_worker()
325 atomic_inc(&wq->worker_refs); in io_wq_create_worker()
326 return create_io_worker(wq, acct->index); in io_wq_create_worker()
339 struct io_wq *wq; in create_worker_cb() local
345 wq = worker->wq; in create_worker_cb()
346 acct = &wq->acct[worker->create_index]; in create_worker_cb()
347 raw_spin_lock(&wq->lock); in create_worker_cb()
353 raw_spin_unlock(&wq->lock); in create_worker_cb()
355 create_io_worker(wq, worker->create_index); in create_worker_cb()
358 io_worker_ref_put(wq); in create_worker_cb()
368 struct io_wq *wq = worker->wq; in io_queue_worker_create() local
371 if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) in io_queue_worker_create()
385 atomic_inc(&wq->worker_refs); in io_queue_worker_create()
388 if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) { in io_queue_worker_create()
392 * now set. wq exit does that too, but we can have added this in io_queue_worker_create()
395 if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) in io_queue_worker_create()
396 io_wq_cancel_tw_create(wq); in io_queue_worker_create()
397 io_worker_ref_put(wq); in io_queue_worker_create()
400 io_worker_ref_put(wq); in io_queue_worker_create()
406 io_worker_ref_put(wq); in io_queue_worker_create()
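The io_queue_worker_create() matches show the reference discipline around deferring worker creation to task_work: a pool reference is taken before task_work_add(), dropped again on every failure path, and queued creation requests are flushed if the wq started exiting in the meantime. A rough, hedged sketch of that "pin, try to hand off, roll back" shape follows, using only C11 atomics; the submit callback and every name here are hypothetical.

#include <stdatomic.h>
#include <stdbool.h>

struct wpool {
	atomic_int	refs;		/* like wq->worker_refs */
	atomic_bool	exiting;	/* like IO_WQ_BIT_EXIT in wq->state */
};

/*
 * Pin the pool, then try to hand the "create a worker" request to an
 * asynchronous context; undo the reference if the handoff fails. If the
 * pool starts exiting after a successful handoff, the exit path has to
 * flush the queued requests, which is what io_wq_cancel_tw_create()
 * does for task_work.
 */
static bool queue_worker_create(struct wpool *p, bool (*submit)(struct wpool *))
{
	if (atomic_load(&p->exiting))
		return false;

	atomic_fetch_add(&p->refs, 1);
	if (submit(p))
		return true;		/* the async side drops the ref */

	atomic_fetch_sub(&p->refs, 1);	/* never handed off: drop it here */
	return false;
}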
413 struct io_wq *wq = worker->wq; in io_wq_dec_running() local
425 atomic_inc(&wq->worker_refs); in io_wq_dec_running()
433 static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker) in __io_worker_busy() argument
437 raw_spin_lock(&wq->lock); in __io_worker_busy()
439 raw_spin_unlock(&wq->lock); in __io_worker_busy()
446 static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker) in __io_worker_idle() argument
447 __must_hold(wq->lock) in __io_worker_idle()
451 hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list); in __io_worker_idle()
465 static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash) in io_wait_on_hash() argument
469 spin_lock_irq(&wq->hash->wait.lock); in io_wait_on_hash()
470 if (list_empty(&wq->wait.entry)) { in io_wait_on_hash()
471 __add_wait_queue(&wq->hash->wait, &wq->wait); in io_wait_on_hash()
472 if (!test_bit(hash, &wq->hash->map)) { in io_wait_on_hash()
474 list_del_init(&wq->wait.entry); in io_wait_on_hash()
478 spin_unlock_irq(&wq->hash->wait.lock); in io_wait_on_hash()
489 struct io_wq *wq = worker->wq; in io_get_next_work() local
506 tail = wq->hash_tail[hash]; in io_get_next_work()
509 if (!test_and_set_bit(hash, &wq->hash->map)) { in io_get_next_work()
510 wq->hash_tail[hash] = NULL; in io_get_next_work()
529 unstalled = io_wait_on_hash(wq, stall_hash); in io_get_next_work()
533 if (wq_has_sleeper(&wq->hash->wait)) in io_get_next_work()
534 wake_up(&wq->hash->wait); in io_get_next_work()
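The hash handling above serializes work items that hash to the same bucket: a worker claims a bucket with test_and_set_bit() on wq->hash->map, stalls on wq->hash->wait when every runnable item targets a busy bucket, and clearing the bit later wakes the sleepers. Here is a small sketch of just the claim/release half with a C11 atomic bitmap; the bucket count and names are invented, and the wakeup side is only noted in a comment.

#include <stdatomic.h>
#include <stdbool.h>

#define HASH_BITS	5
#define HASH_BUCKETS	(1u << HASH_BITS)	/* fits in one unsigned long */

static atomic_ulong hash_map;	/* one bit per bucket with work in flight */

/* Claim a bucket; only the claimant may run work hashed to it. */
static bool hash_bucket_claim(unsigned int hash)
{
	unsigned long bit = 1ul << (hash & (HASH_BUCKETS - 1));

	return !(atomic_fetch_or(&hash_map, bit) & bit);
}

/* Release the bucket once the hashed chain is done; a real pool would
 * also wake anyone stalled on it, as io-wq does with
 * wake_up(&wq->hash->wait). */
static void hash_bucket_release(unsigned int hash)
{
	unsigned long bit = 1ul << (hash & (HASH_BUCKETS - 1));

	atomic_fetch_and(&hash_map, ~bit);
}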
561 struct io_wq *wq = worker->wq; in io_worker_handle_work() local
562 bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state); in io_worker_handle_work()
593 __io_worker_busy(wq, worker); in io_worker_handle_work()
611 wq->do_work(work); in io_worker_handle_work()
614 linked = wq->free_work(work); in io_worker_handle_work()
622 io_wq_enqueue(wq, linked); in io_worker_handle_work()
626 spin_lock_irq(&wq->hash->wait.lock); in io_worker_handle_work()
627 clear_bit(hash, &wq->hash->map); in io_worker_handle_work()
629 spin_unlock_irq(&wq->hash->wait.lock); in io_worker_handle_work()
630 if (wq_has_sleeper(&wq->hash->wait)) in io_worker_handle_work()
631 wake_up(&wq->hash->wait); in io_worker_handle_work()
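io_worker_handle_work() is the dispatch loop these matches come from: each item runs through wq->do_work(), wq->free_work() hands back any linked item, non-hashed linked work continues inline while hashed linked work is re-enqueued via io_wq_enqueue(), and a finished hashed chain clears its bit and wakes the hash waiters. A compressed sketch of that loop shape, with invented types and without the hash/wakeup plumbing:

#include <stddef.h>

struct work {
	struct work	*next;			/* linked follow-up work */
	void		(*fn)(struct work *);	/* stands in for wq->do_work */
	int		hashed;			/* needs per-bucket serialization */
};

/* stands in for wq->free_work(): release 'w', return its linked item */
static struct work *work_free(struct work *w)
{
	struct work *linked = w->next;

	w->next = NULL;
	return linked;
}

static void handle_work(struct work *w, void (*enqueue)(struct work *))
{
	while (w) {
		struct work *linked;

		w->fn(w);			/* wq->do_work(work) */
		linked = work_free(w);		/* wq->free_work(work) */

		if (linked && !linked->hashed) {
			w = linked;		/* keep running the chain inline */
		} else {
			if (linked)
				enqueue(linked); /* hashed: back onto the queue */
			w = NULL;
		}
	}
}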
645 struct io_wq *wq = worker->wq; in io_wq_worker() local
652 snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid); in io_wq_worker()
655 while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) { in io_wq_worker()
667 raw_spin_lock(&wq->lock); in io_wq_worker()
674 raw_spin_unlock(&wq->lock); in io_wq_worker()
679 __io_worker_idle(wq, worker); in io_wq_worker()
680 raw_spin_unlock(&wq->lock); in io_wq_worker()
694 wq->cpu_mask); in io_wq_worker()
698 if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct)) in io_wq_worker()
741 static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker, in io_init_new_worker() argument
746 set_cpus_allowed_ptr(tsk, wq->cpu_mask); in io_init_new_worker()
748 raw_spin_lock(&wq->lock); in io_init_new_worker()
749 hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list); in io_init_new_worker()
750 list_add_tail_rcu(&worker->all_list, &wq->all_list); in io_init_new_worker()
752 raw_spin_unlock(&wq->lock); in io_init_new_worker()
799 struct io_wq *wq; in create_worker_cont() local
803 wq = worker->wq; in create_worker_cont()
806 io_init_new_worker(wq, worker, tsk); in create_worker_cont()
813 raw_spin_lock(&wq->lock); in create_worker_cont()
821 raw_spin_unlock(&wq->lock); in create_worker_cont()
822 while (io_acct_cancel_pending_work(wq, acct, &match)) in create_worker_cont()
825 raw_spin_unlock(&wq->lock); in create_worker_cont()
827 io_worker_ref_put(wq); in create_worker_cont()
847 static bool create_io_worker(struct io_wq *wq, int index) in create_io_worker() argument
849 struct io_wq_acct *acct = &wq->acct[index]; in create_io_worker()
859 raw_spin_lock(&wq->lock); in create_io_worker()
861 raw_spin_unlock(&wq->lock); in create_io_worker()
862 io_worker_ref_put(wq); in create_io_worker()
867 worker->wq = wq; in create_io_worker()
876 io_init_new_worker(wq, worker, tsk); in create_io_worker()
892 static bool io_wq_for_each_worker(struct io_wq *wq, in io_wq_for_each_worker() argument
899 list_for_each_entry_rcu(worker, &wq->all_list, all_list) { in io_wq_for_each_worker()
920 static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq) in io_run_cancel() argument
924 wq->do_work(work); in io_run_cancel()
925 work = wq->free_work(work); in io_run_cancel()
929 static void io_wq_insert_work(struct io_wq *wq, struct io_wq_acct *acct, in io_wq_insert_work() argument
942 tail = wq->hash_tail[hash]; in io_wq_insert_work()
943 wq->hash_tail[hash] = work; in io_wq_insert_work()
955 void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) in io_wq_enqueue() argument
958 struct io_wq_acct *acct = io_work_get_acct(wq, work_flags); in io_wq_enqueue()
967 * If io-wq is exiting for this task, or if the request has explicitly in io_wq_enqueue()
970 if (test_bit(IO_WQ_BIT_EXIT, &wq->state) || in io_wq_enqueue()
972 io_run_cancel(work, wq); in io_wq_enqueue()
977 io_wq_insert_work(wq, acct, work, work_flags); in io_wq_enqueue()
982 do_create = !io_wq_activate_free_worker(wq, acct); in io_wq_enqueue()
989 did_create = io_wq_create_worker(wq, acct); in io_wq_enqueue()
993 raw_spin_lock(&wq->lock); in io_wq_enqueue()
995 raw_spin_unlock(&wq->lock); in io_wq_enqueue()
998 raw_spin_unlock(&wq->lock); in io_wq_enqueue()
1001 io_acct_cancel_pending_work(wq, acct, &match); in io_wq_enqueue()
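The enqueue path above appends the work to the per-account pending list, then either wakes an idle worker off wq->free_list or, if none is free and the account is under its limit, creates a new one; if creation fails and the account is left with no workers at all, the freshly queued work is cancelled rather than stranded. A hedged sketch of just the wake-or-create decision, assuming a pthread-mutex-protected account with invented fields:

#include <pthread.h>
#include <stdbool.h>

struct acct {
	pthread_mutex_t	lock;
	unsigned int	nr_idle;	/* workers parked on a free list */
	unsigned int	nr_workers;	/* workers created so far */
	unsigned int	max_workers;	/* per-account cap */
};

/* Returns true when the enqueuer should spawn a new worker thread;
 * false means an idle worker can be woken instead, or the account is
 * already at its limit. */
static bool need_new_worker(struct acct *a)
{
	bool create = false;

	pthread_mutex_lock(&a->lock);
	if (!a->nr_idle && a->nr_workers < a->max_workers) {
		a->nr_workers++;	/* account for it before spawning */
		create = true;
	}
	pthread_mutex_unlock(&a->lock);
	return create;
}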
1046 static inline void io_wq_remove_pending(struct io_wq *wq, in io_wq_remove_pending() argument
1054 if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) { in io_wq_remove_pending()
1058 wq->hash_tail[hash] = prev_work; in io_wq_remove_pending()
1060 wq->hash_tail[hash] = NULL; in io_wq_remove_pending()
1065 static bool io_acct_cancel_pending_work(struct io_wq *wq, in io_acct_cancel_pending_work() argument
1077 io_wq_remove_pending(wq, acct, work, prev); in io_acct_cancel_pending_work()
1079 io_run_cancel(work, wq); in io_acct_cancel_pending_work()
1089 static void io_wq_cancel_pending_work(struct io_wq *wq, in io_wq_cancel_pending_work() argument
1095 struct io_wq_acct *acct = io_get_acct(wq, i == 0); in io_wq_cancel_pending_work()
1097 if (io_acct_cancel_pending_work(wq, acct, match)) { in io_wq_cancel_pending_work()
1105 static void io_wq_cancel_running_work(struct io_wq *wq, in io_wq_cancel_running_work() argument
1109 io_wq_for_each_worker(wq, io_wq_worker_cancel, match); in io_wq_cancel_running_work()
1113 enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel, in io_wq_cancel_cb() argument
1132 * Do both of these while holding the wq->lock, to ensure that in io_wq_cancel_cb()
1135 io_wq_cancel_pending_work(wq, &match); in io_wq_cancel_cb()
1139 raw_spin_lock(&wq->lock); in io_wq_cancel_cb()
1140 io_wq_cancel_running_work(wq, &match); in io_wq_cancel_cb()
1141 raw_spin_unlock(&wq->lock); in io_wq_cancel_cb()
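io_wq_cancel_cb() cancels in two phases in a deliberate order: pending work is swept first, so nothing can slip from the pending list into a worker behind the caller's back, and only then are workers that already picked up a matching item signalled. A self-contained sketch of that ordering over a simple pending list and a set of running slots (all types and names invented):

#include <stdbool.h>
#include <stddef.h>

struct witem {
	struct witem	*next;
	void		*tag;
	bool		cancelled;	/* set for running work; the worker checks it */
};

typedef bool (*match_fn)(struct witem *, void *);

/* phase 1: strip matching items that have not started yet */
static unsigned int cancel_pending(struct witem **head, match_fn match, void *data)
{
	unsigned int n = 0;

	while (*head) {
		if (match(*head, data)) {
			struct witem *victim = *head;

			*head = victim->next;
			victim->next = NULL;	/* caller completes it as cancelled */
			n++;
			continue;
		}
		head = &(*head)->next;
	}
	return n;
}

/* phase 2: flag matching items that a worker is already executing */
static unsigned int cancel_running(struct witem **running, size_t nr,
				   match_fn match, void *data)
{
	unsigned int n = 0;

	for (size_t i = 0; i < nr; i++) {
		if (running[i] && match(running[i], data)) {
			running[i]->cancelled = true;
			n++;
		}
	}
	return n;
}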
1155 struct io_wq *wq = container_of(wait, struct io_wq, wait); in io_wq_hash_wake() local
1162 struct io_wq_acct *acct = &wq->acct[i]; in io_wq_hash_wake()
1165 io_wq_activate_free_worker(wq, acct); in io_wq_hash_wake()
1174 struct io_wq *wq; in io_wq_create() local
1181 wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL); in io_wq_create()
1182 if (!wq) in io_wq_create()
1186 wq->hash = data->hash; in io_wq_create()
1187 wq->free_work = data->free_work; in io_wq_create()
1188 wq->do_work = data->do_work; in io_wq_create()
1192 if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL)) in io_wq_create()
1194 cpuset_cpus_allowed(data->task, wq->cpu_mask); in io_wq_create()
1195 wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; in io_wq_create()
1196 wq->acct[IO_WQ_ACCT_UNBOUND].max_workers = in io_wq_create()
1198 INIT_LIST_HEAD(&wq->wait.entry); in io_wq_create()
1199 wq->wait.func = io_wq_hash_wake; in io_wq_create()
1201 struct io_wq_acct *acct = &wq->acct[i]; in io_wq_create()
1209 raw_spin_lock_init(&wq->lock); in io_wq_create()
1210 INIT_HLIST_NULLS_HEAD(&wq->free_list, 0); in io_wq_create()
1211 INIT_LIST_HEAD(&wq->all_list); in io_wq_create()
1213 wq->task = get_task_struct(data->task); in io_wq_create()
1214 atomic_set(&wq->worker_refs, 1); in io_wq_create()
1215 init_completion(&wq->worker_done); in io_wq_create()
1216 ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node); in io_wq_create()
1220 return wq; in io_wq_create()
1223 free_cpumask_var(wq->cpu_mask); in io_wq_create()
1224 kfree(wq); in io_wq_create()
1235 return worker->wq == data; in io_task_work_match()
1238 void io_wq_exit_start(struct io_wq *wq) in io_wq_exit_start() argument
1240 set_bit(IO_WQ_BIT_EXIT, &wq->state); in io_wq_exit_start()
1243 static void io_wq_cancel_tw_create(struct io_wq *wq) in io_wq_cancel_tw_create() argument
1247 while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) { in io_wq_cancel_tw_create()
1261 static void io_wq_exit_workers(struct io_wq *wq) in io_wq_exit_workers() argument
1263 if (!wq->task) in io_wq_exit_workers()
1266 io_wq_cancel_tw_create(wq); in io_wq_exit_workers()
1269 io_wq_for_each_worker(wq, io_wq_worker_wake, NULL); in io_wq_exit_workers()
1271 io_worker_ref_put(wq); in io_wq_exit_workers()
1272 wait_for_completion(&wq->worker_done); in io_wq_exit_workers()
1274 spin_lock_irq(&wq->hash->wait.lock); in io_wq_exit_workers()
1275 list_del_init(&wq->wait.entry); in io_wq_exit_workers()
1276 spin_unlock_irq(&wq->hash->wait.lock); in io_wq_exit_workers()
1278 put_task_struct(wq->task); in io_wq_exit_workers()
1279 wq->task = NULL; in io_wq_exit_workers()
1282 static void io_wq_destroy(struct io_wq *wq) in io_wq_destroy() argument
1289 cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node); in io_wq_destroy()
1290 io_wq_cancel_pending_work(wq, &match); in io_wq_destroy()
1291 free_cpumask_var(wq->cpu_mask); in io_wq_destroy()
1292 io_wq_put_hash(wq->hash); in io_wq_destroy()
1293 kfree(wq); in io_wq_destroy()
1296 void io_wq_put_and_exit(struct io_wq *wq) in io_wq_put_and_exit() argument
1298 WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state)); in io_wq_put_and_exit()
1300 io_wq_exit_workers(wq); in io_wq_put_and_exit()
1301 io_wq_destroy(wq); in io_wq_put_and_exit()
1314 cpumask_set_cpu(od->cpu, worker->wq->cpu_mask); in io_wq_worker_affinity()
1316 cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask); in io_wq_worker_affinity()
1320 static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online) in __io_wq_cpu_online() argument
1328 io_wq_for_each_worker(wq, io_wq_worker_affinity, &od); in __io_wq_cpu_online()
1335 struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node); in io_wq_cpu_online() local
1337 return __io_wq_cpu_online(wq, cpu, true); in io_wq_cpu_online()
1342 struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node); in io_wq_cpu_offline() local
1344 return __io_wq_cpu_online(wq, cpu, false); in io_wq_cpu_offline()
1378 int io_wq_max_workers(struct io_wq *wq, int *new_count) in io_wq_max_workers() argument
1398 raw_spin_lock(&wq->lock); in io_wq_max_workers()
1400 acct = &wq->acct[i]; in io_wq_max_workers()
1405 raw_spin_unlock(&wq->lock); in io_wq_max_workers()
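io_wq_max_workers() follows a read-and-conditionally-update protocol: under wq->lock it records each account's previous limit, installs a new one only where the caller passed a non-zero count, and hands the old limits back through the same array. A small sketch of that shape, where the two-account layout mirrors BOUND/UNBOUND and the names are invented:

#include <pthread.h>

enum { ACCT_BOUND, ACCT_UNBOUND, NR_ACCT };

struct pool {
	pthread_mutex_t	lock;
	unsigned int	max_workers[NR_ACCT];
};

static void pool_max_workers(struct pool *p, unsigned int new_count[NR_ACCT])
{
	unsigned int prev[NR_ACCT];

	pthread_mutex_lock(&p->lock);
	for (int i = 0; i < NR_ACCT; i++) {
		prev[i] = p->max_workers[i];
		if (new_count[i])
			p->max_workers[i] = new_count[i];
	}
	pthread_mutex_unlock(&p->lock);

	/* report the old limits back, as the real API does via new_count[] */
	for (int i = 0; i < NR_ACCT; i++)
		new_count[i] = prev[i];
}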
1418 ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online", in io_wq_init()