xref: /aosp_15_r20/external/grpc-grpc/src/cpp/ext/gcp/observability_logging_sink.cc (revision cc02d7e222339f7a4f6ba5f422e6413f4bd931f2)
1 //
2 //
3 // Copyright 2022 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/cpp/ext/gcp/observability_logging_sink.h"
22 
23 #include <algorithm>
24 #include <map>
25 #include <utility>
26 
27 #include "absl/numeric/int128.h"
28 #include "absl/strings/escaping.h"
29 #include "absl/strings/match.h"
30 #include "absl/strings/str_format.h"
31 #include "absl/types/optional.h"
32 #include "google/api/monitored_resource.pb.h"
33 #include "google/logging/v2/log_entry.pb.h"
34 #include "google/logging/v2/logging.grpc.pb.h"
35 #include "google/logging/v2/logging.pb.h"
36 #include "google/protobuf/text_format.h"
37 
38 #include <grpc/impl/channel_arg_names.h>
39 #include <grpc/support/log.h>
40 #include <grpc/support/time.h>
41 #include <grpcpp/grpcpp.h>
42 #include <grpcpp/security/credentials.h>
43 #include <grpcpp/support/channel_arguments.h>
44 #include <grpcpp/support/status.h>
45 
46 #include "src/core/lib/event_engine/default_event_engine.h"
47 #include "src/core/lib/gprpp/env.h"
48 #include "src/core/lib/gprpp/time.h"
49 #include "src/core/lib/gprpp/uuid_v4.h"
50 #include "src/core/lib/json/json.h"
51 #include "src/cpp/ext/filters/census/open_census_call_tracer.h"
52 
53 // IWYU pragma: no_include "google/protobuf/struct.pb.h"
54 // IWYU pragma: no_include "google/protobuf/timestamp.pb.h"
55 
56 namespace grpc {
57 namespace internal {
58 
59 using grpc_core::LoggingSink;
60 
ObservabilityLoggingSink(GcpObservabilityConfig::CloudLogging logging_config,std::string project_id,std::map<std::string,std::string> labels)61 ObservabilityLoggingSink::ObservabilityLoggingSink(
62     GcpObservabilityConfig::CloudLogging logging_config, std::string project_id,
63     std::map<std::string, std::string> labels)
64     : project_id_(std::move(project_id)),
65       labels_(labels.begin(), labels.end()) {
66   for (auto& client_rpc_event_config : logging_config.client_rpc_events) {
67     client_configs_.emplace_back(client_rpc_event_config);
68   }
69   for (auto& server_rpc_event_config : logging_config.server_rpc_events) {
70     server_configs_.emplace_back(server_rpc_event_config);
71   }
72   absl::optional<std::string> authority_env =
73       grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
74   absl::optional<std::string> endpoint_env =
75       grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
76   if (authority_env.has_value() && !authority_env->empty()) {
77     authority_ = std::move(*endpoint_env);
78   }
79 }
80 
FindMatch(bool is_client,absl::string_view service,absl::string_view method)81 LoggingSink::Config ObservabilityLoggingSink::FindMatch(
82     bool is_client, absl::string_view service, absl::string_view method) {
83   const auto& configs = is_client ? client_configs_ : server_configs_;
84   if (service.empty() || method.empty()) {
85     return LoggingSink::Config();
86   }
87   for (const auto& config : configs) {
88     for (const auto& config_method : config.parsed_methods) {
89       if ((config_method.service == "*") ||
90           ((service == config_method.service) &&
91            ((config_method.method == "*") ||
92             (method == config_method.method)))) {
93         if (config.exclude) {
94           return LoggingSink::Config();
95         }
96         return LoggingSink::Config(config.max_metadata_bytes,
97                                    config.max_message_bytes);
98       }
99     }
100   }
101   return LoggingSink::Config();
102 }
103 
104 namespace {
105 
EventTypeToString(LoggingSink::Entry::EventType type)106 std::string EventTypeToString(LoggingSink::Entry::EventType type) {
107   switch (type) {
108     case LoggingSink::Entry::EventType::kClientHeader:
109       return "CLIENT_HEADER";
110     case LoggingSink::Entry::EventType::kServerHeader:
111       return "SERVER_HEADER";
112     case LoggingSink::Entry::EventType::kClientMessage:
113       return "CLIENT_MESSAGE";
114     case LoggingSink::Entry::EventType::kServerMessage:
115       return "SERVER_MESSAGE";
116     case LoggingSink::Entry::EventType::kClientHalfClose:
117       return "CLIENT_HALF_CLOSE";
118     case LoggingSink::Entry::EventType::kServerTrailer:
119       return "SERVER_TRAILER";
120     case LoggingSink::Entry::EventType::kCancel:
121       return "CANCEL";
122     case LoggingSink::Entry::EventType::kUnknown:
123     default:
124       return "EVENT_TYPE_UNKNOWN";
125   }
126 }
127 
LoggerToString(LoggingSink::Entry::Logger type)128 std::string LoggerToString(LoggingSink::Entry::Logger type) {
129   switch (type) {
130     case LoggingSink::Entry::Logger::kClient:
131       return "CLIENT";
132     case LoggingSink::Entry::Logger::kServer:
133       return "SERVER";
134     case LoggingSink::Entry::Logger::kUnknown:
135     default:
136       return "LOGGER_UNKNOWN";
137   }
138 }
139 
PayloadToJsonStructProto(LoggingSink::Entry::Payload payload,::google::protobuf::Struct * payload_proto)140 void PayloadToJsonStructProto(LoggingSink::Entry::Payload payload,
141                               ::google::protobuf::Struct* payload_proto) {
142   grpc_core::Json::Object payload_json;
143   if (!payload.metadata.empty()) {
144     auto* metadata_proto =
145         (*payload_proto->mutable_fields())["metadata"].mutable_struct_value();
146     for (auto& metadata : payload.metadata) {
147       if (absl::EndsWith(metadata.first, "-bin")) {
148         (*metadata_proto->mutable_fields())[metadata.first].set_string_value(
149             absl::WebSafeBase64Escape(metadata.second));
150       } else {
151         (*metadata_proto->mutable_fields())[metadata.first].set_string_value(
152             std::move(metadata.second));
153       }
154     }
155   }
156   if (payload.timeout != grpc_core::Duration::Zero()) {
157     (*payload_proto->mutable_fields())["timeout"].set_string_value(
158         payload.timeout.ToJsonString());
159   }
160   if (payload.status_code != 0) {
161     (*payload_proto->mutable_fields())["statusCode"].set_number_value(
162         payload.status_code);
163   }
164   if (!payload.status_message.empty()) {
165     (*payload_proto->mutable_fields())["statusMessage"].set_string_value(
166         std::move(payload.status_message));
167   }
168   if (!payload.status_details.empty()) {
169     (*payload_proto->mutable_fields())["statusDetails"].set_string_value(
170         absl::Base64Escape(payload.status_details));
171   }
172   if (payload.message_length != 0) {
173     (*payload_proto->mutable_fields())["messageLength"].set_number_value(
174         payload.message_length);
175   }
176   if (!payload.message.empty()) {
177     (*payload_proto->mutable_fields())["message"].set_string_value(
178         absl::Base64Escape(payload.message));
179   }
180 }
181 
AddressTypeToString(LoggingSink::Entry::Address::Type type)182 std::string AddressTypeToString(LoggingSink::Entry::Address::Type type) {
183   switch (type) {
184     case LoggingSink::Entry::Address::Type::kIpv4:
185       return "TYPE_IPV4";
186     case LoggingSink::Entry::Address::Type::kIpv6:
187       return "TYPE_IPV6";
188     case LoggingSink::Entry::Address::Type::kUnix:
189       return "TYPE_UNIX";
190     case LoggingSink::Entry::Address::Type::kUnknown:
191     default:
192       return "TYPE_UNKNOWN";
193   }
194 }
195 
PeerToJsonStructProto(LoggingSink::Entry::Address peer,::google::protobuf::Struct * peer_json)196 void PeerToJsonStructProto(LoggingSink::Entry::Address peer,
197                            ::google::protobuf::Struct* peer_json) {
198   (*peer_json->mutable_fields())["type"].set_string_value(
199       AddressTypeToString(peer.type));
200   if (peer.type != LoggingSink::Entry::Address::Type::kUnknown) {
201     (*peer_json->mutable_fields())["address"].set_string_value(
202         std::move(peer.address));
203     (*peer_json->mutable_fields())["ipPort"].set_number_value(peer.ip_port);
204   }
205 }
206 
207 }  // namespace
208 
EntryToJsonStructProto(LoggingSink::Entry entry,::google::protobuf::Struct * json_payload)209 void EntryToJsonStructProto(LoggingSink::Entry entry,
210                             ::google::protobuf::Struct* json_payload) {
211   (*json_payload->mutable_fields())["callId"].set_string_value(
212       grpc_core::GenerateUUIDv4(absl::Uint128High64(entry.call_id),
213                                 absl::Uint128Low64(entry.call_id)));
214   (*json_payload->mutable_fields())["sequenceId"].set_number_value(
215       entry.sequence_id);
216   (*json_payload->mutable_fields())["type"].set_string_value(
217       EventTypeToString(entry.type));
218   (*json_payload->mutable_fields())["logger"].set_string_value(
219       LoggerToString(entry.logger));
220   PayloadToJsonStructProto(
221       std::move(entry.payload),
222       (*json_payload->mutable_fields())["payload"].mutable_struct_value());
223   if (entry.payload_truncated) {
224     (*json_payload->mutable_fields())["payloadTruncated"].set_bool_value(
225         entry.payload_truncated);
226   }
227   PeerToJsonStructProto(
228       std::move(entry.peer),
229       (*json_payload->mutable_fields())["peer"].mutable_struct_value());
230   (*json_payload->mutable_fields())["authority"].set_string_value(
231       std::move(entry.authority));
232   (*json_payload->mutable_fields())["serviceName"].set_string_value(
233       std::move(entry.service_name));
234   (*json_payload->mutable_fields())["methodName"].set_string_value(
235       std::move(entry.method_name));
236 }
237 
238 namespace {
239 
EstimateEntrySize(const LoggingSink::Entry & entry)240 uint64_t EstimateEntrySize(const LoggingSink::Entry& entry) {
241   uint64_t size = sizeof(entry);
242   for (const auto& pair : entry.payload.metadata) {
243     size += pair.first.size() + pair.second.size();
244   }
245   size += entry.payload.status_message.size();
246   size += entry.payload.status_details.size();
247   size += entry.payload.message.size();
248   size += entry.authority.size();
249   size += entry.service_name.size();
250   size += entry.method_name.size();
251   return size;
252 }
253 
254 }  // namespace
255 
LogEntry(Entry entry)256 void ObservabilityLoggingSink::LogEntry(Entry entry) {
257   auto entry_size = EstimateEntrySize(entry);
258   grpc_core::MutexLock lock(&mu_);
259   if (sink_closed_) return;
260   entries_.push_back(std::move(entry));
261   entries_memory_footprint_ += entry_size;
262   MaybeTriggerFlushLocked();
263 }
264 
RegisterEnvironmentResource(const EnvironmentAutoDetect::ResourceType * resource)265 void ObservabilityLoggingSink::RegisterEnvironmentResource(
266     const EnvironmentAutoDetect::ResourceType* resource) {
267   grpc_core::MutexLock lock(&mu_);
268   resource_ = resource;
269   MaybeTriggerFlushLocked();
270 }
271 
FlushAndClose()272 void ObservabilityLoggingSink::FlushAndClose() {
273   grpc_core::MutexLock lock(&mu_);
274   sink_closed_ = true;
275   if (entries_.empty()) return;
276   MaybeTriggerFlushLocked();
277   sink_flushed_after_close_.Wait(&mu_);
278 }
279 
Flush()280 void ObservabilityLoggingSink::Flush() {
281   std::vector<Entry> entries;
282   google::logging::v2::LoggingServiceV2::StubInterface* stub = nullptr;
283   const EnvironmentAutoDetect::ResourceType* resource = nullptr;
284   {
285     grpc_core::MutexLock lock(&mu_);
286     if (flush_in_progress_) {
287       return;
288     }
289     flush_in_progress_ = true;
290     flush_timer_in_progress_ = false;
291     flush_triggered_ = false;
292     if (stub_ == nullptr) {
293       std::string endpoint;
294       absl::optional<std::string> endpoint_env =
295           grpc_core::GetEnv("GOOGLE_CLOUD_CPP_LOGGING_SERVICE_V2_ENDPOINT");
296       if (endpoint_env.has_value() && !endpoint_env->empty()) {
297         endpoint = std::move(*endpoint_env);
298       } else {
299         endpoint = "logging.googleapis.com";
300       }
301       ChannelArguments args;
302       // Disable observability for RPCs on this channel
303       args.SetInt(GRPC_ARG_ENABLE_OBSERVABILITY, 0);
304       // Set keepalive time to 24 hrs to effectively disable keepalive ping, but
305       // still enable KEEPALIVE_TIMEOUT to get the TCP_USER_TIMEOUT effect.
306       args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS,
307                   24 * 60 * 60 * 1000 /* 24 hours */);
308       args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 20 * 1000 /* 20 seconds */);
309       stub_ = google::logging::v2::LoggingServiceV2::NewStub(
310           CreateCustomChannel(endpoint, GoogleDefaultCredentials(), args));
311     }
312     stub = stub_.get();
313     entries = std::move(entries_);
314     entries_memory_footprint_ = 0;
315     resource = resource_;
316   }
317   FlushEntriesHelper(stub, std::move(entries), resource);
318 }
319 
FlushEntriesHelper(google::logging::v2::LoggingServiceV2::StubInterface * stub,std::vector<Entry> entries,const EnvironmentAutoDetect::ResourceType * resource)320 void ObservabilityLoggingSink::FlushEntriesHelper(
321     google::logging::v2::LoggingServiceV2::StubInterface* stub,
322     std::vector<Entry> entries,
323     const EnvironmentAutoDetect::ResourceType* resource) {
324   if (entries.empty()) {
325     return;
326   }
327   struct CallContext {
328     ClientContext context;
329     google::logging::v2::WriteLogEntriesRequest request;
330     google::logging::v2::WriteLogEntriesResponse response;
331   };
332   CallContext* call = new CallContext;
333   call->context.set_authority(authority_);
334   call->context.set_deadline(
335       (grpc_core::Timestamp::Now() + grpc_core::Duration::Seconds(30))
336           .as_timespec(GPR_CLOCK_MONOTONIC));
337   call->request.set_log_name(
338       absl::StrFormat("projects/%s/logs/"
339                       "microservices.googleapis.com%%2Fobservability%%2fgrpc",
340                       project_id_));
341   (*call->request.mutable_labels()).insert(labels_.begin(), labels_.end());
342   // Set the proper resource type and labels.
343   call->request.mutable_resource()->set_type(resource->resource_type);
344   call->request.mutable_resource()->mutable_labels()->insert(
345       resource->labels.begin(), resource->labels.end());
346   for (auto& entry : entries) {
347     auto* proto_entry = call->request.add_entries();
348     gpr_timespec timespec = entry.timestamp.as_timespec(GPR_CLOCK_REALTIME);
349     proto_entry->mutable_timestamp()->set_seconds(timespec.tv_sec);
350     proto_entry->mutable_timestamp()->set_nanos(timespec.tv_nsec);
351     // Add tracing details
352     proto_entry->set_span_id(entry.span_id);
353     proto_entry->set_trace(
354         absl::StrFormat("projects/%s/traces/%s", project_id_, entry.trace_id));
355     proto_entry->set_trace_sampled(entry.is_sampled);
356     // TODO(yashykt): Check if we need to fill receive timestamp
357     EntryToJsonStructProto(std::move(entry),
358                            proto_entry->mutable_json_payload());
359   }
360   stub->async()->WriteLogEntries(
361       &(call->context), &(call->request), &(call->response),
362       [this, call](Status status) {
363         if (!status.ok()) {
364           gpr_log(
365               GPR_ERROR,
366               "GCP Observability Logging Error %d: %s. Dumping log entries.",
367               status.error_code(), status.error_message().c_str());
368           for (auto& entry : call->request.entries()) {
369             std::string output;
370             ::google::protobuf::TextFormat::PrintToString(entry.json_payload(),
371                                                           &output);
372             gpr_log(
373                 GPR_INFO, "Log Entry recorded at time: %s : %s",
374                 grpc_core::Timestamp::FromTimespecRoundUp(
375                     gpr_timespec{entry.timestamp().seconds(),
376                                  entry.timestamp().nanos(), GPR_CLOCK_REALTIME})
377                     .ToString()
378                     .c_str(),
379                 output.c_str());
380           }
381         }
382         delete call;
383         grpc_core::MutexLock lock(&mu_);
384         flush_in_progress_ = false;
385         if (sink_closed_ && entries_.empty()) {
386           sink_flushed_after_close_.SignalAll();
387         } else {
388           MaybeTriggerFlushLocked();
389         }
390       });
391 }
392 
MaybeTriggerFlush()393 void ObservabilityLoggingSink::MaybeTriggerFlush() {
394   grpc_core::MutexLock lock(&mu_);
395   return MaybeTriggerFlushLocked();
396 }
397 
MaybeTriggerFlushLocked()398 void ObservabilityLoggingSink::MaybeTriggerFlushLocked() {
399   constexpr int kMaxEntriesBeforeDump = 100000;
400   constexpr int kMaxMemoryFootprintBeforeDump = 10 * 1024 * 1024;
401   constexpr int kMinEntriesBeforeFlush = 1000;
402   constexpr int kMinMemoryFootprintBeforeFlush = 1 * 1024 * 1024;
403   // Use this opportunity to fetch environment resource if not fetched already
404   if (resource_ == nullptr && !registered_env_fetch_notification_) {
405     auto& env_autodetect = EnvironmentAutoDetect::Get();
406     resource_ = env_autodetect.resource();
407     event_engine_ = grpc_event_engine::experimental::GetDefaultEventEngine();
408     if (resource_ == nullptr) {
409       registered_env_fetch_notification_ = true;
410       env_autodetect.NotifyOnDone([this]() {
411         RegisterEnvironmentResource(EnvironmentAutoDetect::Get().resource());
412       });
413     }
414   }
415   if (entries_.empty()) return;
416   if (entries_.size() > kMaxEntriesBeforeDump ||
417       entries_memory_footprint_ > kMaxMemoryFootprintBeforeDump) {
418     // Buffer limits have been reached. Dump entries with gpr_log
419     gpr_log(GPR_INFO, "Buffer limit reached. Dumping log entries.");
420     for (auto& entry : entries_) {
421       google::protobuf::Struct proto;
422       std::string timestamp = entry.timestamp.ToString();
423       EntryToJsonStructProto(std::move(entry), &proto);
424       std::string output;
425       ::google::protobuf::TextFormat::PrintToString(proto, &output);
426       gpr_log(GPR_INFO, "Log Entry recorded at time: %s : %s",
427               timestamp.c_str(), output.c_str());
428     }
429     entries_.clear();
430     entries_memory_footprint_ = 0;
431   } else if (resource_ != nullptr && !flush_in_progress_) {
432     // Environment resource has been detected. Trigger flush if conditions
433     // suffice.
434     if ((entries_.size() >= kMinEntriesBeforeFlush ||
435          entries_memory_footprint_ >= kMinMemoryFootprintBeforeFlush ||
436          sink_closed_) &&
437         !flush_triggered_) {
438       // It is fine even if there were a flush with a timer in progress. What is
439       // important is that a flush is triggered.
440       flush_triggered_ = true;
441       event_engine_->Run([this]() { Flush(); });
442     } else if (!flush_timer_in_progress_) {
443       flush_timer_in_progress_ = true;
444       event_engine_->RunAfter(grpc_core::Duration::Seconds(1),
445                               [this]() { Flush(); });
446     }
447   }
448 }
449 
Configuration(const GcpObservabilityConfig::CloudLogging::RpcEventConfiguration & rpc_event_config)450 ObservabilityLoggingSink::Configuration::Configuration(
451     const GcpObservabilityConfig::CloudLogging::RpcEventConfiguration&
452         rpc_event_config)
453     : exclude(rpc_event_config.exclude),
454       max_metadata_bytes(rpc_event_config.max_metadata_bytes),
455       max_message_bytes(rpc_event_config.max_message_bytes) {
456   for (auto& parsed_method : rpc_event_config.parsed_methods) {
457     parsed_methods.emplace_back(ParsedMethod{
458         std::string(parsed_method.service), std::string(parsed_method.method)});
459   }
460 }
461 
462 }  // namespace internal
463 }  // namespace grpc
464