1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/transport/connectivity_state.h"
22 
23 #include <string>
24 
25 #include <grpc/support/log.h>
26 
27 #include "src/core/lib/gprpp/debug_location.h"
28 #include "src/core/lib/gprpp/ref_counted_ptr.h"
29 #include "src/core/lib/iomgr/closure.h"
30 #include "src/core/lib/iomgr/error.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 
33 namespace grpc_core {
34 
35 TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
36 
ConnectivityStateName(grpc_connectivity_state state)37 const char* ConnectivityStateName(grpc_connectivity_state state) {
38   switch (state) {
39     case GRPC_CHANNEL_IDLE:
40       return "IDLE";
41     case GRPC_CHANNEL_CONNECTING:
42       return "CONNECTING";
43     case GRPC_CHANNEL_READY:
44       return "READY";
45     case GRPC_CHANNEL_TRANSIENT_FAILURE:
46       return "TRANSIENT_FAILURE";
47     case GRPC_CHANNEL_SHUTDOWN:
48       return "SHUTDOWN";
49   }
50   GPR_UNREACHABLE_CODE(return "UNKNOWN");
51 }
52 
53 //
54 // AsyncConnectivityStateWatcherInterface
55 //
56 
57 // A fire-and-forget class to asynchronously deliver a connectivity
58 // state notification to a watcher.
59 class AsyncConnectivityStateWatcherInterface::Notifier {
60  public:
Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,grpc_connectivity_state state,const absl::Status & status,const std::shared_ptr<WorkSerializer> & work_serializer)61   Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
62            grpc_connectivity_state state, const absl::Status& status,
63            const std::shared_ptr<WorkSerializer>& work_serializer)
64       : watcher_(std::move(watcher)), state_(state), status_(status) {
65     if (work_serializer != nullptr) {
66       work_serializer->Run(
67           [this]() { SendNotification(this, absl::OkStatus()); },
68           DEBUG_LOCATION);
69     } else {
70       GRPC_CLOSURE_INIT(&closure_, SendNotification, this,
71                         grpc_schedule_on_exec_ctx);
72       ExecCtx::Run(DEBUG_LOCATION, &closure_, absl::OkStatus());
73     }
74   }
75 
76  private:
SendNotification(void * arg,grpc_error_handle)77   static void SendNotification(void* arg, grpc_error_handle /*ignored*/) {
78     Notifier* self = static_cast<Notifier*>(arg);
79     if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
80       gpr_log(GPR_INFO, "watcher %p: delivering async notification for %s (%s)",
81               self->watcher_.get(), ConnectivityStateName(self->state_),
82               self->status_.ToString().c_str());
83     }
84     self->watcher_->OnConnectivityStateChange(self->state_, self->status_);
85     delete self;
86   }
87 
88   RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher_;
89   const grpc_connectivity_state state_;
90   const absl::Status status_;
91   grpc_closure closure_;
92 };
93 
Notify(grpc_connectivity_state state,const absl::Status & status)94 void AsyncConnectivityStateWatcherInterface::Notify(
95     grpc_connectivity_state state, const absl::Status& status) {
96   new Notifier(Ref(), state, status,
97                work_serializer_);  // Deletes itself when done.
98 }
99 
100 //
101 // ConnectivityStateTracker
102 //
103 
~ConnectivityStateTracker()104 ConnectivityStateTracker::~ConnectivityStateTracker() {
105   grpc_connectivity_state current_state =
106       state_.load(std::memory_order_relaxed);
107   if (current_state == GRPC_CHANNEL_SHUTDOWN) return;
108   for (const auto& p : watchers_) {
109     if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
110       gpr_log(GPR_INFO,
111               "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
112               name_, this, p.first, ConnectivityStateName(current_state),
113               ConnectivityStateName(GRPC_CHANNEL_SHUTDOWN));
114     }
115     p.second->Notify(GRPC_CHANNEL_SHUTDOWN, absl::Status());
116   }
117 }
118 
AddWatcher(grpc_connectivity_state initial_state,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)119 void ConnectivityStateTracker::AddWatcher(
120     grpc_connectivity_state initial_state,
121     OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
122   if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
123     gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: add watcher %p", name_,
124             this, watcher.get());
125   }
126   grpc_connectivity_state current_state =
127       state_.load(std::memory_order_relaxed);
128   if (initial_state != current_state) {
129     if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
130       gpr_log(GPR_INFO,
131               "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
132               name_, this, watcher.get(), ConnectivityStateName(initial_state),
133               ConnectivityStateName(current_state));
134     }
135     watcher->Notify(current_state, status_);
136   }
137   // If we're in state SHUTDOWN, don't add the watcher, so that it will
138   // be orphaned immediately.
139   if (current_state != GRPC_CHANNEL_SHUTDOWN) {
140     watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
141   }
142 }
143 
RemoveWatcher(ConnectivityStateWatcherInterface * watcher)144 void ConnectivityStateTracker::RemoveWatcher(
145     ConnectivityStateWatcherInterface* watcher) {
146   if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
147     gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: remove watcher %p",
148             name_, this, watcher);
149   }
150   watchers_.erase(watcher);
151 }
152 
SetState(grpc_connectivity_state state,const absl::Status & status,const char * reason)153 void ConnectivityStateTracker::SetState(grpc_connectivity_state state,
154                                         const absl::Status& status,
155                                         const char* reason) {
156   grpc_connectivity_state current_state =
157       state_.load(std::memory_order_relaxed);
158   if (state == current_state) return;
159   if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
160     gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: %s -> %s (%s, %s)",
161             name_, this, ConnectivityStateName(current_state),
162             ConnectivityStateName(state), reason, status.ToString().c_str());
163   }
164   state_.store(state, std::memory_order_relaxed);
165   status_ = status;
166   for (const auto& p : watchers_) {
167     if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
168       gpr_log(GPR_INFO,
169               "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
170               name_, this, p.first, ConnectivityStateName(current_state),
171               ConnectivityStateName(state));
172     }
173     p.second->Notify(state, status);
174   }
175   // If the new state is SHUTDOWN, orphan all of the watchers.  This
176   // avoids the need for the callers to explicitly cancel them.
177   if (state == GRPC_CHANNEL_SHUTDOWN) watchers_.clear();
178 }
179 
state() const180 grpc_connectivity_state ConnectivityStateTracker::state() const {
181   grpc_connectivity_state state = state_.load(std::memory_order_relaxed);
182   if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
183     gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: get current state: %s",
184             name_, this, ConnectivityStateName(state));
185   }
186   return state;
187 }
188 
189 }  // namespace grpc_core
190