// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package trace

import (
	"bufio"
	"fmt"
	"io"
	"slices"
	"strings"

	"internal/trace/event/go122"
	"internal/trace/internal/oldtrace"
	"internal/trace/version"
)

// Reader reads a byte stream, validates it, and produces trace events.
type Reader struct {
	r           *bufio.Reader
	lastTs      Time           // timestamp of the last event returned; enforces strictly increasing times
	gen         *generation    // generation currently being emitted; nil before the first read
	spill       *spilledBatch  // first batch of the *next* generation, read while scanning the current one
	spillErr    error          // error from reading spill
	frontier    []*batchCursor // min-heap of per-M cursors, ordered by each M's next event timestamp
	cpuSamples  []cpuSample    // CPU profile samples of the current generation not yet emitted
	order       ordering       // event ordering state machine for the Go 1.22+ format
	emittedSync bool           // whether the sync event for the current generation boundary was emitted

	// go121Events is non-nil if and only if the trace uses the pre-Go-1.22
	// format; in that case all other fields above are unused.
	go121Events *oldTraceConverter
}

// NewReader creates a new trace reader.
//
// It reads the trace header from r to determine the format version, then
// picks a parsing strategy: pre-1.22 traces are parsed in their entirety
// up front and converted to the new event representation, while 1.22+
// traces are parsed incrementally, one generation at a time, as events
// are requested via ReadEvent.
func NewReader(r io.Reader) (*Reader, error) {
	br := bufio.NewReader(r)
	v, err := version.ReadHeader(br)
	if err != nil {
		return nil, err
	}
	switch v {
	case version.Go111, version.Go119, version.Go121:
		// Old-format trace: parse it completely here; the converter
		// replays the parsed result event-by-event from ReadEvent.
		tr, err := oldtrace.Parse(br, v)
		if err != nil {
			return nil, err
		}
		return &Reader{
			go121Events: convertOldFormat(tr),
		}, nil
	case version.Go122, version.Go123:
		return &Reader{
			r: br,
			order: ordering{
				mStates:     make(map[ThreadID]*mState),
				pStates:     make(map[ProcID]*pState),
				gStates:     make(map[GoID]*gState),
				activeTasks: make(map[TaskID]taskState),
			},
			// Don't emit a sync event when we first go to emit events.
			emittedSync: true,
		}, nil
	default:
		return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
	}
}

