Lines matching "multi", "cluster" in fs/dlm/lowcomms.c (the DLM low-level comms layer)
1 // SPDX-License-Identifier: GPL-2.0-only
5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6 ** Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
15 * This is the "low-level" comms layer.
18 * from other nodes in the cluster.
20 * Cluster nodes are referred to by their nodeids. nodeids are
21 * simply 32 bit numbers to the locking module - if they need to
22 * be expanded for the cluster infrastructure then that is its
25 * whatever it needs for inter-node communication.
29 * up to the mid-level comms layer (which understands the
40 * cluster-wide mechanism as it must be the same on all nodes of the cluster
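The header comment above states the core abstraction: a cluster node is nothing more than a 32-bit nodeid, and lowcomms keeps one connection object per nodeid. A minimal userspace sketch of that lookup pattern follows; the kernel hangs its connections off an RCU-protected hlist, so the chaining and names here are illustrative only:

#include <stddef.h>
#include <stdint.h>

#define CONN_HASH_SIZE 32		/* must stay a power of two */

struct connection {
	uint32_t nodeid;
	struct connection *next;	/* stands in for the kernel hlist */
};

static struct connection *connection_hash[CONN_HASH_SIZE];

static struct connection *find_con(uint32_t nodeid)
{
	struct connection *con;

	/* bucket choice: low bits of the nodeid index the hash table */
	for (con = connection_hash[nodeid & (CONN_HASH_SIZE - 1)];
	     con; con = con->next)
		if (con->nodeid == nodeid)
			return con;
	return NULL;
}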
115 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
122 #define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - e->end)
123 #define DLM_WQ_LENGTH_BYTES(e) (e->end - e->offset)
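These two macros encode the writequeue's page bookkeeping: each entry owns one page, offset marks the first unsent byte and end the first free byte, so end - offset is the queued length and PAGE_SIZE - end the remaining room. A small standalone illustration, assuming a 4 KiB page (the sketch adds the macro parentheses the kernel versions omit):

#include <assert.h>
#include <stdio.h>

#define PAGE_SIZE 4096

struct writequeue_entry {
	int offset;	/* first byte not yet sent */
	int end;	/* first free byte on the page */
};

#define DLM_WQ_REMAIN_BYTES(e) (PAGE_SIZE - (e)->end)
#define DLM_WQ_LENGTH_BYTES(e) ((e)->end - (e)->offset)

int main(void)
{
	struct writequeue_entry e = { .offset = 0, .end = 0 };

	e.end += 120;		/* a 120-byte message was reserved */
	assert(DLM_WQ_LENGTH_BYTES(&e) == 120);

	e.offset += 100;	/* 100 bytes sent, 20 still queued */
	assert(DLM_WQ_LENGTH_BYTES(&e) == 20);
	printf("%d bytes free on the page\n", DLM_WQ_REMAIN_BYTES(&e));
	return 0;
}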
216 assert_spin_locked(&con->writequeue_lock); in lowcomms_queue_swork()
218 if (!test_bit(CF_IO_STOP, &con->flags) && in lowcomms_queue_swork()
219 !test_bit(CF_APP_LIMITED, &con->flags) && in lowcomms_queue_swork()
220 !test_and_set_bit(CF_SEND_PENDING, &con->flags)) in lowcomms_queue_swork()
221 queue_work(io_workqueue, &con->swork); in lowcomms_queue_swork()
227 WARN_ON_ONCE(!lockdep_sock_is_held(con->sock->sk)); in lowcomms_queue_rwork()
230 if (!test_bit(CF_IO_STOP, &con->flags) && in lowcomms_queue_rwork()
231 !test_and_set_bit(CF_RECV_PENDING, &con->flags)) in lowcomms_queue_rwork()
232 queue_work(io_workqueue, &con->rwork); in lowcomms_queue_rwork()
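Both queueing helpers follow the same guard pattern: skip if I/O is being torn down, and take an atomic test-and-set on a PENDING bit so the work item is queued at most once until the worker clears it again. A userspace analogue with C11 atomics; the kernel uses test_and_set_bit() on bit numbers in con->flags, where this sketch uses bit masks:

#include <stdatomic.h>
#include <stdbool.h>

#define CF_IO_STOP	(1u << 0)
#define CF_SEND_PENDING	(1u << 1)

static atomic_uint flags;

/* Returns true when the caller should actually queue the send work. */
static bool should_queue_swork(void)
{
	if (atomic_load(&flags) & CF_IO_STOP)
		return false;
	/* only the thread that flips SEND_PENDING from 0 to 1 queues */
	return !(atomic_fetch_or(&flags, CF_SEND_PENDING) & CF_SEND_PENDING);
}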
239 INIT_LIST_HEAD(&entry->msgs); in writequeue_entry_ctor()
258 e = list_first_entry_or_null(&con->writequeue, struct writequeue_entry, in con_next_wq()
263 if (!e || e->users || e->len == 0) in con_next_wq()
274 if (con->nodeid == nodeid) in __find_con()
283 con->nodeid = nodeid; in dlm_con_init()
284 init_rwsem(&con->sock_lock); in dlm_con_init()
285 INIT_LIST_HEAD(&con->writequeue); in dlm_con_init()
286 spin_lock_init(&con->writequeue_lock); in dlm_con_init()
287 INIT_WORK(&con->swork, process_send_sockets); in dlm_con_init()
288 INIT_WORK(&con->rwork, process_recv_sockets); in dlm_con_init()
289 spin_lock_init(&con->addrs_lock); in dlm_con_init()
290 init_waitqueue_head(&con->shutdown_wait); in dlm_con_init()
327 hlist_add_head_rcu(&con->list, &connection_hash[r]); in nodeid2con()
336 switch (x->ss_family) { in addr_compare()
340 if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr) in addr_compare()
342 if (sinx->sin_port != siny->sin_port) in addr_compare()
349 if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr)) in addr_compare()
351 if (sinx->sin6_port != siny->sin6_port) in addr_compare()
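The fragments above are the heart of addr_compare(): two sockaddrs match only if the family, the address bytes, and the port all agree. A self-contained userspace rendering of the same checks, with the family test hoisted and error handling simplified:

#include <netinet/in.h>
#include <stdbool.h>
#include <string.h>
#include <sys/socket.h>

static bool addr_compare(const struct sockaddr_storage *x,
			 const struct sockaddr_storage *y)
{
	if (x->ss_family != y->ss_family)
		return false;

	switch (x->ss_family) {
	case AF_INET: {
		const struct sockaddr_in *sinx = (const void *)x;
		const struct sockaddr_in *siny = (const void *)y;

		return sinx->sin_addr.s_addr == siny->sin_addr.s_addr &&
		       sinx->sin_port == siny->sin_port;
	}
	case AF_INET6: {
		const struct sockaddr_in6 *sinx = (const void *)x;
		const struct sockaddr_in6 *siny = (const void *)y;

		return !memcmp(&sinx->sin6_addr, &siny->sin6_addr,
			       sizeof(sinx->sin6_addr)) &&
		       sinx->sin6_port == siny->sin6_port;
	}
	default:
		return false;
	}
}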
370 return -1; in nodeid_to_addr()
376 return -ENOENT; in nodeid_to_addr()
379 spin_lock(&con->addrs_lock); in nodeid_to_addr()
380 if (!con->addr_count) { in nodeid_to_addr()
381 spin_unlock(&con->addrs_lock); in nodeid_to_addr()
383 return -ENOENT; in nodeid_to_addr()
386 memcpy(&sas, &con->addr[con->curr_addr_index], in nodeid_to_addr()
390 con->curr_addr_index++; in nodeid_to_addr()
391 if (con->curr_addr_index == con->addr_count) in nodeid_to_addr()
392 con->curr_addr_index = 0; in nodeid_to_addr()
395 *mark = con->mark; in nodeid_to_addr()
396 spin_unlock(&con->addrs_lock); in nodeid_to_addr()
409 ret4->sin_addr.s_addr = in4->sin_addr.s_addr; in nodeid_to_addr()
413 ret6->sin6_addr = in6->sin6_addr; in nodeid_to_addr()
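When the caller passes try_new_addr, nodeid_to_addr() rotates through the node's configured addresses so each reconnect attempt can try the next one; curr_addr_index simply wraps at addr_count. The rotation in isolation, with hypothetical names and the kernel's addrs_lock omitted:

#include <string.h>
#include <sys/socket.h>

/* Copy out the address to dial, then optionally advance the
 * round-robin index for the next connect attempt. */
static void pick_addr(struct sockaddr_storage *dst,
		      const struct sockaddr_storage *addrs, int addr_count,
		      int *curr_addr_index, int try_new_addr)
{
	memcpy(dst, &addrs[*curr_addr_index], sizeof(*dst));

	if (try_new_addr) {
		(*curr_addr_index)++;
		if (*curr_addr_index == addr_count)
			*curr_addr_index = 0;
	}
}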
429 WARN_ON_ONCE(!con->addr_count); in addr_to_nodeid()
431 spin_lock(&con->addrs_lock); in addr_to_nodeid()
432 for (addr_i = 0; addr_i < con->addr_count; addr_i++) { in addr_to_nodeid()
433 if (addr_compare(&con->addr[addr_i], addr)) { in addr_to_nodeid()
434 *nodeid = con->nodeid; in addr_to_nodeid()
435 *mark = con->mark; in addr_to_nodeid()
436 spin_unlock(&con->addrs_lock); in addr_to_nodeid()
441 spin_unlock(&con->addrs_lock); in addr_to_nodeid()
446 return -ENOENT; in addr_to_nodeid()
454 for (i = 0; i < con->addr_count; i++) { in dlm_lowcomms_con_has_addr()
455 if (addr_compare(&con->addr[i], addr)) in dlm_lowcomms_con_has_addr()
472 return -ENOMEM; in dlm_lowcomms_addr()
475 spin_lock(&con->addrs_lock); in dlm_lowcomms_addr()
476 if (!con->addr_count) { in dlm_lowcomms_addr()
477 memcpy(&con->addr[0], addr, sizeof(*addr)); in dlm_lowcomms_addr()
478 con->addr_count = 1; in dlm_lowcomms_addr()
479 con->mark = dlm_config.ci_mark; in dlm_lowcomms_addr()
480 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
487 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
489 return -EEXIST; in dlm_lowcomms_addr()
492 if (con->addr_count >= DLM_MAX_ADDR_COUNT) { in dlm_lowcomms_addr()
493 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
495 return -ENOSPC; in dlm_lowcomms_addr()
498 memcpy(&con->addr[con->addr_count++], addr, sizeof(*addr)); in dlm_lowcomms_addr()
500 spin_unlock(&con->addrs_lock); in dlm_lowcomms_addr()
511 set_bit(CF_RECV_INTR, &con->flags); in lowcomms_data_ready()
519 clear_bit(SOCK_NOSPACE, &con->sock->flags); in lowcomms_write_space()
521 spin_lock_bh(&con->writequeue_lock); in lowcomms_write_space()
522 if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) { in lowcomms_write_space()
523 con->sock->sk->sk_write_pending--; in lowcomms_write_space()
524 clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags); in lowcomms_write_space()
528 spin_unlock_bh(&con->writequeue_lock); in lowcomms_write_space()
536 if (sk->sk_shutdown == RCV_SHUTDOWN) in lowcomms_state_change()
556 return -ENOENT; in dlm_lowcomms_connect_node()
559 down_read(&con->sock_lock); in dlm_lowcomms_connect_node()
560 if (!con->sock) { in dlm_lowcomms_connect_node()
561 spin_lock_bh(&con->writequeue_lock); in dlm_lowcomms_connect_node()
563 spin_unlock_bh(&con->writequeue_lock); in dlm_lowcomms_connect_node()
565 up_read(&con->sock_lock); in dlm_lowcomms_connect_node()
581 return -ENOENT; in dlm_lowcomms_nodes_set_mark()
584 spin_lock(&con->addrs_lock); in dlm_lowcomms_nodes_set_mark()
585 con->mark = mark; in dlm_lowcomms_nodes_set_mark()
586 spin_unlock(&con->addrs_lock); in dlm_lowcomms_nodes_set_mark()
597 switch (sk->sk_family) { in lowcomms_error_report()
602 con->nodeid, &inet->inet_daddr, in lowcomms_error_report()
603 ntohs(inet->inet_dport), sk->sk_err, in lowcomms_error_report()
604 READ_ONCE(sk->sk_err_soft)); in lowcomms_error_report()
611 con->nodeid, &sk->sk_v6_daddr, in lowcomms_error_report()
612 ntohs(inet->inet_dport), sk->sk_err, in lowcomms_error_report()
613 READ_ONCE(sk->sk_err_soft)); in lowcomms_error_report()
620 sk->sk_family, sk->sk_err, in lowcomms_error_report()
621 READ_ONCE(sk->sk_err_soft)); in lowcomms_error_report()
625 dlm_midcomms_unack_msg_resend(con->nodeid); in lowcomms_error_report()
636 sk->sk_user_data = NULL; in restore_callbacks()
637 sk->sk_data_ready = listen_sock.sk_data_ready; in restore_callbacks()
638 sk->sk_state_change = listen_sock.sk_state_change; in restore_callbacks()
639 sk->sk_write_space = listen_sock.sk_write_space; in restore_callbacks()
640 sk->sk_error_report = listen_sock.sk_error_report; in restore_callbacks()
646 struct sock *sk = sock->sk; in add_sock()
649 con->sock = sock; in add_sock()
651 sk->sk_user_data = con; in add_sock()
652 sk->sk_data_ready = lowcomms_data_ready; in add_sock()
653 sk->sk_write_space = lowcomms_write_space; in add_sock()
655 sk->sk_state_change = lowcomms_state_change; in add_sock()
656 sk->sk_allocation = GFP_NOFS; in add_sock()
657 sk->sk_use_task_frag = false; in add_sock()
658 sk->sk_error_report = lowcomms_error_report; in add_sock()
667 saddr->ss_family = dlm_local_addr[0].ss_family; in make_sockaddr()
668 if (saddr->ss_family == AF_INET) { in make_sockaddr()
670 in4_addr->sin_port = port; in make_sockaddr()
672 memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero)); in make_sockaddr()
675 in6_addr->sin6_port = port; in make_sockaddr()
678 memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len); in make_sockaddr()
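make_sockaddr() builds a bind/connect address from the first local address: copy the family, drop in the (already network-order) port, and zero everything past the family-specific length so whole-storage compares stay deterministic. A userspace sketch of the same construction, assuming a single global standing in for the kernel's dlm_local_addr[0]:

#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>

static struct sockaddr_storage dlm_local_addr0;	/* first local address */

static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
			  int *addr_len)
{
	saddr->ss_family = dlm_local_addr0.ss_family;
	if (saddr->ss_family == AF_INET) {
		struct sockaddr_in *in4 = (struct sockaddr_in *)saddr;

		in4->sin_port = port;	/* port is network byte order */
		*addr_len = sizeof(*in4);
		memset(&in4->sin_zero, 0, sizeof(in4->sin_zero));
	} else {
		struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)saddr;

		in6->sin6_port = port;
		*addr_len = sizeof(*in6);
	}
	/* zero the unused tail of the storage */
	memset((char *)saddr + *addr_len, 0,
	       sizeof(struct sockaddr_storage) - *addr_len);
}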
686 __free_page(e->page); in dlm_page_release()
694 kref_put(&msg->entry->ref, dlm_page_release); in dlm_msg_release()
702 list_for_each_entry_safe(msg, tmp, &e->msgs, list) { in free_entry()
703 if (msg->orig_msg) { in free_entry()
704 msg->orig_msg->retransmit = false; in free_entry()
705 kref_put(&msg->orig_msg->ref, dlm_msg_release); in free_entry()
708 list_del(&msg->list); in free_entry()
709 kref_put(&msg->ref, dlm_msg_release); in free_entry()
712 list_del(&e->list); in free_entry()
713 kref_put(&e->ref, dlm_page_release); in free_entry()
718 lock_sock((*sock)->sk); in dlm_close_sock()
719 restore_callbacks((*sock)->sk); in dlm_close_sock()
720 release_sock((*sock)->sk); in dlm_close_sock()
728 if (con->othercon) in allow_connection_io()
729 clear_bit(CF_IO_STOP, &con->othercon->flags); in allow_connection_io()
730 clear_bit(CF_IO_STOP, &con->flags); in allow_connection_io()
735 if (con->othercon) in stop_connection_io()
736 stop_connection_io(con->othercon); in stop_connection_io()
738 spin_lock_bh(&con->writequeue_lock); in stop_connection_io()
739 set_bit(CF_IO_STOP, &con->flags); in stop_connection_io()
740 spin_unlock_bh(&con->writequeue_lock); in stop_connection_io()
742 down_write(&con->sock_lock); in stop_connection_io()
743 if (con->sock) { in stop_connection_io()
744 lock_sock(con->sock->sk); in stop_connection_io()
745 restore_callbacks(con->sock->sk); in stop_connection_io()
746 release_sock(con->sock->sk); in stop_connection_io()
748 up_write(&con->sock_lock); in stop_connection_io()
750 cancel_work_sync(&con->swork); in stop_connection_io()
751 cancel_work_sync(&con->rwork); in stop_connection_io()
759 if (con->othercon && and_other) in close_connection()
760 close_connection(con->othercon, false); in close_connection()
762 down_write(&con->sock_lock); in close_connection()
763 if (!con->sock) { in close_connection()
764 up_write(&con->sock_lock); in close_connection()
768 dlm_close_sock(&con->sock); in close_connection()
781 spin_lock_bh(&con->writequeue_lock); in close_connection()
782 if (!list_empty(&con->writequeue)) { in close_connection()
783 e = list_first_entry(&con->writequeue, struct writequeue_entry, in close_connection()
785 if (e->dirty) in close_connection()
788 spin_unlock_bh(&con->writequeue_lock); in close_connection()
790 con->rx_leftover = 0; in close_connection()
791 con->retries = 0; in close_connection()
792 clear_bit(CF_APP_LIMITED, &con->flags); in close_connection()
793 clear_bit(CF_RECV_PENDING, &con->flags); in close_connection()
794 clear_bit(CF_SEND_PENDING, &con->flags); in close_connection()
795 up_write(&con->sock_lock); in close_connection()
802 if (con->othercon && and_other) in shutdown_connection()
803 shutdown_connection(con->othercon, false); in shutdown_connection()
806 down_read(&con->sock_lock); in shutdown_connection()
808 if (!con->sock) { in shutdown_connection()
809 up_read(&con->sock_lock); in shutdown_connection()
813 ret = kernel_sock_shutdown(con->sock, SHUT_WR); in shutdown_connection()
814 up_read(&con->sock_lock); in shutdown_connection()
820 ret = wait_event_timeout(con->shutdown_wait, !con->sock, in shutdown_connection()
844 pentry->buf = kmalloc(buflen, GFP_NOFS); in new_processqueue_entry()
845 if (!pentry->buf) { in new_processqueue_entry()
850 pentry->nodeid = nodeid; in new_processqueue_entry()
856 kfree(pentry->buf); in free_processqueue_entry()
873 list_del(&pentry->list); in process_dlm_messages()
879 dlm_process_incoming_buffer(pentry->nodeid, pentry->buf, in process_dlm_messages()
880 pentry->buflen); in process_dlm_messages()
892 list_del(&pentry->list); in process_dlm_messages()
907 pentry = new_processqueue_entry(con->nodeid, buflen); in receive_from_sock()
911 memcpy(pentry->buf, con->rx_leftover_buf, con->rx_leftover); in receive_from_sock()
916 iov.iov_base = pentry->buf + con->rx_leftover; in receive_from_sock()
917 iov.iov_len = buflen - con->rx_leftover; in receive_from_sock()
921 clear_bit(CF_RECV_INTR, &con->flags); in receive_from_sock()
923 ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len, in receive_from_sock()
925 trace_dlm_recv(con->nodeid, ret); in receive_from_sock()
926 if (ret == -EAGAIN) { in receive_from_sock()
927 lock_sock(con->sock->sk); in receive_from_sock()
928 if (test_and_clear_bit(CF_RECV_INTR, &con->flags)) { in receive_from_sock()
929 release_sock(con->sock->sk); in receive_from_sock()
933 clear_bit(CF_RECV_PENDING, &con->flags); in receive_from_sock()
934 release_sock(con->sock->sk); in receive_from_sock()
947 buflen_real = ret + con->rx_leftover; in receive_from_sock()
948 ret = dlm_validate_incoming_buffer(con->nodeid, pentry->buf, in receive_from_sock()
955 pentry->buflen = ret; in receive_from_sock()
961 con->rx_leftover = buflen_real - ret; in receive_from_sock()
962 memmove(con->rx_leftover_buf, pentry->buf + ret, in receive_from_sock()
963 con->rx_leftover); in receive_from_sock()
967 list_add_tail(&pentry->list, &processqueue); in receive_from_sock()
990 if (result == -EAGAIN) in accept_from_sock()
997 len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2); in accept_from_sock()
999 result = -ECONNABORTED; in accept_from_sock()
1010 log_print("connect from non cluster IPv4 node %pI4", in accept_from_sock()
1011 &sin->sin_addr); in accept_from_sock()
1018 log_print("connect from non cluster IPv6 node %pI6c", in accept_from_sock()
1019 &sin6->sin6_addr); in accept_from_sock()
1024 log_print("invalid family from non cluster node"); in accept_from_sock()
1029 return -1; in accept_from_sock()
1043 result = -ENOENT; in accept_from_sock()
1047 sock_set_mark(newsock->sk, mark); in accept_from_sock()
1049 down_write(&newcon->sock_lock); in accept_from_sock()
1050 if (newcon->sock) { in accept_from_sock()
1051 struct connection *othercon = newcon->othercon; in accept_from_sock()
1057 up_write(&newcon->sock_lock); in accept_from_sock()
1059 result = -ENOMEM; in accept_from_sock()
1064 lockdep_set_subclass(&othercon->sock_lock, 1); in accept_from_sock()
1065 newcon->othercon = othercon; in accept_from_sock()
1066 set_bit(CF_IS_OTHERCON, &othercon->flags); in accept_from_sock()
1072 down_write(&othercon->sock_lock); in accept_from_sock()
1076 lock_sock(othercon->sock->sk); in accept_from_sock()
1078 release_sock(othercon->sock->sk); in accept_from_sock()
1079 up_write(&othercon->sock_lock); in accept_from_sock()
1088 lock_sock(newcon->sock->sk); in accept_from_sock()
1090 release_sock(newcon->sock->sk); in accept_from_sock()
1092 up_write(&newcon->sock_lock); in accept_from_sock()
1105 * writequeue_entry_complete - try to delete and free write queue entry
1113 e->offset += completed; in writequeue_entry_complete()
1114 e->len -= completed; in writequeue_entry_complete()
1116 e->dirty = true; in writequeue_entry_complete()
1118 if (e->len == 0 && e->users == 0) in writequeue_entry_complete()
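Completion is the mirror image of the reservation arithmetic: a partial send advances offset, shrinks len, and marks the entry dirty, and the page can only go away once everything queued was sent and no writer still holds a reservation. A compact restatement under those assumptions:

struct wq_entry { int offset, len, users, dirty; };

/* Account for 'completed' bytes actually written to the socket. */
static int wq_entry_complete(struct wq_entry *e, int completed)
{
	e->offset += completed;
	e->len -= completed;
	e->dirty = 1;	/* partially sent; can't be recycled wholesale */

	return e->len == 0 && e->users == 0;	/* nonzero: safe to free */
}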
1123 * sctp_bind_addrs - bind a SCTP socket to all our addresses
1138 result = sock_bind_add(sock->sk, addr, addr_len); in sctp_bind_addrs()
1172 entry->page = alloc_page(GFP_ATOMIC | __GFP_ZERO); in new_writequeue_entry()
1173 if (!entry->page) { in new_writequeue_entry()
1178 entry->offset = 0; in new_writequeue_entry()
1179 entry->len = 0; in new_writequeue_entry()
1180 entry->end = 0; in new_writequeue_entry()
1181 entry->dirty = false; in new_writequeue_entry()
1182 entry->con = con; in new_writequeue_entry()
1183 entry->users = 1; in new_writequeue_entry()
1184 kref_init(&entry->ref); in new_writequeue_entry()
1194 spin_lock_bh(&con->writequeue_lock); in new_wq_entry()
1195 if (!list_empty(&con->writequeue)) { in new_wq_entry()
1196 e = list_last_entry(&con->writequeue, struct writequeue_entry, list); in new_wq_entry()
1198 kref_get(&e->ref); in new_wq_entry()
1200 *ppc = page_address(e->page) + e->end; in new_wq_entry()
1204 e->end += len; in new_wq_entry()
1205 e->users++; in new_wq_entry()
1214 kref_get(&e->ref); in new_wq_entry()
1215 *ppc = page_address(e->page); in new_wq_entry()
1216 e->end += len; in new_wq_entry()
1220 list_add_tail(&e->list, &con->writequeue); in new_wq_entry()
1223 spin_unlock_bh(&con->writequeue_lock); in new_wq_entry()
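new_wq_entry() first tries to pack the new message into the last page on the writequeue; only when the tail entry lacks len free bytes does it allocate a fresh page. A simplified sketch of that decision, with non-kernel names and no locking or kref accounting:

#include <stdlib.h>

#define PAGE_SIZE 4096

struct wq_page {
	char page[PAGE_SIZE];
	int end, users;
};

/* *tailp points at the newest page on the queue (NULL when empty).
 * Reserves len bytes and returns a pointer the caller fills in. */
static char *wq_reserve(struct wq_page **tailp, int len)
{
	struct wq_page *e = *tailp;

	if (e && PAGE_SIZE - e->end >= len) {	/* room on the last page */
		char *ppc = e->page + e->end;

		e->end += len;
		e->users++;
		return ppc;
	}

	e = calloc(1, sizeof(*e));		/* otherwise a fresh page */
	if (!e)
		return NULL;
	e->end = len;
	e->users = 1;
	*tailp = e;	/* the kernel appends this to con->writequeue */
	return e->page;
}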
1238 kref_init(&msg->ref); in dlm_lowcomms_new_msg_con()
1246 msg->retransmit = false; in dlm_lowcomms_new_msg_con()
1247 msg->orig_msg = NULL; in dlm_lowcomms_new_msg_con()
1248 msg->ppc = *ppc; in dlm_lowcomms_new_msg_con()
1249 msg->len = len; in dlm_lowcomms_new_msg_con()
1250 msg->entry = e; in dlm_lowcomms_new_msg_con()
1288 kref_get(&msg->ref); in dlm_lowcomms_new_msg()
1290 msg->idx = idx; in dlm_lowcomms_new_msg()
1297 struct writequeue_entry *e = msg->entry; in _dlm_lowcomms_commit_msg()
1298 struct connection *con = e->con; in _dlm_lowcomms_commit_msg()
1301 spin_lock_bh(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1302 kref_get(&msg->ref); in _dlm_lowcomms_commit_msg()
1303 list_add(&msg->list, &e->msgs); in _dlm_lowcomms_commit_msg()
1305 users = --e->users; in _dlm_lowcomms_commit_msg()
1309 e->len = DLM_WQ_LENGTH_BYTES(e); in _dlm_lowcomms_commit_msg()
1314 spin_unlock_bh(&con->writequeue_lock); in _dlm_lowcomms_commit_msg()
1325 srcu_read_unlock(&connections_srcu, msg->idx); in dlm_lowcomms_commit_msg()
1327 kref_put(&msg->ref, dlm_msg_release); in dlm_lowcomms_commit_msg()
1333 kref_put(&msg->ref, dlm_msg_release); in dlm_lowcomms_put_msg()
1342 if (msg->retransmit) in dlm_lowcomms_resend_msg()
1345 msg_resend = dlm_lowcomms_new_msg_con(msg->entry->con, msg->len, &ppc, in dlm_lowcomms_resend_msg()
1348 return -ENOMEM; in dlm_lowcomms_resend_msg()
1350 msg->retransmit = true; in dlm_lowcomms_resend_msg()
1351 kref_get(&msg->ref); in dlm_lowcomms_resend_msg()
1352 msg_resend->orig_msg = msg; in dlm_lowcomms_resend_msg()
1354 memcpy(ppc, msg->ppc, msg->len); in dlm_lowcomms_resend_msg()
1371 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1374 clear_bit(CF_SEND_PENDING, &con->flags); in send_to_sock()
1375 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1379 len = e->len; in send_to_sock()
1380 offset = e->offset; in send_to_sock()
1381 WARN_ON_ONCE(len == 0 && e->users == 0); in send_to_sock()
1382 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1384 bvec_set_page(&bvec, e->page, len, offset); in send_to_sock()
1386 ret = sock_sendmsg(con->sock, &msg); in send_to_sock()
1387 trace_dlm_send(con->nodeid, ret); in send_to_sock()
1388 if (ret == -EAGAIN || ret == 0) { in send_to_sock()
1389 lock_sock(con->sock->sk); in send_to_sock()
1390 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1391 if (test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) && in send_to_sock()
1392 !test_and_set_bit(CF_APP_LIMITED, &con->flags)) { in send_to_sock()
1396 set_bit(SOCK_NOSPACE, &con->sock->sk->sk_socket->flags); in send_to_sock()
1397 con->sock->sk->sk_write_pending++; in send_to_sock()
1399 clear_bit(CF_SEND_PENDING, &con->flags); in send_to_sock()
1400 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1401 release_sock(con->sock->sk); in send_to_sock()
1406 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
1407 release_sock(con->sock->sk); in send_to_sock()
1414 spin_lock_bh(&con->writequeue_lock); in send_to_sock()
1416 spin_unlock_bh(&con->writequeue_lock); in send_to_sock()
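When sock_sendmsg() returns -EAGAIN, send_to_sock() does not spin: it flags the connection CF_APP_LIMITED, sets SOCK_NOSPACE so TCP knows the application is the bottleneck, clears CF_SEND_PENDING, and parks until lowcomms_write_space() requeues the work. Stripped of the kernel machinery, the control flow on a plain nonblocking socket looks roughly like this sketch:

#include <errno.h>
#include <stdbool.h>
#include <sys/socket.h>

/* Returns true when the caller should stop and wait for write space
 * (the kernel parks the work item instead of polling for POLLOUT). */
static bool send_chunk(int fd, const char *buf, int len, int *sent)
{
	ssize_t ret = send(fd, buf, len, MSG_DONTWAIT);

	if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
		*sent = 0;
		return true;	/* app-limited: wait for write space */
	}
	*sent = ret > 0 ? (int)ret : 0;	/* partial sends are fine; the */
	return false;			/* writequeue advances by *sent */
}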
1425 spin_lock_bh(&con->writequeue_lock); in clean_one_writequeue()
1426 list_for_each_entry_safe(e, safe, &con->writequeue, list) { in clean_one_writequeue()
1429 spin_unlock_bh(&con->writequeue_lock); in clean_one_writequeue()
1436 WARN_ON_ONCE(!list_empty(&con->writequeue)); in connection_release()
1437 WARN_ON_ONCE(con->sock); in connection_release()
1442 left the cluster */
1454 return -ENOENT; in dlm_lowcomms_close()
1462 hlist_del_rcu(&con->list); in dlm_lowcomms_close()
1466 call_srcu(&connections_srcu, &con->rcu, connection_release); in dlm_lowcomms_close()
1467 if (con->othercon) { in dlm_lowcomms_close()
1468 clean_one_writequeue(con->othercon); in dlm_lowcomms_close()
1469 call_srcu(&connections_srcu, &con->othercon->rcu, connection_release); in dlm_lowcomms_close()
1488 down_read(&con->sock_lock); in process_recv_sockets()
1489 if (!con->sock) { in process_recv_sockets()
1490 up_read(&con->sock_lock); in process_recv_sockets()
1498 up_read(&con->sock_lock); in process_recv_sockets()
1506 wake_up(&con->shutdown_wait); in process_recv_sockets()
1527 queue_work(io_workqueue, &con->rwork); in process_recv_sockets()
1532 if (test_bit(CF_IS_OTHERCON, &con->flags)) { in process_recv_sockets()
1535 spin_lock_bh(&con->writequeue_lock); in process_recv_sockets()
1537 spin_unlock_bh(&con->writequeue_lock); in process_recv_sockets()
1575 result = nodeid_to_addr(con->nodeid, &addr, NULL, in dlm_connect()
1576 dlm_proto_ops->try_new_addr, &mark); in dlm_connect()
1578 log_print("no address for nodeid %d", con->nodeid); in dlm_connect()
1584 SOCK_STREAM, dlm_proto_ops->proto, &sock); in dlm_connect()
1588 sock_set_mark(sock->sk, mark); in dlm_connect()
1589 dlm_proto_ops->sockopts(sock); in dlm_connect()
1591 result = dlm_proto_ops->bind(sock); in dlm_connect()
1599 log_print_ratelimited("connecting to %d", con->nodeid); in dlm_connect()
1603 case -EINPROGRESS: in dlm_connect()
1610 dlm_close_sock(&con->sock); in dlm_connect()
1624 WARN_ON_ONCE(test_bit(CF_IS_OTHERCON, &con->flags)); in process_send_sockets()
1626 down_read(&con->sock_lock); in process_send_sockets()
1627 if (!con->sock) { in process_send_sockets()
1628 up_read(&con->sock_lock); in process_send_sockets()
1629 down_write(&con->sock_lock); in process_send_sockets()
1630 if (!con->sock) { in process_send_sockets()
1637 up_write(&con->sock_lock); in process_send_sockets()
1639 con->nodeid, con->retries++, ret); in process_send_sockets()
1642 * future we should send an event to cluster in process_send_sockets()
1646 queue_work(io_workqueue, &con->swork); in process_send_sockets()
1650 downgrade_write(&con->sock_lock); in process_send_sockets()
1656 up_read(&con->sock_lock); in process_send_sockets()
1665 queue_work(io_workqueue, &con->swork); in process_send_sockets()
1672 spin_lock_bh(&con->writequeue_lock); in process_send_sockets()
1674 spin_unlock_bh(&con->writequeue_lock); in process_send_sockets()
1702 return -ENOMEM; in work_start()
1710 return -ENOMEM; in work_start()
1722 lock_sock(listen_con.sock->sk); in dlm_lowcomms_shutdown()
1723 listen_con.sock->sk->sk_data_ready = listen_sock.sk_data_ready; in dlm_lowcomms_shutdown()
1724 release_sock(listen_con.sock->sk); in dlm_lowcomms_shutdown()
1738 if (con->othercon) in dlm_lowcomms_shutdown()
1739 clean_one_writequeue(con->othercon); in dlm_lowcomms_shutdown()
1758 dlm_proto_ops->name); in dlm_listen_for_all()
1760 result = dlm_proto_ops->listen_validate(); in dlm_listen_for_all()
1765 SOCK_STREAM, dlm_proto_ops->proto, &sock); in dlm_listen_for_all()
1771 sock_set_mark(sock->sk, dlm_config.ci_mark); in dlm_listen_for_all()
1772 dlm_proto_ops->listen_sockopts(sock); in dlm_listen_for_all()
1774 result = dlm_proto_ops->listen_bind(sock); in dlm_listen_for_all()
1778 lock_sock(sock->sk); in dlm_listen_for_all()
1779 listen_sock.sk_data_ready = sock->sk->sk_data_ready; in dlm_listen_for_all()
1780 listen_sock.sk_write_space = sock->sk->sk_write_space; in dlm_listen_for_all()
1781 listen_sock.sk_error_report = sock->sk->sk_error_report; in dlm_listen_for_all()
1782 listen_sock.sk_state_change = sock->sk->sk_state_change; in dlm_listen_for_all()
1786 sock->sk->sk_allocation = GFP_NOFS; in dlm_listen_for_all()
1787 sock->sk->sk_use_task_frag = false; in dlm_listen_for_all()
1788 sock->sk->sk_data_ready = lowcomms_listen_data_ready; in dlm_listen_for_all()
1789 release_sock(sock->sk); in dlm_listen_for_all()
1791 result = sock->ops->listen(sock, 128); in dlm_listen_for_all()
1809 /* Bind to our cluster-known address connecting to avoid in dlm_tcp_bind()
1827 /* We don't support multi-homed hosts */ in dlm_tcp_listen_validate()
1829 log_print("TCP protocol can't handle multi-homed hosts, try SCTP"); in dlm_tcp_listen_validate()
1830 return -EINVAL; in dlm_tcp_listen_validate()
1839 tcp_sock_set_nodelay(sock->sk); in dlm_tcp_sockopts()
1845 sock_set_reuseaddr(sock->sk); in dlm_tcp_listen_sockopts()
1877 return -EOPNOTSUPP; in dlm_sctp_listen_validate()
1892 sctp_sock_set_nodelay(sock->sk); in dlm_sctp_sockopts()
1893 sock_set_rcvbuf(sock->sk, NEEDED_RMEM); in dlm_sctp_sockopts()
1913 error = -ENOTCONN; in dlm_lowcomms_start()
1933 error = -EINVAL; in dlm_lowcomms_start()
1970 hlist_del_rcu(&con->list); in dlm_lowcomms_exit()
1973 if (con->othercon) in dlm_lowcomms_exit()
1974 call_srcu(&connections_srcu, &con->othercon->rcu, in dlm_lowcomms_exit()
1976 call_srcu(&connections_srcu, &con->rcu, connection_release); in dlm_lowcomms_exit()