1 /* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15
16 #include "tensorflow/core/profiler/rpc/profiler_service_impl.h"
17
18 #include <memory>
19
20 #include "grpcpp/support/status.h"
21 #include "absl/container/flat_hash_map.h"
22 #include "absl/strings/str_replace.h"
23 #include "tensorflow/core/platform/env.h"
24 #include "tensorflow/core/platform/env_time.h"
25 #include "tensorflow/core/platform/errors.h"
26 #include "tensorflow/core/platform/logging.h"
27 #include "tensorflow/core/platform/macros.h"
28 #include "tensorflow/core/platform/mutex.h"
29 #include "tensorflow/core/platform/status.h"
30 #include "tensorflow/core/profiler/lib/profiler_session.h"
31 #include "tensorflow/core/profiler/profiler_service.grpc.pb.h"
32 #include "tensorflow/core/profiler/profiler_service.pb.h"
33 #include "tensorflow/core/profiler/protobuf/xplane.pb.h"
34 #include "tensorflow/core/profiler/utils/file_system_utils.h"
35 #include "tensorflow/core/profiler/utils/time_utils.h"
36 #include "tensorflow/core/profiler/utils/xplane_utils.h"
37
38 namespace tensorflow {
39 namespace profiler {
40 namespace {
41
42 const absl::string_view kXPlanePb = "xplane.pb";
43
44 // Collects data in XSpace format. The data is saved to a repository
45 // unconditionally.
CollectDataToRepository(const ProfileRequest & request,ProfilerSession * profiler,ProfileResponse * response)46 Status CollectDataToRepository(const ProfileRequest& request,
47 ProfilerSession* profiler,
48 ProfileResponse* response) {
49 response->set_empty_trace(true);
50 // Read the profile data into xspace.
51 XSpace xspace;
52 TF_RETURN_IF_ERROR(profiler->CollectData(&xspace));
53 VLOG(3) << "Collected XSpace to repository.";
54 response->set_empty_trace(IsEmpty(xspace));
55
56 std::string log_dir_path =
57 ProfilerJoinPath(request.repository_root(), request.session_id());
58 VLOG(1) << "Creating " << log_dir_path;
59 TF_RETURN_IF_ERROR(Env::Default()->RecursivelyCreateDir(log_dir_path));
60
61 std::string file_name = absl::StrCat(request.host_name(), ".", kXPlanePb);
62 // Windows file names do not support colons.
63 absl::StrReplaceAll({{":", "_"}}, &file_name);
64 // Dumps profile data to <repository_root>/<run>/<host>_<port>.<kXPlanePb>
65 std::string out_path = ProfilerJoinPath(log_dir_path, file_name);
66 LOG(INFO) << "Collecting XSpace to repository: " << out_path;
67
68 return WriteBinaryProto(Env::Default(), out_path, xspace);
69 }
70
71 class ProfilerServiceImpl : public grpc::ProfilerService::Service {
72 public:
Monitor(::grpc::ServerContext * ctx,const MonitorRequest * req,MonitorResponse * response)73 ::grpc::Status Monitor(::grpc::ServerContext* ctx, const MonitorRequest* req,
74 MonitorResponse* response) override {
75 return ::grpc::Status(::grpc::StatusCode::UNIMPLEMENTED, "unimplemented.");
76 }
77
Profile(::grpc::ServerContext * ctx,const ProfileRequest * req,ProfileResponse * response)78 ::grpc::Status Profile(::grpc::ServerContext* ctx, const ProfileRequest* req,
79 ProfileResponse* response) override {
80 VLOG(1) << "Received a profile request: " << req->DebugString();
81 std::unique_ptr<ProfilerSession> profiler =
82 ProfilerSession::Create(req->opts());
83 Status status = profiler->Status();
84 if (!status.ok()) {
85 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
86 status.error_message());
87 }
88
89 Env* env = Env::Default();
90 uint64 duration_ns = MilliToNano(req->opts().duration_ms());
91 uint64 deadline = GetCurrentTimeNanos() + duration_ns;
92 while (GetCurrentTimeNanos() < deadline) {
93 env->SleepForMicroseconds(EnvTime::kMillisToMicros);
94 if (ctx->IsCancelled()) {
95 return ::grpc::Status::CANCELLED;
96 }
97 if (TF_PREDICT_FALSE(IsStopped(req->session_id()))) {
98 mutex_lock lock(mutex_);
99 stop_signals_per_session_.erase(req->session_id());
100 break;
101 }
102 }
103
104 status = CollectDataToRepository(*req, profiler.get(), response);
105 if (!status.ok()) {
106 return ::grpc::Status(::grpc::StatusCode::INTERNAL,
107 status.error_message());
108 }
109
110 return ::grpc::Status::OK;
111 }
112
Terminate(::grpc::ServerContext * ctx,const TerminateRequest * req,TerminateResponse * response)113 ::grpc::Status Terminate(::grpc::ServerContext* ctx,
114 const TerminateRequest* req,
115 TerminateResponse* response) override {
116 mutex_lock lock(mutex_);
117 stop_signals_per_session_[req->session_id()] = true;
118 return ::grpc::Status::OK;
119 }
120
121 private:
IsStopped(const std::string & session_id)122 bool IsStopped(const std::string& session_id) {
123 mutex_lock lock(mutex_);
124 auto it = stop_signals_per_session_.find(session_id);
125 return it != stop_signals_per_session_.end() && it->second;
126 }
127
128 mutex mutex_;
129 absl::flat_hash_map<std::string, bool> stop_signals_per_session_
130 ABSL_GUARDED_BY(mutex_);
131 };
132
133 } // namespace
134
CreateProfilerService()135 std::unique_ptr<grpc::ProfilerService::Service> CreateProfilerService() {
136 return std::make_unique<ProfilerServiceImpl>();
137 }
138
139 } // namespace profiler
140 } // namespace tensorflow
141