// ReadEvent reads a single event from the stream.
//
// If the stream has been exhausted, it returns an invalid
// event and io.EOF.
func (r *Reader) ReadEvent() (e Event, err error) {
	// Pre-1.22 traces were fully parsed in NewReader; just replay them.
	if r.go121Events != nil {
		ev, err := r.go121Events.next()
		if err != nil {
			// XXX do we have to emit an EventSync when the trace is done?
			return Event{}, err
		}
		return ev, nil
	}

	// Go 1.22+ trace parsing algorithm.
	//
	// (1) Read in all the batches for the next generation from the stream.
	//   (a) Use the size field in the header to quickly find all batches.
	// (2) Parse out the strings, stacks, CPU samples, and timestamp conversion data.
	// (3) Group each event batch by M, sorted by timestamp. (batchCursor contains the groups.)
	// (4) Organize batchCursors in a min-heap, ordered by the timestamp of the next event for each M.
	// (5) Try to advance the next event for the M at the top of the min-heap.
	//   (a) On success, select that M.
	//   (b) On failure, sort the min-heap and try to advance other Ms. Select the first M that advances.
	//   (c) If there's nothing left to advance, goto (1).
	// (6) Select the latest event for the selected M and get it ready to be returned.
	// (7) Read the next event for the selected M and update the min-heap.
	// (8) Return the selected event, goto (5) on the next call.

	// Set us up to track the last timestamp and fix up
	// the timestamp of any event that comes through.
	defer func() {
		if err != nil {
			return
		}
		if err = e.validateTableIDs(); err != nil {
			return
		}
		// Bump any event whose timestamp is <= the previous one so that
		// the stream of returned events is strictly increasing in time.
		if e.base.time <= r.lastTs {
			e.base.time = r.lastTs + 1
		}
		r.lastTs = e.base.time
	}()

	// Consume any events in the ordering first.
	if ev, ok := r.order.Next(); ok {
		return ev, nil
	}

	// Check if we need to refresh the generation.
	if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
		// The current generation is fully drained. Emit exactly one sync
		// event at the boundary before moving on to the next generation.
		if !r.emittedSync {
			r.emittedSync = true
			return syncEvent(r.gen.evTable, r.lastTs), nil
		}
		// A deferred error from reading the previous spill surfaces only
		// once the generation it accompanied has been fully emitted.
		if r.spillErr != nil {
			return Event{}, r.spillErr
		}
		if r.gen != nil && r.spill == nil {
			// If we have a generation from the last read,
			// and there's nothing left in the frontier, and
			// there's no spilled batch, indicating that there's
			// no further generation, it means we're done.
			// Return io.EOF.
			return Event{}, io.EOF
		}
		// Read the next generation.
		var err error
		r.gen, r.spill, err = readGeneration(r.r, r.spill)
		if r.gen == nil {
			return Event{}, err
		}
		// A non-nil error alongside a valid generation is deferred (see
		// spillErr above) so the generation we did get is emitted first.
		r.spillErr = err

		// Reset CPU samples cursor.
		r.cpuSamples = r.gen.cpuSamples

		// Reset frontier.
		for m, batches := range r.gen.batches {
			bc := &batchCursor{m: m}
			ok, err := bc.nextEvent(batches, r.gen.freq)
			if err != nil {
				return Event{}, err
			}
			if !ok {
				// Turns out there aren't actually any events in these batches.
				continue
			}
			r.frontier = heapInsert(r.frontier, bc)
		}

		// Reset emittedSync.
		r.emittedSync = false
	}
	// tryAdvance attempts to feed the i'th cursor's event into the ordering.
	// On success it refreshes that cursor (or removes it if exhausted) and
	// repairs the heap. Returns false without error if the ordering cannot
	// accept the event yet.
	tryAdvance := func(i int) (bool, error) {
		bc := r.frontier[i]

		if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
			return ok, err
		}

		// Refresh the cursor's event.
		ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
		if err != nil {
			return false, err
		}
		if ok {
			// If we successfully refreshed, update the heap.
			heapUpdate(r.frontier, i)
		} else {
			// There's nothing else to read. Delete this cursor from the frontier.
			r.frontier = heapRemove(r.frontier, i)
		}
		return true, nil
	}
	// Inject a CPU sample if it comes next.
	if len(r.cpuSamples) != 0 {
		if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
			e := r.cpuSamples[0].asEvent(r.gen.evTable)
			r.cpuSamples = r.cpuSamples[1:]
			return e, nil
		}
	}
	// Try to advance the head of the frontier, which should have the minimum timestamp.
	// This should be by far the most common case
	if len(r.frontier) == 0 {
		return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
	}
	if ok, err := tryAdvance(0); err != nil {
		return Event{}, err
	} else if !ok {
		// Try to advance the rest of the frontier, in timestamp order.
		//
		// To do this, sort the min-heap. A sorted min-heap is still a
		// min-heap, but now we can iterate over the rest and try to
		// advance in order. This path should be rare.
		slices.SortFunc(r.frontier, (*batchCursor).compare)
		success := false
		for i := 1; i < len(r.frontier); i++ {
			if ok, err = tryAdvance(i); err != nil {
				return Event{}, err
			} else if ok {
				success = true
				break
			}
		}
		if !success {
			return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
		}
	}

	// Pick off the next event on the queue. At this point, one must exist.
	ev, ok := r.order.Next()
	if !ok {
		panic("invariant violation: advance successful, but queue is empty")
	}
	return ev, nil
}

// dumpFrontier renders the frontier's cursors as human-readable text —
// one line per M with its next event's name, timestamp, and arguments —
// for inclusion in "broken trace" error messages.
func dumpFrontier(frontier []*batchCursor) string {
	var sb strings.Builder
	for _, bc := range frontier {
		spec := go122.Specs()[bc.ev.typ]
		fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
		// Skip the first spec arg; the timestamp is printed separately above.
		for i, arg := range spec.Args[1:] {
			fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
		}
		fmt.Fprintf(&sb, "]\n")
	}
	return sb.String()
}