1// Copyright 2023 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package trace
6
7import (
8	"bufio"
9	"fmt"
10	"io"
11	"slices"
12	"strings"
13
14	"internal/trace/event/go122"
15	"internal/trace/internal/oldtrace"
16	"internal/trace/version"
17)
18
// Reader reads a byte stream, validates it, and produces trace events.
type Reader struct {
	r           *bufio.Reader
	lastTs      Time           // timestamp of the last event returned; used to enforce monotonically increasing timestamps
	gen         *generation    // generation currently being emitted; nil until the first generation is read
	spill       *spilledBatch  // first batch of the *next* generation, read while scanning the current one
	spillErr    error          // error from reading spill
	frontier    []*batchCursor // min-heap of per-M cursors, ordered by each M's next event timestamp
	cpuSamples  []cpuSample    // remaining CPU samples for the current generation, consumed in timestamp order
	order       ordering       // state machine that validates and sequences events across Ms
	emittedSync bool           // whether the sync event for the current generation boundary has been emitted

	// go121Events is non-nil iff the trace is in a pre-Go-1.22 format,
	// in which case events come from the converter instead of the fields above.
	go121Events *oldTraceConverter
}
33
34// NewReader creates a new trace reader.
35func NewReader(r io.Reader) (*Reader, error) {
36	br := bufio.NewReader(r)
37	v, err := version.ReadHeader(br)
38	if err != nil {
39		return nil, err
40	}
41	switch v {
42	case version.Go111, version.Go119, version.Go121:
43		tr, err := oldtrace.Parse(br, v)
44		if err != nil {
45			return nil, err
46		}
47		return &Reader{
48			go121Events: convertOldFormat(tr),
49		}, nil
50	case version.Go122, version.Go123:
51		return &Reader{
52			r: br,
53			order: ordering{
54				mStates:     make(map[ThreadID]*mState),
55				pStates:     make(map[ProcID]*pState),
56				gStates:     make(map[GoID]*gState),
57				activeTasks: make(map[TaskID]taskState),
58			},
59			// Don't emit a sync event when we first go to emit events.
60			emittedSync: true,
61		}, nil
62	default:
63		return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
64	}
65}
66
// ReadEvent reads a single event from the stream.
//
// If the stream has been exhausted, it returns an invalid
// event and io.EOF.
func (r *Reader) ReadEvent() (e Event, err error) {
	// Pre-Go-1.22 traces were fully converted at NewReader time;
	// just drain the converter.
	if r.go121Events != nil {
		ev, err := r.go121Events.next()
		if err != nil {
			// XXX do we have to emit an EventSync when the trace is done?
			return Event{}, err
		}
		return ev, nil
	}

	// Go 1.22+ trace parsing algorithm.
	//
	// (1) Read in all the batches for the next generation from the stream.
	//   (a) Use the size field in the header to quickly find all batches.
	// (2) Parse out the strings, stacks, CPU samples, and timestamp conversion data.
	// (3) Group each event batch by M, sorted by timestamp. (batchCursor contains the groups.)
	// (4) Organize batchCursors in a min-heap, ordered by the timestamp of the next event for each M.
	// (5) Try to advance the next event for the M at the top of the min-heap.
	//   (a) On success, select that M.
	//   (b) On failure, sort the min-heap and try to advance other Ms. Select the first M that advances.
	//   (c) If there's nothing left to advance, goto (1).
	// (6) Select the latest event for the selected M and get it ready to be returned.
	// (7) Read the next event for the selected M and update the min-heap.
	// (8) Return the selected event, goto (5) on the next call.

	// Set us up to track the last timestamp and fix up
	// the timestamp of any event that comes through.
	defer func() {
		if err != nil {
			return
		}
		if err = e.validateTableIDs(); err != nil {
			return
		}
		// Raw timestamps may not be strictly increasing across Ms; nudge any
		// stale timestamp forward so callers always observe a strictly
		// monotonic sequence.
		if e.base.time <= r.lastTs {
			e.base.time = r.lastTs + 1
		}
		r.lastTs = e.base.time
	}()

	// Consume any events in the ordering first.
	// The ordering may buffer multiple events per Advance call.
	if ev, ok := r.order.Next(); ok {
		return ev, nil
	}

	// Check if we need to refresh the generation.
	// Both the frontier and the CPU samples must be fully drained first.
	if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
		// Emit exactly one sync event at each generation boundary before
		// moving on (suppressed for the very first generation via the
		// emittedSync=true set in NewReader).
		if !r.emittedSync {
			r.emittedSync = true
			return syncEvent(r.gen.evTable, r.lastTs), nil
		}
		// A spill-read error is only surfaced once the generation it
		// trails has been fully emitted.
		if r.spillErr != nil {
			return Event{}, r.spillErr
		}
		if r.gen != nil && r.spill == nil {
			// If we have a generation from the last read,
			// and there's nothing left in the frontier, and
			// there's no spilled batch, indicating that there's
			// no further generation, it means we're done.
			// Return io.EOF.
			return Event{}, io.EOF
		}
		// Read the next generation.
		// Note: a non-nil err with a non-nil gen is deferred into
		// spillErr so the successfully-read generation is emitted first.
		var err error
		r.gen, r.spill, err = readGeneration(r.r, r.spill)
		if r.gen == nil {
			return Event{}, err
		}
		r.spillErr = err

		// Reset CPU samples cursor.
		r.cpuSamples = r.gen.cpuSamples

		// Reset frontier.
		for m, batches := range r.gen.batches {
			bc := &batchCursor{m: m}
			ok, err := bc.nextEvent(batches, r.gen.freq)
			if err != nil {
				return Event{}, err
			}
			if !ok {
				// Turns out there aren't actually any events in these batches.
				continue
			}
			r.frontier = heapInsert(r.frontier, bc)
		}

		// Reset emittedSync.
		r.emittedSync = false
	}
	// tryAdvance attempts to feed frontier[i]'s next event into the
	// ordering; on acceptance it refreshes that cursor and repairs the
	// heap. Returns false (with nil error) if the ordering isn't ready
	// for that event yet.
	tryAdvance := func(i int) (bool, error) {
		bc := r.frontier[i]

		if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
			return ok, err
		}

		// Refresh the cursor's event.
		ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
		if err != nil {
			return false, err
		}
		if ok {
			// If we successfully refreshed, update the heap.
			heapUpdate(r.frontier, i)
		} else {
			// There's nothing else to read. Delete this cursor from the frontier.
			r.frontier = heapRemove(r.frontier, i)
		}
		return true, nil
	}
	// Inject a CPU sample if it comes next.
	// CPU samples bypass the ordering: they're emitted directly whenever
	// their timestamp precedes every M's next event.
	if len(r.cpuSamples) != 0 {
		if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
			e := r.cpuSamples[0].asEvent(r.gen.evTable)
			r.cpuSamples = r.cpuSamples[1:]
			return e, nil
		}
	}
	// Try to advance the head of the frontier, which should have the minimum timestamp.
	// This should be by far the most common case
	if len(r.frontier) == 0 {
		return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
	}
	if ok, err := tryAdvance(0); err != nil {
		return Event{}, err
	} else if !ok {
		// Try to advance the rest of the frontier, in timestamp order.
		//
		// To do this, sort the min-heap. A sorted min-heap is still a
		// min-heap, but now we can iterate over the rest and try to
		// advance in order. This path should be rare.
		slices.SortFunc(r.frontier, (*batchCursor).compare)
		success := false
		for i := 1; i < len(r.frontier); i++ {
			if ok, err = tryAdvance(i); err != nil {
				return Event{}, err
			} else if ok {
				success = true
				break
			}
		}
		if !success {
			// No M can make progress: the trace's ordering constraints
			// are unsatisfiable, i.e. the trace is corrupt.
			return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
		}
	}

	// Pick off the next event on the queue. At this point, one must exist.
	ev, ok := r.order.Next()
	if !ok {
		panic("invariant violation: advance successful, but queue is empty")
	}
	return ev, nil
}
225
226func dumpFrontier(frontier []*batchCursor) string {
227	var sb strings.Builder
228	for _, bc := range frontier {
229		spec := go122.Specs()[bc.ev.typ]
230		fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
231		for i, arg := range spec.Args[1:] {
232			fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
233		}
234		fmt.Fprintf(&sb, "]\n")
235	}
236	return sb.String()
237}
238