1// Copyright 2015 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5// Test broken pipes on Unix systems.
6//
7//go:build !plan9 && !js && !wasip1
8
9package os_test
10
11import (
12	"bufio"
13	"bytes"
14	"fmt"
15	"internal/testenv"
16	"io"
17	"io/fs"
18	"os"
19	"os/exec"
20	"os/signal"
21	"runtime"
22	"strconv"
23	"strings"
24	"sync"
25	"syscall"
26	"testing"
27	"time"
28)
29
// TestEPIPE checks that every Write to a pipe whose read end has already
// been closed fails with EPIPE (or, on Windows, with error code 232).
func TestEPIPE(t *testing.T) {
	// This test cannot be run in parallel because of a race similar
	// to the one reported in https://go.dev/issue/22315.
	//
	// Even though the pipe is opened with O_CLOEXEC, if another test forks in
	// between the call to os.Pipe and the call to r.Close, that child process can
	// retain an open copy of r's file descriptor until it execs. If one of our
	// Write calls occurs during that interval it can spuriously succeed,
	// buffering the write to the child's copy of the pipe (even though the child
	// will not actually read the buffered bytes).

	r, w, err := os.Pipe()
	if err != nil {
		t.Fatal(err)
	}
	// Release the write end when the test finishes rather than leaving the
	// descriptor open until the *os.File is garbage collected.
	defer w.Close()
	if err := r.Close(); err != nil {
		t.Fatal(err)
	}

	expect := syscall.EPIPE
	if runtime.GOOS == "windows" {
		// 232 is Windows error code ERROR_NO_DATA, "The pipe is being closed".
		expect = syscall.Errno(232)
	}
	// Every time we write to the pipe we should get an EPIPE.
	for i := 0; i < 20; i++ {
		_, err = w.Write([]byte("hi"))
		if err == nil {
			t.Fatal("unexpected success of Write to broken pipe")
		}
		// Strip the wrappers the os package may add so that the
		// underlying errno can be compared directly.
		if pe, ok := err.(*fs.PathError); ok {
			err = pe.Err
		}
		if se, ok := err.(*os.SyscallError); ok {
			err = se.Err
		}
		if err != expect {
			t.Errorf("iteration %d: got %v, expected %v", i, err, expect)
		}
	}
}
71
72func TestStdPipe(t *testing.T) {
73	switch runtime.GOOS {
74	case "windows":
75		t.Skip("Windows doesn't support SIGPIPE")
76	}
77
78	if os.Getenv("GO_TEST_STD_PIPE_HELPER") != "" {
79		if os.Getenv("GO_TEST_STD_PIPE_HELPER_SIGNAL") != "" {
80			signal.Notify(make(chan os.Signal, 1), syscall.SIGPIPE)
81		}
82		switch os.Getenv("GO_TEST_STD_PIPE_HELPER") {
83		case "1":
84			os.Stdout.Write([]byte("stdout"))
85		case "2":
86			os.Stderr.Write([]byte("stderr"))
87		case "3":
88			if _, err := os.NewFile(3, "3").Write([]byte("3")); err == nil {
89				os.Exit(3)
90			}
91		default:
92			panic("unrecognized value for GO_TEST_STD_PIPE_HELPER")
93		}
94		// For stdout/stderr, we should have crashed with a broken pipe error.
95		// The caller will be looking for that exit status,
96		// so just exit normally here to cause a failure in the caller.
97		// For descriptor 3, a normal exit is expected.
98		os.Exit(0)
99	}
100
101	testenv.MustHaveExec(t)
102	// This test cannot be run in parallel due to the same race as for TestEPIPE.
103	// (We expect a write to a closed pipe can fail, but a concurrent fork of a
104	// child process can cause the pipe to unexpectedly remain open.)
105
106	r, w, err := os.Pipe()
107	if err != nil {
108		t.Fatal(err)
109	}
110	if err := r.Close(); err != nil {
111		t.Fatal(err)
112	}
113	// Invoke the test program to run the test and write to a closed pipe.
114	// If sig is false:
115	// writing to stdout or stderr should cause an immediate SIGPIPE;
116	// writing to descriptor 3 should fail with EPIPE and then exit 0.
117	// If sig is true:
118	// all writes should fail with EPIPE and then exit 0.
119	for _, sig := range []bool{false, true} {
120		for dest := 1; dest < 4; dest++ {
121			cmd := testenv.Command(t, os.Args[0], "-test.run", "TestStdPipe")
122			cmd.Stdout = w
123			cmd.Stderr = w
124			cmd.ExtraFiles = []*os.File{w}
125			cmd.Env = append(os.Environ(), fmt.Sprintf("GO_TEST_STD_PIPE_HELPER=%d", dest))
126			if sig {
127				cmd.Env = append(cmd.Env, "GO_TEST_STD_PIPE_HELPER_SIGNAL=1")
128			}
129			if err := cmd.Run(); err == nil {
130				if !sig && dest < 3 {
131					t.Errorf("unexpected success of write to closed pipe %d sig %t in child", dest, sig)
132				}
133			} else if ee, ok := err.(*exec.ExitError); !ok {
134				t.Errorf("unexpected exec error type %T: %v", err, err)
135			} else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
136				t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
137			} else if ws.Signaled() && ws.Signal() == syscall.SIGPIPE {
138				if sig || dest > 2 {
139					t.Errorf("unexpected SIGPIPE signal for descriptor %d sig %t", dest, sig)
140				}
141			} else {
142				t.Errorf("unexpected exit status %v for descriptor %d sig %t", err, dest, sig)
143			}
144		}
145	}
146
147	// Test redirecting stdout but not stderr.  Issue 40076.
148	cmd := testenv.Command(t, os.Args[0], "-test.run", "TestStdPipe")
149	cmd.Stdout = w
150	var stderr bytes.Buffer
151	cmd.Stderr = &stderr
152	cmd.Env = append(cmd.Environ(), "GO_TEST_STD_PIPE_HELPER=1")
153	if err := cmd.Run(); err == nil {
154		t.Errorf("unexpected success of write to closed stdout")
155	} else if ee, ok := err.(*exec.ExitError); !ok {
156		t.Errorf("unexpected exec error type %T: %v", err, err)
157	} else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
158		t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
159	} else if !ws.Signaled() || ws.Signal() != syscall.SIGPIPE {
160		t.Errorf("unexpected exit status %v for write to closed stdout", err)
161	}
162	if output := stderr.Bytes(); len(output) > 0 {
163		t.Errorf("unexpected output on stderr: %s", output)
164	}
165}
166
// testClosedPipeRace checks that a Read (when read is true) or a Write (when
// read is false) on one end of a pipe fails with an *fs.PathError wrapping
// fs.ErrClosed when that same end is closed by another goroutine at about
// the same time.
func testClosedPipeRace(t *testing.T, read bool) {
	// This test cannot be run in parallel due to the same race as for TestEPIPE.
	// (We expect a write to a closed pipe can fail, but a concurrent fork of a
	// child process can cause the pipe to unexpectedly remain open.)

	limit := 1
	if !read {
		// Get the amount we have to write to overload a pipe
		// with no reader.
		limit = 131073
		if b, err := os.ReadFile("/proc/sys/fs/pipe-max-size"); err == nil {
			if i, err := strconv.Atoi(strings.TrimSpace(string(b))); err == nil {
				limit = i + 1
			}
		}
		t.Logf("using pipe write limit of %d", limit)
	}

	r, w, err := os.Pipe()
	if err != nil {
		t.Fatal(err)
	}
	defer r.Close()
	defer w.Close()

	// Join the closer goroutine before returning so that it can never
	// call t.Error after the test has completed.
	closerDone := make(chan struct{})
	defer func() { <-closerDone }()

	// In a separate goroutine, close the end of the pipe that the main
	// goroutine is reading from or writing to.
	go func() {
		defer close(closerDone)

		// Give the main goroutine a chance to enter the Read or
		// Write call. This is sloppy but the test will pass even
		// if we close before the read/write.
		time.Sleep(20 * time.Millisecond)

		var err error
		if read {
			err = r.Close()
		} else {
			err = w.Close()
		}
		if err != nil {
			t.Error(err)
		}
	}()

	b := make([]byte, limit)
	if read {
		_, err = r.Read(b[:])
	} else {
		_, err = w.Write(b[:])
	}
	if err == nil {
		t.Error("I/O on closed pipe unexpectedly succeeded")
	} else if pe, ok := err.(*fs.PathError); !ok {
		// Report the type of the error actually returned; pe is a nil
		// *fs.PathError here and would always print the expected type.
		t.Errorf("I/O on closed pipe returned unexpected error type %T; expected fs.PathError", err)
	} else if pe.Err != fs.ErrClosed {
		t.Errorf("got error %q but expected %q", pe.Err, fs.ErrClosed)
	} else {
		t.Logf("I/O returned expected error %q", err)
	}
}
227
228func TestClosedPipeRaceRead(t *testing.T) {
229	testClosedPipeRace(t, true)
230}
231
232func TestClosedPipeRaceWrite(t *testing.T) {
233	testClosedPipeRace(t, false)
234}
235
236// Issue 20915: Reading on nonblocking fd should not return "waiting
237// for unsupported file type." Currently it returns EAGAIN; it is
238// possible that in the future it will simply wait for data.
239func TestReadNonblockingFd(t *testing.T) {
240	switch runtime.GOOS {
241	case "windows":
242		t.Skip("Windows doesn't support SetNonblock")
243	}
244	if os.Getenv("GO_WANT_READ_NONBLOCKING_FD") == "1" {
245		fd := syscallDescriptor(os.Stdin.Fd())
246		syscall.SetNonblock(fd, true)
247		defer syscall.SetNonblock(fd, false)
248		_, err := os.Stdin.Read(make([]byte, 1))
249		if err != nil {
250			if perr, ok := err.(*fs.PathError); !ok || perr.Err != syscall.EAGAIN {
251				t.Fatalf("read on nonblocking stdin got %q, should have gotten EAGAIN", err)
252			}
253		}
254		os.Exit(0)
255	}
256
257	testenv.MustHaveExec(t)
258	t.Parallel()
259
260	r, w, err := os.Pipe()
261	if err != nil {
262		t.Fatal(err)
263	}
264	defer r.Close()
265	defer w.Close()
266	cmd := testenv.Command(t, os.Args[0], "-test.run=^"+t.Name()+"$")
267	cmd.Env = append(cmd.Environ(), "GO_WANT_READ_NONBLOCKING_FD=1")
268	cmd.Stdin = r
269	output, err := cmd.CombinedOutput()
270	t.Logf("%s", output)
271	if err != nil {
272		t.Errorf("child process failed: %v", err)
273	}
274}
275
276func TestCloseWithBlockingReadByNewFile(t *testing.T) {
277	t.Parallel()
278
279	var p [2]syscallDescriptor
280	err := syscall.Pipe(p[:])
281	if err != nil {
282		t.Fatal(err)
283	}
284	// os.NewFile returns a blocking mode file.
285	testCloseWithBlockingRead(t, os.NewFile(uintptr(p[0]), "reader"), os.NewFile(uintptr(p[1]), "writer"))
286}
287
288func TestCloseWithBlockingReadByFd(t *testing.T) {
289	t.Parallel()
290
291	r, w, err := os.Pipe()
292	if err != nil {
293		t.Fatal(err)
294	}
295	// Calling Fd will put the file into blocking mode.
296	_ = r.Fd()
297	testCloseWithBlockingRead(t, r, w)
298}
299
// testCloseWithBlockingRead tests that we don't let a blocking read prevent
// a close: while one goroutine is reading from r, the caller closes r and
// then w, and the read is expected to end with io.EOF or fs.ErrClosed.
func testCloseWithBlockingRead(t *testing.T, r, w *os.File) {
	readStarting := make(chan struct{})
	readFinished := make(chan struct{})

	go func() {
		var buf [1]byte
		close(readStarting)
		_, rerr := r.Read(buf[:])
		if rerr == nil {
			t.Error("I/O on closed pipe unexpectedly succeeded")
		}

		// Look through a *fs.PathError wrapper, if any.
		if pathErr, ok := rerr.(*fs.PathError); ok {
			rerr = pathErr.Err
		}
		switch rerr {
		case io.EOF, fs.ErrClosed:
			// Either outcome is acceptable.
		default:
			t.Errorf("got %v, expected EOF or closed", rerr)
		}
		close(readFinished)
	}()

	// Give the goroutine a chance to enter the Read call.
	// This is sloppy but the test will pass even if we
	// close before the read.
	<-readStarting
	time.Sleep(20 * time.Millisecond)

	if cerr := r.Close(); cerr != nil {
		t.Error(cerr)
	}
	// r.Close has completed, but since we assume r is in blocking mode that
	// probably didn't unblock the call to r.Read. Close w to unblock it.
	w.Close()
	<-readFinished
}
337
338func TestPipeEOF(t *testing.T) {
339	t.Parallel()
340
341	r, w, err := os.Pipe()
342	if err != nil {
343		t.Fatal(err)
344	}
345
346	testPipeEOF(t, r, w)
347}
348
// testPipeEOF tests that when the write side of a pipe or FIFO is closed,
// a blocked Read call on the reader side returns io.EOF.
//
// This scenario previously failed to unblock the Read call on darwin.
// (See https://go.dev/issue/24164.)
//
// A helper goroutine writes one line to w, after a short delay, for every
// value it receives on an internal channel, and closes w once that channel
// is closed. The calling goroutine reads the lines back from r. Before
// testPipeEOF returns, r is closed and the helper goroutine is joined.
func testPipeEOF(t *testing.T, r io.ReadCloser, w io.WriteCloser) {
	// parkDelay is an arbitrary delay we wait for a pipe-reader goroutine to park
	// before issuing the corresponding write. The test should pass no matter what
	// delay we use, but with a longer delay it has a higher chance of detecting
	// poller bugs.
	parkDelay := 10 * time.Millisecond
	if testing.Short() {
		parkDelay = 100 * time.Microsecond
	}
	writerDone := make(chan struct{})
	defer func() {
		if err := r.Close(); err != nil {
			t.Errorf("error closing reader: %v", err)
		}
		<-writerDone
	}()

	write := make(chan int, 1)
	go func() {
		defer close(writerDone)

		for i := range write {
			time.Sleep(parkDelay)
			_, err := fmt.Fprintf(w, "line %d\n", i)
			if err != nil {
				t.Errorf("error writing to fifo: %v", err)
				// Close w so the reader observes EOF and fails instead of
				// blocking forever on a line that will never arrive. The
				// write error has already been reported above.
				w.Close()
				return
			}
		}

		time.Sleep(parkDelay)
		if err := w.Close(); err != nil {
			t.Errorf("error closing writer: %v", err)
		}
	}()

	rbuf := bufio.NewReader(r)
	for i := 0; i < 3; i++ {
		write <- i
		b, err := rbuf.ReadBytes('\n')
		if err != nil {
			// Let the writer goroutine run to completion; otherwise the
			// deferred wait on writerDone would never return.
			close(write)
			t.Fatal(err)
		}
		t.Logf("%s\n", bytes.TrimSpace(b))
	}

	// Closing the channel makes the writer goroutine close w, which must
	// surface as io.EOF with no data on the reader side.
	close(write)
	b, err := rbuf.ReadBytes('\n')
	if err != io.EOF || len(b) != 0 {
		t.Errorf(`ReadBytes: %q, %v; want "", io.EOF`, b, err)
	}
}
406
// Issue 24481.
//
// TestFdRace calls Fd on the write end of a pipe from many goroutines at
// once and waits for all of them to return.
func TestFdRace(t *testing.T) {
	// This test starts 100 simultaneous goroutines, which could bury a more
	// interesting stack if this or some other test happens to panic. It is also
	// nearly instantaneous, so any latency benefit from running it in parallel
	// would be minimal.

	rd, wr, err := os.Pipe()
	if err != nil {
		t.Fatal(err)
	}
	defer rd.Close()
	defer wr.Close()

	const tries = 100

	var pending sync.WaitGroup
	pending.Add(tries)
	for n := 0; n < tries; n++ {
		go func() {
			defer pending.Done()
			wr.Fd()
		}()
	}
	pending.Wait()
}
434
// TestFdReadRace calls Fd on the read end of a pipe while another goroutine
// is (most likely) blocked in Read on it, and fails if that Read ends by
// hitting its deadline.
func TestFdReadRace(t *testing.T) {
	t.Parallel()

	r, w, err := os.Pipe()
	if err != nil {
		t.Fatal(err)
	}
	defer r.Close()
	defer w.Close()

	const count = 10

	aboutToRead := make(chan bool, 1)
	var wg sync.WaitGroup
	wg.Add(2)

	// Reader: announce itself, then block in Read with a generous deadline.
	go func() {
		defer wg.Done()
		var buf [count]byte
		r.SetReadDeadline(time.Now().Add(time.Minute))
		aboutToRead <- true
		_, readErr := r.Read(buf[:])
		if os.IsTimeout(readErr) {
			t.Error("read timed out")
		}
	}()

	// Disturber: call Fd on the read end while the reader is in Read.
	go func() {
		defer wg.Done()
		<-aboutToRead
		// Give the other goroutine a chance to enter the Read.
		// It doesn't matter if this occasionally fails, the test
		// will still pass, it just won't test anything.
		time.Sleep(10 * time.Millisecond)
		r.Fd()

		// The bug was that Fd would hang until Read timed out.
		// If the bug is fixed, then writing to w and closing r here
		// will cause the Read to exit before the timeout expires.
		w.Write(make([]byte, count))
		r.Close()
	}()

	wg.Wait()
}
479