1// Copyright 2020 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package fuzz
6
7import (
8	"bytes"
9	"context"
10	"crypto/sha256"
11	"encoding/json"
12	"errors"
13	"fmt"
14	"io"
15	"os"
16	"os/exec"
17	"reflect"
18	"runtime"
19	"sync"
20	"time"
21)
22
const (
	// workerFuzzDuration bounds how long a worker may spend trying random
	// variations of a single input handed to it by the coordinator.
	workerFuzzDuration = time.Second / 10

	// workerTimeoutDuration is how long a worker may go without responding to
	// the coordinator before the coordinator stops it.
	workerTimeoutDuration = time.Second

	// workerExitCode is the exit status a fuzz worker process uses after an
	// internal error, so that internal errors can be told apart from
	// uncontrolled panics and other crashes.
	// Keep in sync with internal/fuzz.workerExitCode.
	workerExitCode = 70

	// workerSharedMemSize is the maximum size of the shared memory file used
	// to exchange data with a worker. It caps the size of fuzz inputs.
	workerSharedMemSize = 100 * (1 << 20) // 100 MB
)
41
// worker manages a worker process running a test binary. The worker object
// exists only in the coordinator (the process started by 'go test -fuzz').
// workerClient is used by the coordinator to send RPCs to the worker process,
// which handles them with workerServer.
//
// The first group of fields and memMu are set once by newWorker. The last
// group (cmd, client, waitErr, interrupted, termC) describes the most recently
// started process: start (re)initializes it and stop clears cmd and client.
type worker struct {
	dir     string   // working directory, same as package directory
	binPath string   // path to test executable
	args    []string // arguments for test executable
	env     []string // environment for test executable

	coordinator *coordinator

	// memMu is a one-element channel holding the shared memory; receiving the
	// value acquires it and sending it back releases it.
	memMu chan *sharedMem // mutex guarding shared memory with worker; persists across processes.

	cmd         *exec.Cmd     // current worker process; nil when no process is running
	client      *workerClient // used to communicate with worker process; nil after stop
	waitErr     error         // last error returned by wait, set before termC is closed.
	interrupted bool          // true after stop interrupts a running worker.
	termC       chan struct{} // closed by wait when worker process terminates
}
62
63func newWorker(c *coordinator, dir, binPath string, args, env []string) (*worker, error) {
64	mem, err := sharedMemTempFile(workerSharedMemSize)
65	if err != nil {
66		return nil, err
67	}
68	memMu := make(chan *sharedMem, 1)
69	memMu <- mem
70	return &worker{
71		dir:         dir,
72		binPath:     binPath,
73		args:        args,
74		env:         env[:len(env):len(env)], // copy on append to ensure workers don't overwrite each other.
75		coordinator: c,
76		memMu:       memMu,
77	}, nil
78}
79
80// cleanup releases persistent resources associated with the worker.
81func (w *worker) cleanup() error {
82	mem := <-w.memMu
83	if mem == nil {
84		return nil
85	}
86	close(w.memMu)
87	return mem.Close()
88}
89
// coordinate runs the test binary to perform fuzzing.
//
// coordinate loops until ctx is canceled or a fatal error is encountered.
// If a test process terminates unexpectedly while fuzzing, coordinate will
// attempt to restart and continue unless the termination can be attributed
// to an interruption (from a timer or the user).
//
// While looping, coordinate receives inputs from the coordinator, passes
// those inputs to the worker process, then passes the results back to
// the coordinator.
//
// The return value is ctx.Err() when the context ended the loop, nil when the
// idle worker exited cleanly or was interrupted by a signal the coordinator
// did not send, and a descriptive error otherwise.
func (w *worker) coordinate(ctx context.Context) error {
	// Main event loop.
	for {
		// Start or restart the worker if it's not running.
		if !w.isRunning() {
			if err := w.startAndPing(ctx); err != nil {
				return err
			}
		}

		select {
		case <-ctx.Done():
			// Worker was told to stop.
			err := w.stop()
			// Errors caused by our own interruption of the worker, or by an
			// interrupt signal, are expected here and are not reported.
			if err != nil && !w.interrupted && !isInterruptError(err) {
				return err
			}
			return ctx.Err()

		case <-w.termC:
			// Worker process terminated unexpectedly while waiting for input.
			err := w.stop()
			if w.interrupted {
				// stop only sets interrupted when it had to signal a process
				// that was still running, which cannot be the case once termC
				// is closed.
				panic("worker interrupted after unexpected termination")
			}
			if err == nil || isInterruptError(err) {
				// Worker stopped, either by exiting with status 0 or after being
				// interrupted with a signal that was not sent by the coordinator.
				//
				// When the user presses ^C, on POSIX platforms, SIGINT is delivered to
				// all processes in the group concurrently, and the worker may see it
				// before the coordinator. The worker should exit 0 gracefully (in
				// theory).
				//
				// This condition is probably intended by the user, so suppress
				// the error.
				return nil
			}
			if exitErr, ok := err.(*exec.ExitError); ok && exitErr.ExitCode() == workerExitCode {
				// Worker exited with a code indicating F.Fuzz was not called correctly,
				// for example, F.Fail was called first.
				return fmt.Errorf("fuzzing process exited unexpectedly due to an internal failure: %w", err)
			}
			// Worker exited non-zero or was terminated by a non-interrupt
			// signal (for example, SIGSEGV) while fuzzing.
			return fmt.Errorf("fuzzing process hung or terminated unexpectedly: %w", err)
			// TODO(jayconrod,katiehockman): if -keepfuzzing, restart worker.

		case input := <-w.coordinator.inputC:
			// Received input from coordinator.
			args := fuzzArgs{
				Limit:        input.limit,
				Timeout:      input.timeout,
				Warmup:       input.warmup,
				CoverageData: input.coverageData,
			}
			entry, resp, isInternalError, err := w.client.fuzz(ctx, input.entry, args)
			canMinimize := true
			if err != nil {
				// Error communicating with worker.
				// stop clears w.cmd, so the next loop iteration restarts the
				// worker if we fall through instead of returning.
				w.stop()
				if ctx.Err() != nil {
					// Timeout or interruption.
					return ctx.Err()
				}
				if w.interrupted {
					// Communication error before we stopped the worker.
					// Report an error, but don't record a crasher.
					return fmt.Errorf("communicating with fuzzing process: %v", err)
				}
				if sig, ok := terminationSignal(w.waitErr); ok && !isCrashSignal(sig) {
					// Worker terminated by a signal that probably wasn't caused by a
					// specific input to the fuzz function. For example, on Linux,
					// the kernel (OOM killer) may send SIGKILL to a process using a lot
					// of memory. Or the shell might send SIGHUP when the terminal
					// is closed. Don't record a crasher.
					return fmt.Errorf("fuzzing process terminated by unexpected signal; no crash will be recorded: %v", w.waitErr)
				}
				if isInternalError {
					// An internal error occurred which shouldn't be considered
					// a crash.
					return err
				}
				// Unexpected termination. Set error message and fall through.
				// We'll restart the worker on the next iteration.
				// Don't attempt to minimize this since it crashed the worker.
				resp.Err = fmt.Sprintf("fuzzing process hung or terminated unexpectedly: %v", w.waitErr)
				canMinimize = false
			}
			result := fuzzResult{
				limit:         input.limit,
				count:         resp.Count,
				totalDuration: resp.TotalDuration,
				entryDuration: resp.InterestingDuration,
				entry:         entry,
				crasherMsg:    resp.Err,
				coverageData:  resp.CoverageData,
				canMinimize:   canMinimize,
			}
			w.coordinator.resultC <- result

		case input := <-w.coordinator.minimizeC:
			// Received input to minimize from coordinator.
			result, err := w.minimize(ctx, input)
			if err != nil {
				// Error minimizing. Send back the original input. If it didn't cause
				// an error before, report it as causing an error now.
				// TODO: double-check this is handled correctly when
				// implementing -keepfuzzing.
				result = fuzzResult{
					entry:       input.entry,
					crasherMsg:  input.crasherMsg,
					canMinimize: false,
					limit:       input.limit,
				}
				if result.crasherMsg == "" {
					result.crasherMsg = err.Error()
				}
			}
			if shouldPrintDebugInfo() {
				w.coordinator.debugLogf(
					"input minimized, id: %s, original id: %s, crasher: %t, originally crasher: %t, minimizing took: %s",
					result.entry.Path,
					input.entry.Path,
					result.crasherMsg != "",
					input.crasherMsg != "",
					result.totalDuration,
				)
			}
			w.coordinator.resultC <- result
		}
	}
}
233
234// minimize tells a worker process to attempt to find a smaller value that
235// either causes an error (if we started minimizing because we found an input
236// that causes an error) or preserves new coverage (if we started minimizing
237// because we found an input that expands coverage).
238func (w *worker) minimize(ctx context.Context, input fuzzMinimizeInput) (min fuzzResult, err error) {
239	if w.coordinator.opts.MinimizeTimeout != 0 {
240		var cancel func()
241		ctx, cancel = context.WithTimeout(ctx, w.coordinator.opts.MinimizeTimeout)
242		defer cancel()
243	}
244
245	args := minimizeArgs{
246		Limit:        input.limit,
247		Timeout:      input.timeout,
248		KeepCoverage: input.keepCoverage,
249	}
250	entry, resp, err := w.client.minimize(ctx, input.entry, args)
251	if err != nil {
252		// Error communicating with worker.
253		w.stop()
254		if ctx.Err() != nil || w.interrupted || isInterruptError(w.waitErr) {
255			// Worker was interrupted, possibly by the user pressing ^C.
256			// Normally, workers can handle interrupts and timeouts gracefully and
257			// will return without error. An error here indicates the worker
258			// may not have been in a good state, but the error won't be meaningful
259			// to the user. Just return the original crasher without logging anything.
260			return fuzzResult{
261				entry:        input.entry,
262				crasherMsg:   input.crasherMsg,
263				coverageData: input.keepCoverage,
264				canMinimize:  false,
265				limit:        input.limit,
266			}, nil
267		}
268		return fuzzResult{
269			entry:         entry,
270			crasherMsg:    fmt.Sprintf("fuzzing process hung or terminated unexpectedly while minimizing: %v", err),
271			canMinimize:   false,
272			limit:         input.limit,
273			count:         resp.Count,
274			totalDuration: resp.Duration,
275		}, nil
276	}
277
278	if input.crasherMsg != "" && resp.Err == "" {
279		return fuzzResult{}, fmt.Errorf("attempted to minimize a crash but could not reproduce")
280	}
281
282	return fuzzResult{
283		entry:         entry,
284		crasherMsg:    resp.Err,
285		coverageData:  resp.CoverageData,
286		canMinimize:   false,
287		limit:         input.limit,
288		count:         resp.Count,
289		totalDuration: resp.Duration,
290	}, nil
291}
292
// isRunning reports whether the worker currently has a process attached,
// that is, whether w.cmd is non-nil. start sets w.cmd after the process is
// launched and stop resets it to nil.
func (w *worker) isRunning() bool {
	return w.cmd != nil
}
296
297// startAndPing starts the worker process and sends it a message to make sure it
298// can communicate.
299//
300// startAndPing returns an error if any part of this didn't work, including if
301// the context is expired or the worker process was interrupted before it
302// responded. Errors that happen after start but before the ping response
303// likely indicate that the worker did not call F.Fuzz or called F.Fail first.
304// We don't record crashers for these errors.
305func (w *worker) startAndPing(ctx context.Context) error {
306	if ctx.Err() != nil {
307		return ctx.Err()
308	}
309	if err := w.start(); err != nil {
310		return err
311	}
312	if err := w.client.ping(ctx); err != nil {
313		w.stop()
314		if ctx.Err() != nil {
315			return ctx.Err()
316		}
317		if isInterruptError(err) {
318			// User may have pressed ^C before worker responded.
319			return err
320		}
321		// TODO: record and return stderr.
322		return fmt.Errorf("fuzzing process terminated without fuzzing: %w", err)
323	}
324	return nil
325}
326
327// start runs a new worker process.
328//
329// If the process couldn't be started, start returns an error. Start won't
330// return later termination errors from the process if they occur.
331//
332// If the process starts successfully, start returns nil. stop must be called
333// once later to clean up, even if the process terminates on its own.
334//
335// When the process terminates, w.waitErr is set to the error (if any), and
336// w.termC is closed.
337func (w *worker) start() (err error) {
338	if w.isRunning() {
339		panic("worker already started")
340	}
341	w.waitErr = nil
342	w.interrupted = false
343	w.termC = nil
344
345	cmd := exec.Command(w.binPath, w.args...)
346	cmd.Dir = w.dir
347	cmd.Env = w.env[:len(w.env):len(w.env)] // copy on append to ensure workers don't overwrite each other.
348
349	// Create the "fuzz_in" and "fuzz_out" pipes so we can communicate with
350	// the worker. We don't use stdin and stdout, since the test binary may
351	// do something else with those.
352	//
353	// Each pipe has a reader and a writer. The coordinator writes to fuzzInW
354	// and reads from fuzzOutR. The worker inherits fuzzInR and fuzzOutW.
355	// The coordinator closes fuzzInR and fuzzOutW after starting the worker,
356	// since we have no further need of them.
357	fuzzInR, fuzzInW, err := os.Pipe()
358	if err != nil {
359		return err
360	}
361	defer fuzzInR.Close()
362	fuzzOutR, fuzzOutW, err := os.Pipe()
363	if err != nil {
364		fuzzInW.Close()
365		return err
366	}
367	defer fuzzOutW.Close()
368	setWorkerComm(cmd, workerComm{fuzzIn: fuzzInR, fuzzOut: fuzzOutW, memMu: w.memMu})
369
370	// Start the worker process.
371	if err := cmd.Start(); err != nil {
372		fuzzInW.Close()
373		fuzzOutR.Close()
374		return err
375	}
376
377	// Worker started successfully.
378	// After this, w.client owns fuzzInW and fuzzOutR, so w.client.Close must be
379	// called later by stop.
380	w.cmd = cmd
381	w.termC = make(chan struct{})
382	comm := workerComm{fuzzIn: fuzzInW, fuzzOut: fuzzOutR, memMu: w.memMu}
383	m := newMutator()
384	w.client = newWorkerClient(comm, m)
385
386	go func() {
387		w.waitErr = w.cmd.Wait()
388		close(w.termC)
389	}()
390
391	return nil
392}
393
// stop tells the worker process to exit by closing w.client, then blocks until
// it terminates. If the worker doesn't terminate after a short time, stop
// signals it with os.Interrupt (where supported), then os.Kill.
//
// stop returns the error the process terminated with, if any (same as
// w.waitErr).
//
// stop must be called at least once after start returns successfully, even if
// the worker process terminates unexpectedly. It panics if start has not
// created w.termC. Calling stop again after the process has terminated is
// harmless and returns w.waitErr.
func (w *worker) stop() error {
	if w.termC == nil {
		panic("worker was not started successfully")
	}
	// Non-blocking check: has the process already terminated?
	select {
	case <-w.termC:
		// Worker already terminated.
		if w.client == nil {
			// stop already called.
			return w.waitErr
		}
		// Possible unexpected termination.
		w.client.Close()
		w.cmd = nil
		w.client = nil
		return w.waitErr
	default:
		// Worker still running.
	}

	// Tell the worker to stop by closing fuzz_in. It won't actually stop until it
	// finishes with earlier calls.
	// Close runs in its own goroutine so that the timeout loop below can start
	// right away; closeC is closed once Close has returned.
	closeC := make(chan struct{})
	go func() {
		w.client.Close()
		close(closeC)
	}()

	sig := os.Interrupt
	if runtime.GOOS == "windows" {
		// Per https://golang.org/pkg/os/#Signal, “Interrupt is not implemented on
		// Windows; using it with os.Process.Signal will return an error.”
		// Fall back to Kill instead.
		sig = os.Kill
	}

	// sig is the next signal to send when the timer fires. It escalates from
	// os.Interrupt to os.Kill to nil (nothing left to send).
	t := time.NewTimer(workerTimeoutDuration)
	for {
		select {
		case <-w.termC:
			// Worker terminated.
			t.Stop()
			// Wait for the Close goroutine before dropping w.client.
			<-closeC
			w.cmd = nil
			w.client = nil
			return w.waitErr

		case <-t.C:
			// Timer fired before worker terminated.
			w.interrupted = true
			switch sig {
			case os.Interrupt:
				// Try to stop the worker with SIGINT and wait a little longer.
				w.cmd.Process.Signal(sig)
				sig = os.Kill
				t.Reset(workerTimeoutDuration)

			case os.Kill:
				// Try to stop the worker with SIGKILL and keep waiting.
				w.cmd.Process.Signal(sig)
				sig = nil
				t.Reset(workerTimeoutDuration)

			case nil:
				// Still waiting. Print a message to let the user know why.
				// The timer is not reset here, so the message is printed once
				// and the loop then waits only for termC.
				fmt.Fprintf(w.coordinator.opts.Log, "waiting for fuzzing process to terminate...\n")
			}
		}
	}
}
473
474// RunFuzzWorker is called in a worker process to communicate with the
475// coordinator process in order to fuzz random inputs. RunFuzzWorker loops
476// until the coordinator tells it to stop.
477//
478// fn is a wrapper on the fuzz function. It may return an error to indicate
479// a given input "crashed". The coordinator will also record a crasher if
480// the function times out or terminates the process.
481//
482// RunFuzzWorker returns an error if it could not communicate with the
483// coordinator process.
484func RunFuzzWorker(ctx context.Context, fn func(CorpusEntry) error) error {
485	comm, err := getWorkerComm()
486	if err != nil {
487		return err
488	}
489	srv := &workerServer{
490		workerComm: comm,
491		fuzzFn: func(e CorpusEntry) (time.Duration, error) {
492			timer := time.AfterFunc(10*time.Second, func() {
493				panic("deadlocked!") // this error message won't be printed
494			})
495			defer timer.Stop()
496			start := time.Now()
497			err := fn(e)
498			return time.Since(start), err
499		},
500		m: newMutator(),
501	}
502	return srv.serve(ctx)
503}
504
// call is serialized and sent from the coordinator on fuzz_in. It acts as
// a minimalist RPC mechanism. Exactly one of its fields must be set to indicate
// which method to call.
//
// workerServer.serve dispatches on the first non-nil field, checking Fuzz,
// then Minimize, then Ping, and fails if all of them are nil.
type call struct {
	Ping     *pingArgs     // arguments for workerServer.ping
	Fuzz     *fuzzArgs     // arguments for workerServer.fuzz
	Minimize *minimizeArgs // arguments for workerServer.minimize
}
513
// minimizeArgs contains arguments to workerServer.minimize. The value to
// minimize is already in shared memory.
type minimizeArgs struct {
	// Timeout is the time to spend minimizing. This may include time to start up,
	// especially if the input causes the worker process to terminate, requiring
	// repeated restarts. 0 indicates no timeout.
	Timeout time.Duration

	// Limit is the maximum number of values to test, without spending more time
	// than Timeout. 0 indicates no limit.
	Limit int64

	// KeepCoverage is a set of coverage counters the worker should attempt to
	// keep in minimized values. When provided, the worker will reject inputs that
	// don't cause at least one of these bits to be set.
	KeepCoverage []byte

	// Index is the index of the fuzz target parameter to be minimized.
	Index int
}
534
// minimizeResponse contains results from workerServer.minimize.
type minimizeResponse struct {
	// WroteToMem is true if the worker found a smaller input and wrote it to
	// shared memory. If minimizeArgs.KeepCoverage was set, the minimized input
	// preserved at least one coverage bit and did not cause an error.
	// Otherwise, the minimized input caused some error, recorded in Err.
	WroteToMem bool

	// Err is the error string caused by the value in shared memory, if any.
	Err string

	// CoverageData is the set of coverage bits activated by the minimized value
	// in shared memory. When set, it contains at least one bit from KeepCoverage.
	// CoverageData will be nil if Err is set or if minimization failed.
	CoverageData []byte

	// Duration is the time spent minimizing, not including starting or cleaning up.
	Duration time.Duration

	// Count is the number of values tested.
	// NOTE(review): workerServer.minimize does not assign this field itself;
	// presumably the caller fills it from the shared memory header — confirm.
	Count int64
}
557
// fuzzArgs contains arguments to workerServer.fuzz. The value to fuzz is
// passed in shared memory.
type fuzzArgs struct {
	// Timeout is the time to spend fuzzing, not including starting or
	// cleaning up. 0 indicates no timeout.
	Timeout time.Duration

	// Limit is the maximum number of values to test, without spending more time
	// than Timeout. 0 indicates no limit.
	Limit int64

	// Warmup indicates whether this is part of a warmup run, meaning that
	// fuzzing should not occur. If coverageEnabled is true, then coverage data
	// should be reported.
	Warmup bool

	// CoverageData is the coverage data. If set, the worker should update its
	// local coverage data prior to fuzzing.
	CoverageData []byte
}
578
// fuzzResponse contains results from workerServer.fuzz.
type fuzzResponse struct {
	// TotalDuration is the time spent fuzzing, not including starting or
	// cleaning up. InterestingDuration is the time the fuzz function took on
	// the single input being reported: the warmup input, or the input that
	// expanded coverage.
	TotalDuration       time.Duration
	InterestingDuration time.Duration

	// Count is the number of values tested.
	Count int64

	// CoverageData is set if the value in shared memory expands coverage
	// and therefore may be interesting to the coordinator.
	CoverageData []byte

	// Err is the error string caused by the value in shared memory, which is
	// non-empty if the value in shared memory caused a crash.
	Err string

	// InternalErr is the error string caused by an internal error in the
	// worker. This shouldn't be considered a crasher.
	InternalErr string
}
600
// pingArgs contains arguments to workerServer.ping. It has no fields; a
// non-nil call.Ping is what selects the method.
type pingArgs struct{}

