1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_thread/thread_snapshot_service.h"
16
17 #include "pw_containers/vector.h"
18 #include "pw_log/log.h"
19 #include "pw_protobuf/decoder.h"
20 #include "pw_rpc/raw/server_reader_writer.h"
21 #include "pw_span/span.h"
22 #include "pw_status/status.h"
23 #include "pw_status/try.h"
24 #include "pw_thread/thread_info.h"
25 #include "pw_thread/thread_iteration.h"
26 #include "pw_thread_private/thread_snapshot_service.h"
27 #include "pw_thread_protos/thread.pwpb.h"
28 #include "pw_thread_protos/thread_snapshot_service.pwpb.h"
29
30 namespace pw::thread::proto {
31
ProtoEncodeThreadInfo(pwpb::SnapshotThreadInfo::StreamEncoder & encoder,const ThreadInfo & thread_info)32 Status ProtoEncodeThreadInfo(pwpb::SnapshotThreadInfo::StreamEncoder& encoder,
33 const ThreadInfo& thread_info) {
34 // Grab the next available Thread slot to write to in the response.
35 pwpb::Thread::StreamEncoder proto_encoder = encoder.GetThreadsEncoder();
36 if (thread_info.thread_name().has_value()) {
37 PW_TRY(proto_encoder.WriteName(thread_info.thread_name().value()));
38 } else {
39 // Name is necessary to identify thread.
40 return Status::FailedPrecondition();
41 }
42 if (thread_info.stack_low_addr().has_value()) {
43 PW_TRY(proto_encoder.WriteStackEndPointer(
44 thread_info.stack_low_addr().value()));
45 }
46 if (thread_info.stack_high_addr().has_value()) {
47 PW_TRY(proto_encoder.WriteStackStartPointer(
48 thread_info.stack_high_addr().value()));
49 } else {
50 // Need stack start pointer to contextualize estimated peak.
51 return Status::FailedPrecondition();
52 }
53 if (thread_info.stack_pointer().has_value()) {
54 PW_TRY(
55 proto_encoder.WriteStackPointer(thread_info.stack_pointer().value()));
56 }
57
58 if (thread_info.stack_peak_addr().has_value()) {
59 PW_TRY(proto_encoder.WriteStackPointerEstPeak(
60 thread_info.stack_peak_addr().value()));
61 } else {
62 // Peak stack usage reporting is not supported.
63 return Status::Unimplemented();
64 }
65
66 return proto_encoder.status();
67 }
68
ErrorLog(Status status)69 void ErrorLog(Status status) {
70 if (status == Status::Unimplemented()) {
71 PW_LOG_ERROR(
72 "Peak stack usage reporting not supported by your current OS or "
73 "configuration.");
74 } else if (status == Status::FailedPrecondition()) {
75 PW_LOG_ERROR("Thread missing information needed by service.");
76 } else if (status == Status::ResourceExhausted()) {
77 PW_LOG_ERROR("Buffer capacity limit exceeded.");
78 } else if (status != OkStatus()) {
79 PW_LOG_ERROR(
80 "Failure with error code %d, RPC service was unable to capture thread "
81 "information",
82 status.code());
83 }
84 }
85
DecodeThreadName(ConstByteSpan serialized_path,ConstByteSpan & thread_name)86 Status DecodeThreadName(ConstByteSpan serialized_path,
87 ConstByteSpan& thread_name) {
88 protobuf::Decoder decoder(serialized_path);
89 Status status;
90 while (decoder.Next().ok()) {
91 switch (decoder.FieldNumber()) {
92 case static_cast<uint32_t>(pwpb::Thread::Fields::kName): {
93 status.Update(decoder.ReadBytes(&thread_name));
94 }
95 }
96 }
97 return status;
98 }
99
GetPeakStackUsage(ConstByteSpan request,rpc::RawServerWriter & response_writer)100 void ThreadSnapshotService::GetPeakStackUsage(
101 ConstByteSpan request, rpc::RawServerWriter& response_writer) {
102 // For now, ignore the request and just stream all the thread information
103 // back.
104 struct IterationInfo {
105 pwpb::SnapshotThreadInfo::MemoryEncoder encoder;
106 Status status;
107 ConstByteSpan name;
108
109 // For sending out data by chunks.
110 Vector<size_t>& thread_proto_indices;
111 };
112
113 ConstByteSpan name_request;
114 if (!request.empty()) {
115 if (const auto status = DecodeThreadName(request, name_request);
116 !status.ok()) {
117 PW_LOG_ERROR("Service unable to decode thread name with error code %d",
118 status.code());
119 }
120 }
121
122 IterationInfo iteration_info{
123 pwpb::SnapshotThreadInfo::MemoryEncoder(encode_buffer_),
124 OkStatus(),
125 name_request,
126 thread_proto_indices_};
127
128 iteration_info.thread_proto_indices.clear();
129 iteration_info.thread_proto_indices.push_back(iteration_info.encoder.size());
130
131 auto cb = [&iteration_info](const ThreadInfo& thread_info) {
132 if (!iteration_info.name.empty() && thread_info.thread_name().has_value()) {
133 if (std::equal(thread_info.thread_name().value().begin(),
134 thread_info.thread_name().value().end(),
135 iteration_info.name.begin())) {
136 iteration_info.status.Update(
137 ProtoEncodeThreadInfo(iteration_info.encoder, thread_info));
138 iteration_info.thread_proto_indices.push_back(
139 iteration_info.encoder.size());
140 return false;
141 }
142 } else {
143 iteration_info.status.Update(
144 ProtoEncodeThreadInfo(iteration_info.encoder, thread_info));
145 iteration_info.thread_proto_indices.push_back(
146 iteration_info.encoder.size());
147 }
148 return iteration_info.status.ok();
149 };
150 if (const auto status = ForEachThread(cb); !status.ok()) {
151 PW_LOG_ERROR("Failed to capture thread information, error %d",
152 status.code());
153 }
154
155 // This logging action is external to thread iteration because it is
156 // unsafe to log within ForEachThread() when the scheduler is disabled.
157 ErrorLog(iteration_info.status);
158
159 Status status;
160 if (iteration_info.encoder.size() && iteration_info.status.ok()) {
161 // Must subtract 1 because the last boundary index of thread_proto_indices
162 // is the end of the last submessage, and NOT the start of another.
163 size_t last_start_index = iteration_info.thread_proto_indices.size() - 1;
164 for (size_t i = 0; i < last_start_index; i += num_bundled_threads_) {
165 const size_t num_threads =
166 std::min(num_bundled_threads_, last_start_index - i);
167
168 // Sending out a bundle of threads at a time.
169 const size_t bundle_size =
170 iteration_info.thread_proto_indices[i + num_threads] -
171 iteration_info.thread_proto_indices[i];
172
173 ConstByteSpan thread =
174 ConstByteSpan(iteration_info.encoder.data() +
175 iteration_info.thread_proto_indices[i],
176 bundle_size);
177
178 if (bundle_size) {
179 status.Update(response_writer.Write(thread));
180 }
181 if (!status.ok()) {
182 PW_LOG_ERROR(
183 "Failed to send response with error code %d, packet may be too "
184 "large to send",
185 status.code());
186 }
187 }
188 }
189
190 if (response_writer.Finish(status) != OkStatus()) {
191 PW_LOG_ERROR(
192 "Failed to close stream for GetPeakStackUsage() with error code %d",
193 status.code());
194 }
195 }
196
197 } // namespace pw::thread::proto
198