xref: /aosp_15_r20/external/tensorflow/tensorflow/core/profiler/rpc/profiler_service_impl.cc (revision b6fb3261f9314811a0f4371741dbb8839866f948)
1 /* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2 
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6 
7     http://www.apache.org/licenses/LICENSE-2.0
8 
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 
16 #include "tensorflow/core/profiler/rpc/profiler_service_impl.h"
17 
18 #include <memory>
19 
20 #include "grpcpp/support/status.h"
21 #include "absl/container/flat_hash_map.h"
22 #include "absl/strings/str_replace.h"
23 #include "tensorflow/core/platform/env.h"
24 #include "tensorflow/core/platform/env_time.h"
25 #include "tensorflow/core/platform/errors.h"
26 #include "tensorflow/core/platform/logging.h"
27 #include "tensorflow/core/platform/macros.h"
28 #include "tensorflow/core/platform/mutex.h"
29 #include "tensorflow/core/platform/status.h"
30 #include "tensorflow/core/profiler/lib/profiler_session.h"
31 #include "tensorflow/core/profiler/profiler_service.grpc.pb.h"
32 #include "tensorflow/core/profiler/profiler_service.pb.h"
33 #include "tensorflow/core/profiler/protobuf/xplane.pb.h"
34 #include "tensorflow/core/profiler/utils/file_system_utils.h"
35 #include "tensorflow/core/profiler/utils/time_utils.h"
36 #include "tensorflow/core/profiler/utils/xplane_utils.h"
37 
38 namespace tensorflow {
39 namespace profiler {
40 namespace {
41 
42 const absl::string_view kXPlanePb = "xplane.pb";
43 
44 // Collects data in XSpace format. The data is saved to a repository
45 // unconditionally.
CollectDataToRepository(const ProfileRequest & request,ProfilerSession * profiler,ProfileResponse * response)46 Status CollectDataToRepository(const ProfileRequest& request,
47                                ProfilerSession* profiler,
48                                ProfileResponse* response) {
49   response->set_empty_trace(true);
50   // Read the profile data into xspace.
51   XSpace xspace;
52   TF_RETURN_IF_ERROR(profiler->CollectData(&xspace));
53   VLOG(3) << "Collected XSpace to repository.";
54   response->set_empty_trace(IsEmpty(xspace));
55 
56   std::string log_dir_path =
57       ProfilerJoinPath(request.repository_root(), request.session_id());
58   VLOG(1) << "Creating " << log_dir_path;
59   TF_RETURN_IF_ERROR(Env::Default()->RecursivelyCreateDir(log_dir_path));
60 
61   std::string file_name = absl::StrCat(request.host_name(), ".", kXPlanePb);
62   // Windows file names do not support colons.
63   absl::StrReplaceAll({{":", "_"}}, &file_name);
64   // Dumps profile data to <repository_root>/<run>/<host>_<port>.<kXPlanePb>
65   std::string out_path = ProfilerJoinPath(log_dir_path, file_name);
66   LOG(INFO) << "Collecting XSpace to repository: " << out_path;
67 
68   return WriteBinaryProto(Env::Default(), out_path, xspace);
69 }
70 
71 class ProfilerServiceImpl : public grpc::ProfilerService::Service {
72  public:
Monitor(::grpc::ServerContext * ctx,const MonitorRequest * req,MonitorResponse * response)73   ::grpc::Status Monitor(::grpc::ServerContext* ctx, const MonitorRequest* req,
74                          MonitorResponse* response) override {
75     return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "unimplemented.");
76   }
77 
Profile(::grpc::ServerContext * ctx,const ProfileRequest * req,ProfileResponse * response)78   ::grpc::Status Profile(::grpc::ServerContext* ctx, const ProfileRequest* req,
79                          ProfileResponse* response) override {
80     VLOG(1) << "Received a profile request: " << req->DebugString();
81     std::unique_ptr<ProfilerSession> profiler =
82         ProfilerSession::Create(req->opts());
83     Status status = profiler->Status();
84     if (!status.ok()) {
85       return ::grpc::Status(::grpc::StatusCode::INTERNAL,
86                             status.error_message());
87     }
88 
89     Env* env = Env::Default();
90     uint64 duration_ns = MilliToNano(req->opts().duration_ms());
91     uint64 deadline = GetCurrentTimeNanos() + duration_ns;
92     while (GetCurrentTimeNanos() < deadline) {
93       env->SleepForMicroseconds(EnvTime::kMillisToMicros);
94       if (ctx->IsCancelled()) {
95         return ::grpc::Status::CANCELLED;
96       }
97       if (TF_PREDICT_FALSE(IsStopped(req->session_id()))) {
98         mutex_lock lock(mutex_);
99         stop_signals_per_session_.erase(req->session_id());
100         break;
101       }
102     }
103 
104     status = CollectDataToRepository(*req, profiler.get(), response);
105     if (!status.ok()) {
106       return ::grpc::Status(::grpc::StatusCode::INTERNAL,
107                             status.error_message());
108     }
109 
110     return ::grpc::Status::OK;
111   }
112 
Terminate(::grpc::ServerContext * ctx,const TerminateRequest * req,TerminateResponse * response)113   ::grpc::Status Terminate(::grpc::ServerContext* ctx,
114                            const TerminateRequest* req,
115                            TerminateResponse* response) override {
116     mutex_lock lock(mutex_);
117     stop_signals_per_session_[req->session_id()] = true;
118     return ::grpc::Status::OK;
119   }
120 
121  private:
IsStopped(const std::string & session_id)122   bool IsStopped(const std::string& session_id) {
123     mutex_lock lock(mutex_);
124     auto it = stop_signals_per_session_.find(session_id);
125     return it != stop_signals_per_session_.end() && it->second;
126   }
127 
128   mutex mutex_;
129   absl::flat_hash_map<std::string, bool> stop_signals_per_session_
130       ABSL_GUARDED_BY(mutex_);
131 };
132 
133 }  // namespace
134 
CreateProfilerService()135 std::unique_ptr<grpc::ProfilerService::Service> CreateProfilerService() {
136   return std::make_unique<ProfilerServiceImpl>();
137 }
138 
139 }  // namespace profiler
140 }  // namespace tensorflow
141