1// Copyright 2018 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package runtime
6
7import (
8	"internal/runtime/atomic"
9	"unsafe"
10)
11
// This is based on the former libgo/runtime/netpoll_select.c implementation,
// except that it uses poll instead of select and is written in Go.
// It is also based on the Solaris implementation for the arming mechanisms.
15
//go:cgo_import_dynamic libc_poll poll "libc.a/shr_64.o"
//go:linkname libc_poll libc_poll

// libc_poll refers to the libc poll function, dynamically imported from the
// shared object named in the cgo_import_dynamic directive above. In this file
// it is invoked through syscall3 by poll.
var libc_poll libFunc
20
21//go:nosplit
22func poll(pfds *pollfd, npfds uintptr, timeout uintptr) (int32, int32) {
23	r, err := syscall3(&libc_poll, uintptr(unsafe.Pointer(pfds)), npfds, timeout)
24	return int32(r), int32(err)
25}
26
// pollfd represents the poll structure for AIX operating system.
// A pointer to an array of these is handed directly to libc poll (see poll),
// so the field order and sizes must match the C struct pollfd; do not reorder
// or resize these fields.
type pollfd struct {
	fd      int32 // descriptor being watched
	events  int16 // requested event bits (_POLLIN, _POLLOUT)
	revents int16 // event bits reported back by poll
}
33
// Event bits used in pollfd.events and pollfd.revents (AIX values).
const (
	_POLLIN  = 0x0001
	_POLLOUT = 0x0002
	_POLLHUP = 0x2000
	_POLLERR = 0x4000
)
38
var (
	// pfds is the poll set handed to libc poll. Entry 0 is always the read
	// end of the wakeup pipe; later entries are added by netpollopen.
	pfds           []pollfd
	// pds runs parallel to pfds: pds[i] is the pollDesc for pfds[i], and
	// that pollDesc's user field holds i. pds[0] is nil (the wakeup pipe).
	pds            []*pollDesc
	// mtxpoll guards pendingUpdates. netpollopen, netpollclose, netpollarm
	// and netpoll all take it before mtxset and drop it once mtxset is held.
	mtxpoll        mutex
	// mtxset guards pfds and pds. netpoll holds it while blocked in poll
	// and while it processes poll's results.
	mtxset         mutex
	// rdwake and wrwake are the read and write ends of the non-blocking
	// pipe whose bytes interrupt a blocked poll.
	rdwake         int32
	wrwake         int32
	// pendingUpdates is set to 1 once a wakeup byte has been written by
	// netpollwakeup, so later callers skip the write; netpoll resets it to 0
	// before each poll.
	pendingUpdates int32

	netpollWakeSig atomic.Uint32 // used to avoid duplicate calls of netpollBreak
)
50
51func netpollinit() {
52	// Create the pipe we use to wakeup poll.
53	r, w, errno := nonblockingPipe()
54	if errno != 0 {
55		throw("netpollinit: failed to create pipe")
56	}
57	rdwake = r
58	wrwake = w
59
60	// Pre-allocate array of pollfd structures for poll.
61	pfds = make([]pollfd, 1, 128)
62
63	// Poll the read side of the pipe.
64	pfds[0].fd = rdwake
65	pfds[0].events = _POLLIN
66
67	pds = make([]*pollDesc, 1, 128)
68	pds[0] = nil
69}
70
71func netpollIsPollDescriptor(fd uintptr) bool {
72	return fd == uintptr(rdwake) || fd == uintptr(wrwake)
73}
74
75// netpollwakeup writes on wrwake to wakeup poll before any changes.
76func netpollwakeup() {
77	if pendingUpdates == 0 {
78		pendingUpdates = 1
79		b := [1]byte{0}
80		write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1)
81	}
82}
83
// netpollopen registers fd, described by pd, in the poll set. The new pfds
// entry requests no events; netpollarm enables reading or writing later.
// pd.user is set to the entry's index in pfds and pds. It always returns 0.
func netpollopen(fd uintptr, pd *pollDesc) int32 {
	// Wake any netpoll blocked in poll so it releases mtxset. Holding
	// mtxpoll until mtxset is acquired prevents netpoll (which also takes
	// mtxpoll first) from starting another poll before this update runs.
	lock(&mtxpoll)
	netpollwakeup()

	lock(&mtxset)
	unlock(&mtxpoll)

	// We don't worry about pd.fdseq here,
	// as mtxset protects us from stale pollDescs.

	pd.user = uint32(len(pfds))
	pfds = append(pfds, pollfd{fd: int32(fd)})
	pds = append(pds, pd)
	unlock(&mtxset)
	return 0
}
100
// netpollclose removes fd from the poll set. It first wakes any netpoll
// blocked in poll and takes mtxset (keeping mtxpoll held until then so
// netpoll cannot re-enter poll first). It then finds fd by linear scan and
// deletes it by moving the last pfds/pds entry into its slot, updating the
// moved pollDesc's user index so netpollarm still finds it. If fd is not in
// the set nothing changes. It always returns 0.
func netpollclose(fd uintptr) int32 {
	lock(&mtxpoll)
	netpollwakeup()

	lock(&mtxset)
	unlock(&mtxpoll)

	// The scan includes slot 0 (the wakeup pipe); presumably a descriptor
	// being closed never equals rdwake — confirm if that changes.
	for i := 0; i < len(pfds); i++ {
		if pfds[i].fd == int32(fd) {
			// Swap-remove rather than shifting; positions are tracked
			// through pd.user, fixed up below.
			pfds[i] = pfds[len(pfds)-1]
			pfds = pfds[:len(pfds)-1]

			// If i was the last index, pds[i] is the entry being removed
			// and only its own user field is rewritten.
			pds[i] = pds[len(pds)-1]
			pds[i].user = uint32(i)
			pds = pds[:len(pds)-1]
			break
		}
	}
	unlock(&mtxset)
	return 0
}
122
// netpollarm requests read ('r') or write ('w') readiness notification for
// pd by setting _POLLIN or _POLLOUT on its pfds entry, located via pd.user.
// Other mode values change nothing. netpoll clears the bit again when the
// event fires, so each notification has to be re-armed by calling this again.
func netpollarm(pd *pollDesc, mode int) {
	// Interrupt a blocked poll and take mtxset so pfds can be modified;
	// the new event mask is seen by netpoll's next poll call.
	lock(&mtxpoll)
	netpollwakeup()

	lock(&mtxset)
	unlock(&mtxpoll)

	switch mode {
	case 'r':
		pfds[pd.user].events |= _POLLIN
	case 'w':
		pfds[pd.user].events |= _POLLOUT
	}
	unlock(&mtxset)
}
138
139// netpollBreak interrupts a poll.
140func netpollBreak() {
141	// Failing to cas indicates there is an in-flight wakeup, so we're done here.
142	if !netpollWakeSig.CompareAndSwap(0, 1) {
143		return
144	}
145
146	b := [1]byte{0}
147	write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1)
148}
149
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
//
// In this implementation delay == 0 currently returns an empty list without
// calling poll (see the TODO below). The second result is the sum of the
// values returned by netpollready for each ready descriptor.
//
// mtxset is held across the poll call and result processing; goroutines
// that need to change pfds/pds interrupt the poll via the wakeup pipe.
//
//go:nowritebarrierrec
func netpoll(delay int64) (gList, int32) {
	// Convert delay (nanoseconds) to a poll timeout in milliseconds.
	// Sub-millisecond delays round up to 1ms so they still block briefly.
	// ^uintptr(0) is used for an indefinite wait (presumably read as -1 by
	// libc poll — confirm).
	var timeout uintptr
	if delay < 0 {
		timeout = ^uintptr(0)
	} else if delay == 0 {
		// TODO: call poll with timeout == 0
		return gList{}, 0
	} else if delay < 1e6 {
		timeout = 1
	} else if delay < 1e15 {
		timeout = uintptr(delay / 1e6)
	} else {
		// An arbitrary cap on how long to wait for a timer.
		// 1e9 ms == ~11.5 days.
		timeout = 1e9
	}
retry:
	// Same lock order as the writers: mtxpoll, then mtxset. Resetting
	// pendingUpdates under mtxpoll re-enables netpollwakeup writes for
	// this poll call.
	lock(&mtxpoll)
	lock(&mtxset)
	pendingUpdates = 0
	unlock(&mtxpoll)

	n, e := poll(&pfds[0], uintptr(len(pfds)), timeout)
	if n < 0 {
		if e != _EINTR {
			println("errno=", e, " len(pfds)=", len(pfds))
			throw("poll failed")
		}
		unlock(&mtxset)
		// If a timed sleep was interrupted, just return to
		// recalculate how long we should sleep now.
		// NOTE(review): timeout is never 0 here (delay == 0 returned
		// above) and ^uintptr(0) is also > 0, so this always returns and
		// the goto below is unreachable, even for indefinite (delay < 0)
		// waits. Presumably the retry was meant for that case — confirm.
		if timeout > 0 {
			return gList{}, 0
		}
		goto retry
	}
	// Check if some descriptors need to be changed
	if n != 0 && pfds[0].revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
		if delay != 0 {
			// A netpollwakeup could be picked up by a
			// non-blocking poll. Only clear the wakeup
			// if blocking.
			// Drain every pending wakeup byte, then allow
			// netpollBreak to write again.
			var b [1]byte
			for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 {
			}
			netpollWakeSig.Store(0)
		}
		// Still look at the other fds even if the mode may have
		// changed, as netpollBreak might have been called.
		n--
	}
	var toRun gList
	delta := int32(0)
	// Visit descriptor entries until all n ready ones are handled. Each
	// direction that fired has its event bit cleared, so poll stops
	// waiting for it until netpollarm sets it again. mode accumulates 'r'
	// and/or 'w' (their sum when both fired).
	for i := 1; i < len(pfds) && n > 0; i++ {
		pfd := &pfds[i]

		var mode int32
		if pfd.revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
			mode += 'r'
			pfd.events &= ^_POLLIN
		}
		if pfd.revents&(_POLLOUT|_POLLHUP|_POLLERR) != 0 {
			mode += 'w'
			pfd.events &= ^_POLLOUT
		}
		if mode != 0 {
			// The error flag is set only when _POLLERR is the sole
			// bit reported for this descriptor.
			pds[i].setEventErr(pfd.revents == _POLLERR, 0)
			delta += netpollready(&toRun, pds[i], mode)
			n--
		}
	}
	unlock(&mtxset)
	return toRun, delta
}
231