// pingResponse contains results from workerServer.ping. It has no fields.
type pingResponse struct{}
606
// workerComm holds pipes and shared memory used for communication
// between the coordinator process (client) and a worker process (server).
// These values are unique to each worker; they are shared only with the
// coordinator, not with other workers.
//
// Access to shared memory is synchronized implicitly over the RPC protocol
// implemented in workerServer and workerClient. During a call, the server
// (worker) has exclusive access to shared memory; at other times, the client
// (coordinator) has exclusive access.
type workerComm struct {
	// fuzzIn carries requests from the coordinator to the worker and fuzzOut
	// carries responses back. Each process holds its own end of each pipe.
	fuzzIn, fuzzOut *os.File
	memMu           chan *sharedMem // mutex guarding shared memory
}
620
// workerServer is a minimalist RPC server, run by fuzz worker processes.
// It allows the coordinator process (using workerClient) to call methods in a
// worker process. This system allows the coordinator to run multiple worker
// processes in parallel and to collect inputs that caused crashes from shared
// memory after a worker process terminates unexpectedly.
type workerServer struct {
	workerComm
	m *mutator // mutator used by fuzz to produce variations of the input

	// coverageMask is the local coverage data for the worker. It is
	// periodically updated to reflect the data in the coordinator when new
	// coverage is found.
	coverageMask []byte

	// fuzzFn runs the worker's fuzz target on the given input and returns an
	// error if it finds a crasher (the process may also exit or crash), and the
	// time it took to run the input. It sets a deadline of 10 seconds, at which
	// point it will panic with the assumption that the process is hanging or
	// deadlocked.
	fuzzFn func(CorpusEntry) (time.Duration, error)
}
642
643// serve reads serialized RPC messages on fuzzIn. When serve receives a message,
644// it calls the corresponding method, then sends the serialized result back
645// on fuzzOut.
646//
647// serve handles RPC calls synchronously; it will not attempt to read a message
648// until the previous call has finished.
649//
650// serve returns errors that occurred when communicating over pipes. serve
651// does not return errors from method calls; those are passed through serialized
652// responses.
653func (ws *workerServer) serve(ctx context.Context) error {
654	enc := json.NewEncoder(ws.fuzzOut)
655	dec := json.NewDecoder(&contextReader{ctx: ctx, r: ws.fuzzIn})
656	for {
657		var c call
658		if err := dec.Decode(&c); err != nil {
659			if err == io.EOF || err == ctx.Err() {
660				return nil
661			} else {
662				return err
663			}
664		}
665
666		var resp any
667		switch {
668		case c.Fuzz != nil:
669			resp = ws.fuzz(ctx, *c.Fuzz)
670		case c.Minimize != nil:
671			resp = ws.minimize(ctx, *c.Minimize)
672		case c.Ping != nil:
673			resp = ws.ping(ctx, *c.Ping)
674		default:
675			return errors.New("no arguments provided for any call")
676		}
677
678		if err := enc.Encode(resp); err != nil {
679			return err
680		}
681	}
682}
683
// chainedMutations is how many mutations are applied before the worker
// resets the input to its original state.
// NOTE: this number was picked without much thought. It is low enough that
// it seems to create a significant diversity in mutated inputs. We may want
// to consider looking into this more closely once we have a proper performance
// testing framework. Another option is to randomly pick the number of chained
// mutations on each invocation of the workerServer.fuzz method (this appears to
// be what libFuzzer does, although there seems to be no documentation which
// explains why this choice was made.)
const chainedMutations = 5
694
// fuzz runs the test function on random variations of the input value in shared
// memory for a limited duration or number of iterations.
//
// fuzz returns early if it finds an input that crashes the fuzz function (with
// fuzzResponse.Err set) or an input that expands coverage (with
// fuzzResponse.InterestingDuration set).
//
// fuzz does not modify the input in shared memory. Instead, it saves the
// initial PRNG state in shared memory and increments a counter in shared
// memory before each call to the test function. The caller may reconstruct
// the crashing input with this information, since the PRNG is deterministic.
//
// Problems that are not caused by the input (a coverage mask of unexpected
// size, a count that already reached the limit, an unreadable value in shared
// memory) are reported in fuzzResponse.InternalErr.
func (ws *workerServer) fuzz(ctx context.Context, args fuzzArgs) (resp fuzzResponse) {
	if args.CoverageData != nil {
		if ws.coverageMask != nil && len(args.CoverageData) != len(ws.coverageMask) {
			resp.InternalErr = fmt.Sprintf("unexpected size for CoverageData: got %d, expected %d", len(args.CoverageData), len(ws.coverageMask))
			return resp
		}
		ws.coverageMask = args.CoverageData
	}
	start := time.Now()
	defer func() { resp.TotalDuration = time.Since(start) }()

	if args.Timeout != 0 {
		var cancel func()
		ctx, cancel = context.WithTimeout(ctx, args.Timeout)
		defer cancel()
	}
	// Take the shared memory for the duration of the call. The deferred
	// function copies the final count into the response and gives it back.
	mem := <-ws.memMu
	ws.m.r.save(&mem.header().randState, &mem.header().randInc)
	defer func() {
		resp.Count = mem.header().count
		ws.memMu <- mem
	}()
	if args.Limit > 0 && mem.header().count >= args.Limit {
		resp.InternalErr = fmt.Sprintf("mem.header().count %d already exceeds args.Limit %d", mem.header().count, args.Limit)
		return resp
	}

	originalVals, err := unmarshalCorpusFile(mem.valueCopy())
	if err != nil {
		resp.InternalErr = err.Error()
		return resp
	}
	// vals is the working copy that gets mutated; originalVals is kept so the
	// working copy can be reset.
	vals := make([]any, len(originalVals))
	copy(vals, originalVals)

	// shouldStop reports whether the iteration limit, if any, has been reached.
	shouldStop := func() bool {
		return args.Limit > 0 && mem.header().count >= args.Limit
	}
	// fuzzOnce bumps the shared counter, runs the fuzz function on entry, and
	// returns its duration together with either an error message (on failure)
	// or the coverage snapshot (when it has bits not in ws.coverageMask).
	fuzzOnce := func(entry CorpusEntry) (dur time.Duration, cov []byte, errMsg string) {
		mem.header().count++
		var err error
		dur, err = ws.fuzzFn(entry)
		if err != nil {
			errMsg = err.Error()
			if errMsg == "" {
				// An empty message would read as "no crash" to the caller.
				errMsg = "fuzz function failed with no input"
			}
			return dur, nil, errMsg
		}
		if ws.coverageMask != nil && countNewCoverageBits(ws.coverageMask, coverageSnapshot) > 0 {
			return dur, coverageSnapshot, ""
		}
		return dur, nil, ""
	}

	if args.Warmup {
		// Warmup: run the unmodified input exactly once and report.
		dur, _, errMsg := fuzzOnce(CorpusEntry{Values: vals})
		if errMsg != "" {
			resp.Err = errMsg
			return resp
		}
		resp.InterestingDuration = dur
		if coverageEnabled {
			resp.CoverageData = coverageSnapshot
		}
		return resp
	}

	for {
		select {
		case <-ctx.Done():
			return resp
		default:
			// Every chainedMutations iterations, start over from the original
			// values and record the PRNG state that the next chain begins with.
			if mem.header().count%chainedMutations == 0 {
				copy(vals, originalVals)
				ws.m.r.save(&mem.header().randState, &mem.header().randInc)
			}
			ws.m.mutate(vals, cap(mem.valueRef()))

			entry := CorpusEntry{Values: vals}
			dur, cov, errMsg := fuzzOnce(entry)
			if errMsg != "" {
				resp.Err = errMsg
				return resp
			}
			if cov != nil {
				resp.CoverageData = cov
				resp.InterestingDuration = dur
				return resp
			}
			if shouldStop() {
				return resp
			}
		}
	}
}
802
803func (ws *workerServer) minimize(ctx context.Context, args minimizeArgs) (resp minimizeResponse) {
804	start := time.Now()
805	defer func() { resp.Duration = time.Since(start) }()
806	mem := <-ws.memMu
807	defer func() { ws.memMu <- mem }()
808	vals, err := unmarshalCorpusFile(mem.valueCopy())
809	if err != nil {
810		panic(err)
811	}
812	inpHash := sha256.Sum256(mem.valueCopy())
813	if args.Timeout != 0 {
814		var cancel func()
815		ctx, cancel = context.WithTimeout(ctx, args.Timeout)
816		defer cancel()
817	}
818
819	// Minimize the values in vals, then write to shared memory. We only write
820	// to shared memory after completing minimization.
821	success, err := ws.minimizeInput(ctx, vals, mem, args)
822	if success {
823		writeToMem(vals, mem)
824		outHash := sha256.Sum256(mem.valueCopy())
825		mem.header().rawInMem = false
826		resp.WroteToMem = true
827		if err != nil {
828			resp.Err = err.Error()
829		} else {
830			// If the values didn't change during minimization then coverageSnapshot is likely
831			// a dirty snapshot which represents the very last step of minimization, not the
832			// coverage for the initial input. In that case just return the coverage we were
833			// given initially, since it more accurately represents the coverage map for the
834			// input we are returning.
835			if outHash != inpHash {
836				resp.CoverageData = coverageSnapshot
837			} else {
838				resp.CoverageData = args.KeepCoverage
839			}
840		}
841	}
842	return resp
843}
844
// minimizeInput applies a series of minimizing transformations on the provided
// vals, ensuring that each minimization still causes an error, or keeps
// coverage, in fuzzFn. It uses the context to determine how long to run,
// stopping once closed. It returns a bool indicating whether minimization was
// successful and an error if one was found.
//
// Only the value at vals[args.Index] is minimized, and it must be a string or
// a []byte; any other type causes a panic. success is false if the stop
// condition already holds on entry or if the original value is no longer
// interesting when re-run.
func (ws *workerServer) minimizeInput(ctx context.Context, vals []any, mem *sharedMem, args minimizeArgs) (success bool, retErr error) {
	keepCoverage := args.KeepCoverage
	memBytes := mem.valueRef()
	bPtr := &memBytes
	count := &mem.header().count
	// shouldStop reports whether the context has ended or the iteration
	// limit, if any, has been reached.
	shouldStop := func() bool {
		return ctx.Err() != nil ||
			(args.Limit > 0 && *count >= args.Limit)
	}
	if shouldStop() {
		return false, nil
	}

	// Check that the original value preserves coverage or causes an error.
	// If not, then whatever caused us to think the value was interesting may
	// have been a flake, and we can't minimize it.
	*count++
	_, retErr = ws.fuzzFn(CorpusEntry{Values: vals})
	if keepCoverage != nil {
		if !hasCoverageBit(keepCoverage, coverageSnapshot) || retErr != nil {
			return false, nil
		}
	} else if retErr == nil {
		return false, nil
	}
	mem.header().rawInMem = true

	// tryMinimized runs the fuzz function with candidate replacing the value
	// at index args.Index. tryMinimized returns whether the input with
	// candidate is interesting for the same reason as the original input: it
	// returns an error if one was expected, or it preserves coverage.
	tryMinimized := func(candidate []byte) bool {
		prev := vals[args.Index]
		switch prev.(type) {
		case []byte:
			vals[args.Index] = candidate
		case string:
			vals[args.Index] = string(candidate)
		default:
			panic("impossible")
		}
		// Put the raw candidate bytes in shared memory and bump the counter
		// before running the fuzz function.
		copy(*bPtr, candidate)
		*bPtr = (*bPtr)[:len(candidate)]
		mem.setValueLen(len(candidate))
		*count++
		_, err := ws.fuzzFn(CorpusEntry{Values: vals})
		if err != nil {
			retErr = err
			if keepCoverage != nil {
				// Now that we've found a crash, that's more important than any
				// minimization of interesting inputs that was being done. Clear out
				// keepCoverage to only minimize the crash going forward.
				keepCoverage = nil
			}
			return true
		}
		// Minimization should preserve coverage bits.
		if keepCoverage != nil && isCoverageSubset(keepCoverage, coverageSnapshot) {
			return true
		}
		// Candidate rejected: restore the previous value.
		vals[args.Index] = prev
		return false
	}
	switch v := vals[args.Index].(type) {
	case string:
		minimizeBytes([]byte(v), tryMinimized, shouldStop)
	case []byte:
		minimizeBytes(v, tryMinimized, shouldStop)
	default:
		panic("impossible")
	}
	return true, retErr
}
923
924func writeToMem(vals []any, mem *sharedMem) {
925	b := marshalCorpusFile(vals...)
926	mem.setValue(b)
927}
928
929// ping does nothing. The coordinator calls this method to ensure the worker
930// has called F.Fuzz and can communicate.
931func (ws *workerServer) ping(ctx context.Context, args pingArgs) pingResponse {
932	return pingResponse{}
933}
934
// workerClient is a minimalist RPC client. The coordinator process uses a
// workerClient to call methods in each worker process (handled by
// workerServer).
type workerClient struct {
	// workerComm is embedded; its fuzzIn, fuzzOut and memMu members are
	// used by the client methods to talk to the worker process.
	workerComm
	// m is the mutator used by fuzz to restore the random state recorded in
	// shared memory and re-apply mutations to the input values.
	m *mutator

	// mu is the mutex protecting the workerComm.fuzzIn pipe. This must be
	// locked before making calls to the workerServer. It prevents
	// workerClient.Close from closing fuzzIn while workerClient methods are
	// writing to it concurrently, and prevents multiple callers from writing to
	// fuzzIn concurrently.
	mu sync.Mutex
}
949
950func newWorkerClient(comm workerComm, m *mutator) *workerClient {
951	return &workerClient{workerComm: comm, m: m}
952}
953
954// Close shuts down the connection to the RPC server (the worker process) by
955// closing fuzz_in. Close drains fuzz_out (avoiding a SIGPIPE in the worker),
956// and closes it after the worker process closes the other end.
957func (wc *workerClient) Close() error {
958	wc.mu.Lock()
959	defer wc.mu.Unlock()
960
961	// Close fuzzIn. This signals to the server that there are no more calls,
962	// and it should exit.
963	if err := wc.fuzzIn.Close(); err != nil {
964		wc.fuzzOut.Close()
965		return err
966	}
967
968	// Drain fuzzOut and close it. When the server exits, the kernel will close
969	// its end of fuzzOut, and we'll get EOF.
970	if _, err := io.Copy(io.Discard, wc.fuzzOut); err != nil {
971		wc.fuzzOut.Close()
972		return err
973	}
974	return wc.fuzzOut.Close()
975}
976
// errSharedMemClosed is returned by workerClient methods that cannot access
// shared memory because it was closed and unmapped by another goroutine. That
// can happen when worker.cleanup is called in the worker goroutine while a
// workerClient.fuzz call runs concurrently. workerClient.minimize returns it
// under the same condition.
//
// This error should not be reported. It indicates the operation was
// interrupted.
var errSharedMemClosed = errors.New("internal error: shared memory was closed and unmapped")
985
986// minimize tells the worker to call the minimize method. See
987// workerServer.minimize.
988func (wc *workerClient) minimize(ctx context.Context, entryIn CorpusEntry, args minimizeArgs) (entryOut CorpusEntry, resp minimizeResponse, retErr error) {
989	wc.mu.Lock()
990	defer wc.mu.Unlock()
991
992	mem, ok := <-wc.memMu
993	if !ok {
994		return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
995	}
996	defer func() { wc.memMu <- mem }()
997	mem.header().count = 0
998	inp, err := corpusEntryData(entryIn)
999	if err != nil {
1000		return CorpusEntry{}, minimizeResponse{}, err
1001	}
1002	mem.setValue(inp)
1003	entryOut = entryIn
1004	entryOut.Values, err = unmarshalCorpusFile(inp)
1005	if err != nil {
1006		return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling provided value: %v", err)
1007	}
1008	for i, v := range entryOut.Values {
1009		if !isMinimizable(reflect.TypeOf(v)) {
1010			continue
1011		}
1012
1013		wc.memMu <- mem
1014		args.Index = i
1015		c := call{Minimize: &args}
1016		callErr := wc.callLocked(ctx, c, &resp)
1017		mem, ok = <-wc.memMu
1018		if !ok {
1019			return CorpusEntry{}, minimizeResponse{}, errSharedMemClosed
1020		}
1021
1022		if callErr != nil {
1023			retErr = callErr
1024			if !mem.header().rawInMem {
1025				// An unrecoverable error occurred before minimization began.
1026				return entryIn, minimizeResponse{}, retErr
1027			}
1028			// An unrecoverable error occurred during minimization. mem now
1029			// holds the raw, unmarshaled bytes of entryIn.Values[i] that
1030			// caused the error.
1031			switch entryOut.Values[i].(type) {
1032			case string:
1033				entryOut.Values[i] = string(mem.valueCopy())
1034			case []byte:
1035				entryOut.Values[i] = mem.valueCopy()
1036			default:
1037				panic("impossible")
1038			}
1039			entryOut.Data = marshalCorpusFile(entryOut.Values...)
1040			// Stop minimizing; another unrecoverable error is likely to occur.
1041			break
1042		}
1043
1044		if resp.WroteToMem {
1045			// Minimization succeeded, and mem holds the marshaled data.
1046			entryOut.Data = mem.valueCopy()
1047			entryOut.Values, err = unmarshalCorpusFile(entryOut.Data)
1048			if err != nil {
1049				return CorpusEntry{}, minimizeResponse{}, fmt.Errorf("workerClient.minimize unmarshaling minimized value: %v", err)
1050			}
1051		}
1052
1053		// Prepare for next iteration of the loop.
1054		if args.Timeout != 0 {
1055			args.Timeout -= resp.Duration
1056			if args.Timeout <= 0 {
1057				break
1058			}
1059		}
1060		if args.Limit != 0 {
1061			args.Limit -= mem.header().count
1062			if args.Limit <= 0 {
1063				break
1064			}
1065		}
1066	}
1067	resp.Count = mem.header().count
1068	h := sha256.Sum256(entryOut.Data)
1069	entryOut.Path = fmt.Sprintf("%x", h[:4])
1070	return entryOut, resp, retErr
1071}
1072
1073// fuzz tells the worker to call the fuzz method. See workerServer.fuzz.
1074func (wc *workerClient) fuzz(ctx context.Context, entryIn CorpusEntry, args fuzzArgs) (entryOut CorpusEntry, resp fuzzResponse, isInternalError bool, err error) {
1075	wc.mu.Lock()
1076	defer wc.mu.Unlock()
1077
1078	mem, ok := <-wc.memMu
1079	if !ok {
1080		return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
1081	}
1082	mem.header().count = 0
1083	inp, err := corpusEntryData(entryIn)
1084	if err != nil {
1085		wc.memMu <- mem
1086		return CorpusEntry{}, fuzzResponse{}, true, err
1087	}
1088	mem.setValue(inp)
1089	wc.memMu <- mem
1090
1091	c := call{Fuzz: &args}
1092	callErr := wc.callLocked(ctx, c, &resp)
1093	if resp.InternalErr != "" {
1094		return CorpusEntry{}, fuzzResponse{}, true, errors.New(resp.InternalErr)
1095	}
1096	mem, ok = <-wc.memMu
1097	if !ok {
1098		return CorpusEntry{}, fuzzResponse{}, true, errSharedMemClosed
1099	}
1100	defer func() { wc.memMu <- mem }()
1101	resp.Count = mem.header().count
1102
1103	if !bytes.Equal(inp, mem.valueRef()) {
1104		return CorpusEntry{}, fuzzResponse{}, true, errors.New("workerServer.fuzz modified input")
1105	}
1106	needEntryOut := callErr != nil || resp.Err != "" ||
1107		(!args.Warmup && resp.CoverageData != nil)
1108	if needEntryOut {
1109		valuesOut, err := unmarshalCorpusFile(inp)
1110		if err != nil {
1111			return CorpusEntry{}, fuzzResponse{}, true, fmt.Errorf("unmarshaling fuzz input value after call: %v", err)
1112		}
1113		wc.m.r.restore(mem.header().randState, mem.header().randInc)
1114		if !args.Warmup {
1115			// Only mutate the valuesOut if fuzzing actually occurred.
1116			numMutations := ((resp.Count - 1) % chainedMutations) + 1
1117			for i := int64(0); i < numMutations; i++ {
1118				wc.m.mutate(valuesOut, cap(mem.valueRef()))
1119			}
1120		}
1121		dataOut := marshalCorpusFile(valuesOut...)
1122
1123		h := sha256.Sum256(dataOut)
1124		name := fmt.Sprintf("%x", h[:4])
1125		entryOut = CorpusEntry{
1126			Parent:     entryIn.Path,
1127			Path:       name,
1128			Data:       dataOut,
1129			Generation: entryIn.Generation + 1,
1130		}
1131		if args.Warmup {
1132			// The bytes weren't mutated, so if entryIn was a seed corpus value,
1133			// then entryOut is too.
1134			entryOut.IsSeed = entryIn.IsSeed
1135		}
1136	}
1137
1138	return entryOut, resp, false, callErr
1139}
1140
1141// ping tells the worker to call the ping method. See workerServer.ping.
1142func (wc *workerClient) ping(ctx context.Context) error {
1143	wc.mu.Lock()
1144	defer wc.mu.Unlock()
1145	c := call{Ping: &pingArgs{}}
1146	var resp pingResponse
1147	return wc.callLocked(ctx, c, &resp)
1148}
1149
1150// callLocked sends an RPC from the coordinator to the worker process and waits
1151// for the response. The callLocked may be canceled with ctx.
1152func (wc *workerClient) callLocked(ctx context.Context, c call, resp any) (err error) {
1153	enc := json.NewEncoder(wc.fuzzIn)
1154	dec := json.NewDecoder(&contextReader{ctx: ctx, r: wc.fuzzOut})
1155	if err := enc.Encode(c); err != nil {
1156		return err
1157	}
1158	return dec.Decode(resp)
1159}
1160
// contextReader wraps a Reader with a Context. If the context is canceled
// while the underlying reader is blocked, Read returns immediately.
//
// This is useful for reading from a pipe. Closing a pipe file descriptor does
// not unblock pending Reads on that file descriptor. All copies of the pipe's
// other file descriptor (the write end) must be closed in all processes that
// inherit it. This is difficult to do correctly in the situation we care about
// (process group termination).
type contextReader struct {
	ctx context.Context
	r   io.Reader
}

// Read reads from the underlying reader into b. If the context is already
// done, Read returns its error without touching the reader. Otherwise the
// read runs in its own goroutine and Read returns whichever happens first:
// the read completing or the context being done.
func (cr *contextReader) Read(b []byte) (int, error) {
	if err := cr.ctx.Err(); err != nil {
		return 0, err
	}

	type readResult struct {
		n   int
		err error
	}
	// The channel is buffered so the reading goroutine can always deliver its
	// result and exit, even when nobody is waiting for it any more.
	results := make(chan readResult, 1)

	// This goroutine may stay blocked after Read returns because the underlying
	// read is blocked.
	go func() {
		n, err := cr.r.Read(b)
		results <- readResult{n: n, err: err}
	}()

	select {
	case res := <-results:
		return res.n, res.err
	case <-cr.ctx.Done():
		return 0, cr.ctx.Err()
	}
}
1196