1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/transport/connectivity_state.h"
22
23 #include <string>
24
25 #include <grpc/support/log.h>
26
27 #include "src/core/lib/gprpp/debug_location.h"
28 #include "src/core/lib/gprpp/ref_counted_ptr.h"
29 #include "src/core/lib/iomgr/closure.h"
30 #include "src/core/lib/iomgr/error.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32
33 namespace grpc_core {
34
// Trace flag registered under the name "connectivity_state" and constructed
// with an initial value of false.  Every gpr_log() call in this file is
// guarded by GRPC_TRACE_FLAG_ENABLED() on this flag.
TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
36
ConnectivityStateName(grpc_connectivity_state state)37 const char* ConnectivityStateName(grpc_connectivity_state state) {
38 switch (state) {
39 case GRPC_CHANNEL_IDLE:
40 return "IDLE";
41 case GRPC_CHANNEL_CONNECTING:
42 return "CONNECTING";
43 case GRPC_CHANNEL_READY:
44 return "READY";
45 case GRPC_CHANNEL_TRANSIENT_FAILURE:
46 return "TRANSIENT_FAILURE";
47 case GRPC_CHANNEL_SHUTDOWN:
48 return "SHUTDOWN";
49 }
50 GPR_UNREACHABLE_CODE(return "UNKNOWN");
51 }
52
53 //
54 // AsyncConnectivityStateWatcherInterface
55 //
56
57 // A fire-and-forget class to asynchronously deliver a connectivity
58 // state notification to a watcher.
59 class AsyncConnectivityStateWatcherInterface::Notifier {
60 public:
Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,grpc_connectivity_state state,const absl::Status & status,const std::shared_ptr<WorkSerializer> & work_serializer)61 Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
62 grpc_connectivity_state state, const absl::Status& status,
63 const std::shared_ptr<WorkSerializer>& work_serializer)
64 : watcher_(std::move(watcher)), state_(state), status_(status) {
65 if (work_serializer != nullptr) {
66 work_serializer->Run(
67 [this]() { SendNotification(this, absl::OkStatus()); },
68 DEBUG_LOCATION);
69 } else {
70 GRPC_CLOSURE_INIT(&closure_, SendNotification, this,
71 grpc_schedule_on_exec_ctx);
72 ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
73 }
74 }
75
76 private:
SendNotification(void * arg,grpc_error_handle)77 static void SendNotification(void* arg, grpc_error_handle /*ignored*/) {
78 Notifier* self = static_cast<Notifier*>(arg);
79 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
80 gpr_log(GPR_INFO, "watcher %p: delivering async notification for %s (%s)",
81 self->watcher_.get(), ConnectivityStateName(self->state_),
82 self->status_.ToString().c_str());
83 }
84 self->watcher_->OnConnectivityStateChange(self->state_, self->status_);
85 delete self;
86 }
87
88 RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher_;
89 const grpc_connectivity_state state_;
90 const absl::Status status_;
91 grpc_closure closure_;
92 };
93
Notify(grpc_connectivity_state state,const absl::Status & status)94 void AsyncConnectivityStateWatcherInterface::Notify(
95 grpc_connectivity_state state, const absl::Status& status) {
96 new Notifier(Ref(), state, status,
97 work_serializer_); // Deletes itself when done.
98 }
99
100 //
101 // ConnectivityStateTracker
102 //
103
~ConnectivityStateTracker()104 ConnectivityStateTracker::~ConnectivityStateTracker() {
105 grpc_connectivity_state current_state =
106 state_.load(std::memory_order_relaxed);
107 if (current_state == GRPC_CHANNEL_SHUTDOWN) return;
108 for (const auto& p : watchers_) {
109 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
110 gpr_log(GPR_INFO,
111 "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
112 name_, this, p.first, ConnectivityStateName(current_state),
113 ConnectivityStateName(GRPC_CHANNEL_SHUTDOWN));
114 }
115 p.second->Notify(GRPC_CHANNEL_SHUTDOWN, absl::Status());
116 }
117 }
118
AddWatcher(grpc_connectivity_state initial_state,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)119 void ConnectivityStateTracker::AddWatcher(
120 grpc_connectivity_state initial_state,
121 OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
122 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
123 gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: add watcher %p", name_,
124 this, watcher.get());
125 }
126 grpc_connectivity_state current_state =
127 state_.load(std::memory_order_relaxed);
128 if (initial_state != current_state) {
129 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
130 gpr_log(GPR_INFO,
131 "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
132 name_, this, watcher.get(), ConnectivityStateName(initial_state),
133 ConnectivityStateName(current_state));
134 }
135 watcher->Notify(current_state, status_);
136 }
137 // If we're in state SHUTDOWN, don't add the watcher, so that it will
138 // be orphaned immediately.
139 if (current_state != GRPC_CHANNEL_SHUTDOWN) {
140 watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
141 }
142 }
143
RemoveWatcher(ConnectivityStateWatcherInterface * watcher)144 void ConnectivityStateTracker::RemoveWatcher(
145 ConnectivityStateWatcherInterface* watcher) {
146 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
147 gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: remove watcher %p",
148 name_, this, watcher);
149 }
150 watchers_.erase(watcher);
151 }
152
SetState(grpc_connectivity_state state,const absl::Status & status,const char * reason)153 void ConnectivityStateTracker::SetState(grpc_connectivity_state state,
154 const absl::Status& status,
155 const char* reason) {
156 grpc_connectivity_state current_state =
157 state_.load(std::memory_order_relaxed);
158 if (state == current_state) return;
159 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
160 gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: %s -> %s (%s, %s)",
161 name_, this, ConnectivityStateName(current_state),
162 ConnectivityStateName(state), reason, status.ToString().c_str());
163 }
164 state_.store(state, std::memory_order_relaxed);
165 status_ = status;
166 for (const auto& p : watchers_) {
167 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
168 gpr_log(GPR_INFO,
169 "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
170 name_, this, p.first, ConnectivityStateName(current_state),
171 ConnectivityStateName(state));
172 }
173 p.second->Notify(state, status);
174 }
175 // If the new state is SHUTDOWN, orphan all of the watchers. This
176 // avoids the need for the callers to explicitly cancel them.
177 if (state == GRPC_CHANNEL_SHUTDOWN) watchers_.clear();
178 }
179
state() const180 grpc_connectivity_state ConnectivityStateTracker::state() const {
181 grpc_connectivity_state state = state_.load(std::memory_order_relaxed);
182 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
183 gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: get current state: %s",
184 name_, this, ConnectivityStateName(state));
185 }
186 return state;
187 }
188
189 } // namespace grpc_core
190