Black Lives Matter. Support the Equal Justice Initiative.

Source file src/os/pipe_test.go

Documentation: os

     1  // Copyright 2015 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Test broken pipes on Unix systems.
     6  //go:build !plan9 && !js
     7  // +build !plan9,!js
     8  
     9  package os_test
    10  
    11  import (
    12  	"bufio"
    13  	"bytes"
    14  	"fmt"
    15  	"internal/testenv"
    16  	"io"
    17  	"io/fs"
    18  	"os"
    19  	osexec "os/exec"
    20  	"os/signal"
    21  	"runtime"
    22  	"strconv"
    23  	"strings"
    24  	"sync"
    25  	"syscall"
    26  	"testing"
    27  	"time"
    28  )
    29  
    30  func TestEPIPE(t *testing.T) {
    31  	r, w, err := os.Pipe()
    32  	if err != nil {
    33  		t.Fatal(err)
    34  	}
    35  	if err := r.Close(); err != nil {
    36  		t.Fatal(err)
    37  	}
    38  
    39  	expect := syscall.EPIPE
    40  	if runtime.GOOS == "windows" {
    41  		// 232 is Windows error code ERROR_NO_DATA, "The pipe is being closed".
    42  		expect = syscall.Errno(232)
    43  	}
    44  	// Every time we write to the pipe we should get an EPIPE.
    45  	for i := 0; i < 20; i++ {
    46  		_, err = w.Write([]byte("hi"))
    47  		if err == nil {
    48  			t.Fatal("unexpected success of Write to broken pipe")
    49  		}
    50  		if pe, ok := err.(*fs.PathError); ok {
    51  			err = pe.Err
    52  		}
    53  		if se, ok := err.(*os.SyscallError); ok {
    54  			err = se.Err
    55  		}
    56  		if err != expect {
    57  			t.Errorf("iteration %d: got %v, expected %v", i, err, expect)
    58  		}
    59  	}
    60  }
    61  
    62  func TestStdPipe(t *testing.T) {
    63  	switch runtime.GOOS {
    64  	case "windows":
    65  		t.Skip("Windows doesn't support SIGPIPE")
    66  	}
    67  	testenv.MustHaveExec(t)
    68  	r, w, err := os.Pipe()
    69  	if err != nil {
    70  		t.Fatal(err)
    71  	}
    72  	if err := r.Close(); err != nil {
    73  		t.Fatal(err)
    74  	}
    75  	// Invoke the test program to run the test and write to a closed pipe.
    76  	// If sig is false:
    77  	// writing to stdout or stderr should cause an immediate SIGPIPE;
    78  	// writing to descriptor 3 should fail with EPIPE and then exit 0.
    79  	// If sig is true:
    80  	// all writes should fail with EPIPE and then exit 0.
    81  	for _, sig := range []bool{false, true} {
    82  		for dest := 1; dest < 4; dest++ {
    83  			cmd := osexec.Command(os.Args[0], "-test.run", "TestStdPipeHelper")
    84  			cmd.Stdout = w
    85  			cmd.Stderr = w
    86  			cmd.ExtraFiles = []*os.File{w}
    87  			cmd.Env = append(os.Environ(), fmt.Sprintf("GO_TEST_STD_PIPE_HELPER=%d", dest))
    88  			if sig {
    89  				cmd.Env = append(cmd.Env, "GO_TEST_STD_PIPE_HELPER_SIGNAL=1")
    90  			}
    91  			if err := cmd.Run(); err == nil {
    92  				if !sig && dest < 3 {
    93  					t.Errorf("unexpected success of write to closed pipe %d sig %t in child", dest, sig)
    94  				}
    95  			} else if ee, ok := err.(*osexec.ExitError); !ok {
    96  				t.Errorf("unexpected exec error type %T: %v", err, err)
    97  			} else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
    98  				t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
    99  			} else if ws.Signaled() && ws.Signal() == syscall.SIGPIPE {
   100  				if sig || dest > 2 {
   101  					t.Errorf("unexpected SIGPIPE signal for descriptor %d sig %t", dest, sig)
   102  				}
   103  			} else {
   104  				t.Errorf("unexpected exit status %v for descriptor %d sig %t", err, dest, sig)
   105  			}
   106  		}
   107  	}
   108  
   109  	// Test redirecting stdout but not stderr.  Issue 40076.
   110  	cmd := osexec.Command(os.Args[0], "-test.run", "TestStdPipeHelper")
   111  	cmd.Stdout = w
   112  	var stderr bytes.Buffer
   113  	cmd.Stderr = &stderr
   114  	cmd.Env = append(os.Environ(), "GO_TEST_STD_PIPE_HELPER=1")
   115  	if err := cmd.Run(); err == nil {
   116  		t.Errorf("unexpected success of write to closed stdout")
   117  	} else if ee, ok := err.(*osexec.ExitError); !ok {
   118  		t.Errorf("unexpected exec error type %T: %v", err, err)
   119  	} else if ws, ok := ee.Sys().(syscall.WaitStatus); !ok {
   120  		t.Errorf("unexpected wait status type %T: %v", ee.Sys(), ee.Sys())
   121  	} else if !ws.Signaled() || ws.Signal() != syscall.SIGPIPE {
   122  		t.Errorf("unexpected exit status %v for write to closed stdout", err)
   123  	}
   124  	if output := stderr.Bytes(); len(output) > 0 {
   125  		t.Errorf("unexpected output on stderr: %s", output)
   126  	}
   127  }
   128  
   129  // This is a helper for TestStdPipe. It's not a test in itself.
   130  func TestStdPipeHelper(t *testing.T) {
   131  	if os.Getenv("GO_TEST_STD_PIPE_HELPER_SIGNAL") != "" {
   132  		signal.Notify(make(chan os.Signal, 1), syscall.SIGPIPE)
   133  	}
   134  	switch os.Getenv("GO_TEST_STD_PIPE_HELPER") {
   135  	case "1":
   136  		os.Stdout.Write([]byte("stdout"))
   137  	case "2":
   138  		os.Stderr.Write([]byte("stderr"))
   139  	case "3":
   140  		if _, err := os.NewFile(3, "3").Write([]byte("3")); err == nil {
   141  			os.Exit(3)
   142  		}
   143  	default:
   144  		t.Skip("skipping test helper")
   145  	}
   146  	// For stdout/stderr, we should have crashed with a broken pipe error.
   147  	// The caller will be looking for that exit status,
   148  	// so just exit normally here to cause a failure in the caller.
   149  	// For descriptor 3, a normal exit is expected.
   150  	os.Exit(0)
   151  }
   152  
   153  func testClosedPipeRace(t *testing.T, read bool) {
   154  	switch runtime.GOOS {
   155  	case "freebsd":
   156  		t.Skip("FreeBSD does not use the poller; issue 19093")
   157  	}
   158  
   159  	limit := 1
   160  	if !read {
   161  		// Get the amount we have to write to overload a pipe
   162  		// with no reader.
   163  		limit = 131073
   164  		if b, err := os.ReadFile("/proc/sys/fs/pipe-max-size"); err == nil {
   165  			if i, err := strconv.Atoi(strings.TrimSpace(string(b))); err == nil {
   166  				limit = i + 1
   167  			}
   168  		}
   169  		t.Logf("using pipe write limit of %d", limit)
   170  	}
   171  
   172  	r, w, err := os.Pipe()
   173  	if err != nil {
   174  		t.Fatal(err)
   175  	}
   176  	defer r.Close()
   177  	defer w.Close()
   178  
   179  	// Close the read end of the pipe in a goroutine while we are
   180  	// writing to the write end, or vice-versa.
   181  	go func() {
   182  		// Give the main goroutine a chance to enter the Read or
   183  		// Write call. This is sloppy but the test will pass even
   184  		// if we close before the read/write.
   185  		time.Sleep(20 * time.Millisecond)
   186  
   187  		var err error
   188  		if read {
   189  			err = r.Close()
   190  		} else {
   191  			err = w.Close()
   192  		}
   193  		if err != nil {
   194  			t.Error(err)
   195  		}
   196  	}()
   197  
   198  	b := make([]byte, limit)
   199  	if read {
   200  		_, err = r.Read(b[:])
   201  	} else {
   202  		_, err = w.Write(b[:])
   203  	}
   204  	if err == nil {
   205  		t.Error("I/O on closed pipe unexpectedly succeeded")
   206  	} else if pe, ok := err.(*fs.PathError); !ok {
   207  		t.Errorf("I/O on closed pipe returned unexpected error type %T; expected fs.PathError", pe)
   208  	} else if pe.Err != fs.ErrClosed {
   209  		t.Errorf("got error %q but expected %q", pe.Err, fs.ErrClosed)
   210  	} else {
   211  		t.Logf("I/O returned expected error %q", err)
   212  	}
   213  }
   214  
   215  func TestClosedPipeRaceRead(t *testing.T) {
   216  	testClosedPipeRace(t, true)
   217  }
   218  
   219  func TestClosedPipeRaceWrite(t *testing.T) {
   220  	testClosedPipeRace(t, false)
   221  }
   222  
   223  // Issue 20915: Reading on nonblocking fd should not return "waiting
   224  // for unsupported file type." Currently it returns EAGAIN; it is
   225  // possible that in the future it will simply wait for data.
   226  func TestReadNonblockingFd(t *testing.T) {
   227  	switch runtime.GOOS {
   228  	case "windows":
   229  		t.Skip("Windows doesn't support SetNonblock")
   230  	}
   231  	if os.Getenv("GO_WANT_READ_NONBLOCKING_FD") == "1" {
   232  		fd := syscallDescriptor(os.Stdin.Fd())
   233  		syscall.SetNonblock(fd, true)
   234  		defer syscall.SetNonblock(fd, false)
   235  		_, err := os.Stdin.Read(make([]byte, 1))
   236  		if err != nil {
   237  			if perr, ok := err.(*fs.PathError); !ok || perr.Err != syscall.EAGAIN {
   238  				t.Fatalf("read on nonblocking stdin got %q, should have gotten EAGAIN", err)
   239  			}
   240  		}
   241  		os.Exit(0)
   242  	}
   243  
   244  	testenv.MustHaveExec(t)
   245  	r, w, err := os.Pipe()
   246  	if err != nil {
   247  		t.Fatal(err)
   248  	}
   249  	defer r.Close()
   250  	defer w.Close()
   251  	cmd := osexec.Command(os.Args[0], "-test.run="+t.Name())
   252  	cmd.Env = append(os.Environ(), "GO_WANT_READ_NONBLOCKING_FD=1")
   253  	cmd.Stdin = r
   254  	output, err := cmd.CombinedOutput()
   255  	t.Logf("%s", output)
   256  	if err != nil {
   257  		t.Errorf("child process failed: %v", err)
   258  	}
   259  }
   260  
   261  func TestCloseWithBlockingReadByNewFile(t *testing.T) {
   262  	var p [2]syscallDescriptor
   263  	err := syscall.Pipe(p[:])
   264  	if err != nil {
   265  		t.Fatal(err)
   266  	}
   267  	// os.NewFile returns a blocking mode file.
   268  	testCloseWithBlockingRead(t, os.NewFile(uintptr(p[0]), "reader"), os.NewFile(uintptr(p[1]), "writer"))
   269  }
   270  
   271  func TestCloseWithBlockingReadByFd(t *testing.T) {
   272  	r, w, err := os.Pipe()
   273  	if err != nil {
   274  		t.Fatal(err)
   275  	}
   276  	// Calling Fd will put the file into blocking mode.
   277  	_ = r.Fd()
   278  	testCloseWithBlockingRead(t, r, w)
   279  }
   280  
   281  // Test that we don't let a blocking read prevent a close.
   282  func testCloseWithBlockingRead(t *testing.T, r, w *os.File) {
   283  	defer r.Close()
   284  	defer w.Close()
   285  
   286  	c1, c2 := make(chan bool), make(chan bool)
   287  	var wg sync.WaitGroup
   288  
   289  	wg.Add(1)
   290  	go func(c chan bool) {
   291  		defer wg.Done()
   292  		// Give the other goroutine a chance to enter the Read
   293  		// or Write call. This is sloppy but the test will
   294  		// pass even if we close before the read/write.
   295  		time.Sleep(20 * time.Millisecond)
   296  
   297  		if err := r.Close(); err != nil {
   298  			t.Error(err)
   299  		}
   300  		close(c)
   301  	}(c1)
   302  
   303  	wg.Add(1)
   304  	go func(c chan bool) {
   305  		defer wg.Done()
   306  		var b [1]byte
   307  		_, err := r.Read(b[:])
   308  		close(c)
   309  		if err == nil {
   310  			t.Error("I/O on closed pipe unexpectedly succeeded")
   311  		}
   312  		if pe, ok := err.(*fs.PathError); ok {
   313  			err = pe.Err
   314  		}
   315  		if err != io.EOF && err != fs.ErrClosed {
   316  			t.Errorf("got %v, expected EOF or closed", err)
   317  		}
   318  	}(c2)
   319  
   320  	for c1 != nil || c2 != nil {
   321  		select {
   322  		case <-c1:
   323  			c1 = nil
   324  			// r.Close has completed, but the blocking Read
   325  			// is hanging. Close the writer to unblock it.
   326  			w.Close()
   327  		case <-c2:
   328  			c2 = nil
   329  		case <-time.After(1 * time.Second):
   330  			switch {
   331  			case c1 != nil && c2 != nil:
   332  				t.Error("timed out waiting for Read and Close")
   333  				w.Close()
   334  			case c1 != nil:
   335  				t.Error("timed out waiting for Close")
   336  			case c2 != nil:
   337  				t.Error("timed out waiting for Read")
   338  			default:
   339  				t.Error("impossible case")
   340  			}
   341  		}
   342  	}
   343  
   344  	wg.Wait()
   345  }
   346  
   347  // Issue 24164, for pipes.
   348  func TestPipeEOF(t *testing.T) {
   349  	r, w, err := os.Pipe()
   350  	if err != nil {
   351  		t.Fatal(err)
   352  	}
   353  
   354  	var wg sync.WaitGroup
   355  	wg.Add(1)
   356  	go func() {
   357  		defer wg.Done()
   358  
   359  		defer func() {
   360  			if err := w.Close(); err != nil {
   361  				t.Errorf("error closing writer: %v", err)
   362  			}
   363  		}()
   364  
   365  		for i := 0; i < 3; i++ {
   366  			time.Sleep(10 * time.Millisecond)
   367  			_, err := fmt.Fprintf(w, "line %d\n", i)
   368  			if err != nil {
   369  				t.Errorf("error writing to fifo: %v", err)
   370  				return
   371  			}
   372  		}
   373  		time.Sleep(10 * time.Millisecond)
   374  	}()
   375  
   376  	defer wg.Wait()
   377  
   378  	done := make(chan bool)
   379  	go func() {
   380  		defer close(done)
   381  
   382  		defer func() {
   383  			if err := r.Close(); err != nil {
   384  				t.Errorf("error closing reader: %v", err)
   385  			}
   386  		}()
   387  
   388  		rbuf := bufio.NewReader(r)
   389  		for {
   390  			b, err := rbuf.ReadBytes('\n')
   391  			if err == io.EOF {
   392  				break
   393  			}
   394  			if err != nil {
   395  				t.Error(err)
   396  				return
   397  			}
   398  			t.Logf("%s\n", bytes.TrimSpace(b))
   399  		}
   400  	}()
   401  
   402  	select {
   403  	case <-done:
   404  		// Test succeeded.
   405  	case <-time.After(time.Second):
   406  		t.Error("timed out waiting for read")
   407  		// Close the reader to force the read to complete.
   408  		r.Close()
   409  	}
   410  }
   411  
   412  // Issue 24481.
   413  func TestFdRace(t *testing.T) {
   414  	r, w, err := os.Pipe()
   415  	if err != nil {
   416  		t.Fatal(err)
   417  	}
   418  	defer r.Close()
   419  	defer w.Close()
   420  
   421  	var wg sync.WaitGroup
   422  	call := func() {
   423  		defer wg.Done()
   424  		w.Fd()
   425  	}
   426  
   427  	const tries = 100
   428  	for i := 0; i < tries; i++ {
   429  		wg.Add(1)
   430  		go call()
   431  	}
   432  	wg.Wait()
   433  }
   434  
   435  func TestFdReadRace(t *testing.T) {
   436  	t.Parallel()
   437  
   438  	r, w, err := os.Pipe()
   439  	if err != nil {
   440  		t.Fatal(err)
   441  	}
   442  	defer r.Close()
   443  	defer w.Close()
   444  
   445  	const count = 10
   446  
   447  	c := make(chan bool, 1)
   448  	var wg sync.WaitGroup
   449  	wg.Add(1)
   450  	go func() {
   451  		defer wg.Done()
   452  		var buf [count]byte
   453  		r.SetReadDeadline(time.Now().Add(time.Minute))
   454  		c <- true
   455  		if _, err := r.Read(buf[:]); os.IsTimeout(err) {
   456  			t.Error("read timed out")
   457  		}
   458  	}()
   459  
   460  	wg.Add(1)
   461  	go func() {
   462  		defer wg.Done()
   463  		<-c
   464  		// Give the other goroutine a chance to enter the Read.
   465  		// It doesn't matter if this occasionally fails, the test
   466  		// will still pass, it just won't test anything.
   467  		time.Sleep(10 * time.Millisecond)
   468  		r.Fd()
   469  
   470  		// The bug was that Fd would hang until Read timed out.
   471  		// If the bug is fixed, then writing to w and closing r here
   472  		// will cause the Read to exit before the timeout expires.
   473  		w.Write(make([]byte, count))
   474  		r.Close()
   475  	}()
   476  
   477  	wg.Wait()
   478  }
   479  

View as plain text