1// Copyright 2018 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package poll
6
7import (
8	"internal/syscall/unix"
9	"runtime"
10	"sync"
11	"syscall"
12	"unsafe"
13)
14
const (
	// maxSpliceSize bounds how many bytes Splice asks the kernel to move in
	// one splice(2) call. Data is staged through a pipe, and 1MB is the
	// default ceiling for pipe buffer sizes on Linux
	// (/proc/sys/fs/pipe-max-size); newPipe also requests a pipe buffer of
	// this size.
	maxSpliceSize = 1024 * 1024

	// spliceNonblock is SPLICE_F_NONBLOCK. It makes only the operations on
	// the staging pipe nonblocking; the splice as a whole can still block if
	// the descriptor being spliced from or to lacks O_NONBLOCK.
	spliceNonblock = 0x2
)
28
29// Splice transfers at most remain bytes of data from src to dst, using the
30// splice system call to minimize copies of data from and to userspace.
31//
32// Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer.
33// src and dst must both be stream-oriented sockets.
34func Splice(dst, src *FD, remain int64) (written int64, handled bool, err error) {
35	p, err := getPipe()
36	if err != nil {
37		return 0, false, err
38	}
39	defer putPipe(p)
40	var inPipe, n int
41	for err == nil && remain > 0 {
42		max := maxSpliceSize
43		if int64(max) > remain {
44			max = int(remain)
45		}
46		inPipe, err = spliceDrain(p.wfd, src, max)
47		// The operation is considered handled if splice returns no
48		// error, or an error other than EINVAL. An EINVAL means the
49		// kernel does not support splice for the socket type of src.
50		// The failed syscall does not consume any data so it is safe
51		// to fall back to a generic copy.
52		//
53		// spliceDrain should never return EAGAIN, so if err != nil,
54		// Splice cannot continue.
55		//
56		// If inPipe == 0 && err == nil, src is at EOF, and the
57		// transfer is complete.
58		handled = handled || (err != syscall.EINVAL)
59		if err != nil || inPipe == 0 {
60			break
61		}
62		p.data += inPipe
63
64		n, err = splicePump(dst, p.rfd, inPipe)
65		if n > 0 {
66			written += int64(n)
67			remain -= int64(n)
68			p.data -= n
69		}
70	}
71	if err != nil {
72		return written, handled, err
73	}
74	return written, true, nil
75}
76
77// spliceDrain moves data from a socket to a pipe.
78//
79// Invariant: when entering spliceDrain, the pipe is empty. It is either in its
80// initial state, or splicePump has emptied it previously.
81//
82// Given this, spliceDrain can reasonably assume that the pipe is ready for
83// writing, so if splice returns EAGAIN, it must be because the socket is not
84// ready for reading.
85//
86// If spliceDrain returns (0, nil), src is at EOF.
87func spliceDrain(pipefd int, sock *FD, max int) (int, error) {
88	if err := sock.readLock(); err != nil {
89		return 0, err
90	}
91	defer sock.readUnlock()
92	if err := sock.pd.prepareRead(sock.isFile); err != nil {
93		return 0, err
94	}
95	for {
96		// In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here,
97		// because it could return EAGAIN ceaselessly when the write end of the pipe is full,
98		// but this shouldn't be a concern here, since the pipe buffer must be sufficient for
99		// this data transmission on the basis of the workflow in Splice.
100		n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock)
101		if err == syscall.EINTR {
102			continue
103		}
104		if err != syscall.EAGAIN {
105			return n, err
106		}
107		if sock.pd.pollable() {
108			if err := sock.pd.waitRead(sock.isFile); err != nil {
109				return n, err
110			}
111		}
112	}
113}
114
115// splicePump moves all the buffered data from a pipe to a socket.
116//
117// Invariant: when entering splicePump, there are exactly inPipe
118// bytes of data in the pipe, from a previous call to spliceDrain.
119//
120// By analogy to the condition from spliceDrain, splicePump
121// only needs to poll the socket for readiness, if splice returns
122// EAGAIN.
123//
124// If splicePump cannot move all the data in a single call to
125// splice(2), it loops over the buffered data until it has written
126// all of it to the socket. This behavior is similar to the Write
127// step of an io.Copy in userspace.
128func splicePump(sock *FD, pipefd int, inPipe int) (int, error) {
129	if err := sock.writeLock(); err != nil {
130		return 0, err
131	}
132	defer sock.writeUnlock()
133	if err := sock.pd.prepareWrite(sock.isFile); err != nil {
134		return 0, err
135	}
136	written := 0
137	for inPipe > 0 {
138		// In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here,
139		// because it could return EAGAIN ceaselessly when the read end of the pipe is empty,
140		// but this shouldn't be a concern here, since the pipe buffer must contain inPipe size of
141		// data on the basis of the workflow in Splice.
142		n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock)
143		if err == syscall.EINTR {
144			continue
145		}
146		// Here, the condition n == 0 && err == nil should never be
147		// observed, since Splice controls the write side of the pipe.
148		if n > 0 {
149			inPipe -= n
150			written += n
151			continue
152		}
153		if err != syscall.EAGAIN {
154			return written, err
155		}
156		if sock.pd.pollable() {
157			if err := sock.pd.waitWrite(sock.isFile); err != nil {
158				return written, err
159			}
160		}
161	}
162	return written, nil
163}
164
// splice is a thin wrapper around syscall.Splice that moves up to max bytes
// from in to out with the given splice flags. It is only used on sockets and
// pipes, which have no file offset, so both offset arguments are nil. The
// count is returned as an int because callers never request more in a single
// call than fits in an int32.
func splice(out int, in int, max int, flags int) (int, error) {
	var inOff, outOff *int64 // no offsets: sockets and pipes are not seekable
	moved, err := syscall.Splice(in, inOff, out, outOff, max, flags)
	return int(moved), err
}
173
// splicePipeFields holds the state of one splice staging pipe: its two file
// descriptors and the number of bytes currently buffered in it.
type splicePipeFields struct {
	rfd, wfd int // read and write ends, as returned by pipe2(2)
	data     int // bytes written into the pipe but not yet read back out
}
179
// splicePipe is the unit stored in splicePipePool and handed out by getPipe.
// It embeds splicePipeFields and pads the struct out to a multiple of 24
// bytes.
type splicePipe struct {
	splicePipeFields

	// We want to use a finalizer, so ensure that the size is
	// large enough to not use the tiny allocator.
	// The padding is 24 - size%24 bytes (between 1 and 24), so the total
	// struct size is always a nonzero multiple of 24.
	_ [24 - unsafe.Sizeof(splicePipeFields{})%24]byte
}
187
188// splicePipePool caches pipes to avoid high-frequency construction and destruction of pipe buffers.
189// The garbage collector will free all pipes in the sync.Pool periodically, thus we need to set up
190// a finalizer for each pipe to close its file descriptors before the actual GC.
191var splicePipePool = sync.Pool{New: newPoolPipe}
192
193func newPoolPipe() any {
194	// Discard the error which occurred during the creation of pipe buffer,
195	// redirecting the data transmission to the conventional way utilizing read() + write() as a fallback.
196	p := newPipe()
197	if p == nil {
198		return nil
199	}
200	runtime.SetFinalizer(p, destroyPipe)
201	return p
202}
203
204// getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache.
205func getPipe() (*splicePipe, error) {
206	v := splicePipePool.Get()
207	if v == nil {
208		return nil, syscall.EINVAL
209	}
210	return v.(*splicePipe), nil
211}
212
213func putPipe(p *splicePipe) {
214	// If there is still data left in the pipe,
215	// then close and discard it instead of putting it back into the pool.
216	if p.data != 0 {
217		runtime.SetFinalizer(p, nil)
218		destroyPipe(p)
219		return
220	}
221	splicePipePool.Put(p)
222}
223
224// newPipe sets up a pipe for a splice operation.
225func newPipe() *splicePipe {
226	var fds [2]int
227	if err := syscall.Pipe2(fds[:], syscall.O_CLOEXEC|syscall.O_NONBLOCK); err != nil {
228		return nil
229	}
230
231	// Splice will loop writing maxSpliceSize bytes from the source to the pipe,
232	// and then write those bytes from the pipe to the destination.
233	// Set the pipe buffer size to maxSpliceSize to optimize that.
234	// Ignore errors here, as a smaller buffer size will work,
235	// although it will require more system calls.
236	unix.Fcntl(fds[0], syscall.F_SETPIPE_SZ, maxSpliceSize)
237
238	return &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}}
239}
240
241// destroyPipe destroys a pipe.
242func destroyPipe(p *splicePipe) {
243	CloseFunc(p.rfd)
244	CloseFunc(p.wfd)
245}
246