1// Copyright 2010 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package net 6 7import ( 8 "io" 9 "os" 10 "sync" 11 "time" 12) 13 14// pipeDeadline is an abstraction for handling timeouts. 15type pipeDeadline struct { 16 mu sync.Mutex // Guards timer and cancel 17 timer *time.Timer 18 cancel chan struct{} // Must be non-nil 19} 20 21func makePipeDeadline() pipeDeadline { 22 return pipeDeadline{cancel: make(chan struct{})} 23} 24 25// set sets the point in time when the deadline will time out. 26// A timeout event is signaled by closing the channel returned by waiter. 27// Once a timeout has occurred, the deadline can be refreshed by specifying a 28// t value in the future. 29// 30// A zero value for t prevents timeout. 31func (d *pipeDeadline) set(t time.Time) { 32 d.mu.Lock() 33 defer d.mu.Unlock() 34 35 if d.timer != nil && !d.timer.Stop() { 36 <-d.cancel // Wait for the timer callback to finish and close cancel 37 } 38 d.timer = nil 39 40 // Time is zero, then there is no deadline. 41 closed := isClosedChan(d.cancel) 42 if t.IsZero() { 43 if closed { 44 d.cancel = make(chan struct{}) 45 } 46 return 47 } 48 49 // Time in the future, setup a timer to cancel in the future. 50 if dur := time.Until(t); dur > 0 { 51 if closed { 52 d.cancel = make(chan struct{}) 53 } 54 d.timer = time.AfterFunc(dur, func() { 55 close(d.cancel) 56 }) 57 return 58 } 59 60 // Time in the past, so close immediately. 61 if !closed { 62 close(d.cancel) 63 } 64} 65 66// wait returns a channel that is closed when the deadline is exceeded. 67func (d *pipeDeadline) wait() chan struct{} { 68 d.mu.Lock() 69 defer d.mu.Unlock() 70 return d.cancel 71} 72 73func isClosedChan(c <-chan struct{}) bool { 74 select { 75 case <-c: 76 return true 77 default: 78 return false 79 } 80} 81 82type pipeAddr struct{} 83 84func (pipeAddr) Network() string { return "pipe" } 85func (pipeAddr) String() string { return "pipe" } 86 87type pipe struct { 88 wrMu sync.Mutex // Serialize Write operations 89 90 // Used by local Read to interact with remote Write. 91 // Successful receive on rdRx is always followed by send on rdTx. 92 rdRx <-chan []byte 93 rdTx chan<- int 94 95 // Used by local Write to interact with remote Read. 96 // Successful send on wrTx is always followed by receive on wrRx. 97 wrTx chan<- []byte 98 wrRx <-chan int 99 100 once sync.Once // Protects closing localDone 101 localDone chan struct{} 102 remoteDone <-chan struct{} 103 104 readDeadline pipeDeadline 105 writeDeadline pipeDeadline 106} 107 108// Pipe creates a synchronous, in-memory, full duplex 109// network connection; both ends implement the [Conn] interface. 110// Reads on one end are matched with writes on the other, 111// copying data directly between the two; there is no internal 112// buffering. 113func Pipe() (Conn, Conn) { 114 cb1 := make(chan []byte) 115 cb2 := make(chan []byte) 116 cn1 := make(chan int) 117 cn2 := make(chan int) 118 done1 := make(chan struct{}) 119 done2 := make(chan struct{}) 120 121 p1 := &pipe{ 122 rdRx: cb1, rdTx: cn1, 123 wrTx: cb2, wrRx: cn2, 124 localDone: done1, remoteDone: done2, 125 readDeadline: makePipeDeadline(), 126 writeDeadline: makePipeDeadline(), 127 } 128 p2 := &pipe{ 129 rdRx: cb2, rdTx: cn2, 130 wrTx: cb1, wrRx: cn1, 131 localDone: done2, remoteDone: done1, 132 readDeadline: makePipeDeadline(), 133 writeDeadline: makePipeDeadline(), 134 } 135 return p1, p2 136} 137 138func (*pipe) LocalAddr() Addr { return pipeAddr{} } 139func (*pipe) RemoteAddr() Addr { return pipeAddr{} } 140 141func (p *pipe) Read(b []byte) (int, error) { 142 n, err := p.read(b) 143 if err != nil && err != io.EOF && err != io.ErrClosedPipe { 144 err = &OpError{Op: "read", Net: "pipe", Err: err} 145 } 146 return n, err 147} 148 149func (p *pipe) read(b []byte) (n int, err error) { 150 switch { 151 case isClosedChan(p.localDone): 152 return 0, io.ErrClosedPipe 153 case isClosedChan(p.remoteDone): 154 return 0, io.EOF 155 case isClosedChan(p.readDeadline.wait()): 156 return 0, os.ErrDeadlineExceeded 157 } 158 159 select { 160 case bw := <-p.rdRx: 161 nr := copy(b, bw) 162 p.rdTx <- nr 163 return nr, nil 164 case <-p.localDone: 165 return 0, io.ErrClosedPipe 166 case <-p.remoteDone: 167 return 0, io.EOF 168 case <-p.readDeadline.wait(): 169 return 0, os.ErrDeadlineExceeded 170 } 171} 172 173func (p *pipe) Write(b []byte) (int, error) { 174 n, err := p.write(b) 175 if err != nil && err != io.ErrClosedPipe { 176 err = &OpError{Op: "write", Net: "pipe", Err: err} 177 } 178 return n, err 179} 180 181func (p *pipe) write(b []byte) (n int, err error) { 182 switch { 183 case isClosedChan(p.localDone): 184 return 0, io.ErrClosedPipe 185 case isClosedChan(p.remoteDone): 186 return 0, io.ErrClosedPipe 187 case isClosedChan(p.writeDeadline.wait()): 188 return 0, os.ErrDeadlineExceeded 189 } 190 191 p.wrMu.Lock() // Ensure entirety of b is written together 192 defer p.wrMu.Unlock() 193 for once := true; once || len(b) > 0; once = false { 194 select { 195 case p.wrTx <- b: 196 nw := <-p.wrRx 197 b = b[nw:] 198 n += nw 199 case <-p.localDone: 200 return n, io.ErrClosedPipe 201 case <-p.remoteDone: 202 return n, io.ErrClosedPipe 203 case <-p.writeDeadline.wait(): 204 return n, os.ErrDeadlineExceeded 205 } 206 } 207 return n, nil 208} 209 210func (p *pipe) SetDeadline(t time.Time) error { 211 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { 212 return io.ErrClosedPipe 213 } 214 p.readDeadline.set(t) 215 p.writeDeadline.set(t) 216 return nil 217} 218 219func (p *pipe) SetReadDeadline(t time.Time) error { 220 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { 221 return io.ErrClosedPipe 222 } 223 p.readDeadline.set(t) 224 return nil 225} 226 227func (p *pipe) SetWriteDeadline(t time.Time) error { 228 if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { 229 return io.ErrClosedPipe 230 } 231 p.writeDeadline.set(t) 232 return nil 233} 234 235func (p *pipe) Close() error { 236 p.once.Do(func() { close(p.localDone) }) 237 return nil 238} 239