1 /*
2  * Copyright 2022 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.gcp.observability;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.collect.ImmutableSet;
23 import io.grpc.ClientInterceptor;
24 import io.grpc.InternalGlobalInterceptors;
25 import io.grpc.ManagedChannelProvider.ProviderNotFoundException;
26 import io.grpc.ServerInterceptor;
27 import io.grpc.ServerStreamTracer;
28 import io.grpc.census.InternalCensusStatsAccessor;
29 import io.grpc.census.InternalCensusTracingAccessor;
30 import io.grpc.census.internal.ObservabilityCensusConstants;
31 import io.grpc.gcp.observability.interceptors.ConditionalClientInterceptor;
32 import io.grpc.gcp.observability.interceptors.ConfigFilterHelper;
33 import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
34 import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
35 import io.grpc.gcp.observability.interceptors.LogHelper;
36 import io.grpc.gcp.observability.logging.GcpLogSink;
37 import io.grpc.gcp.observability.logging.Sink;
38 import io.grpc.gcp.observability.logging.TraceLoggingHelper;
39 import io.opencensus.common.Duration;
40 import io.opencensus.contrib.grpc.metrics.RpcViewConstants;
41 import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration;
42 import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter;
43 import io.opencensus.exporter.trace.stackdriver.StackdriverTraceConfiguration;
44 import io.opencensus.exporter.trace.stackdriver.StackdriverTraceExporter;
45 import io.opencensus.metrics.LabelKey;
46 import io.opencensus.metrics.LabelValue;
47 import io.opencensus.stats.Stats;
48 import io.opencensus.stats.ViewManager;
49 import io.opencensus.trace.AttributeValue;
50 import io.opencensus.trace.Tracing;
51 import io.opencensus.trace.config.TraceConfig;
52 import java.io.IOException;
53 import java.lang.management.ManagementFactory;
54 import java.net.InetAddress;
55 import java.net.UnknownHostException;
56 import java.security.SecureRandom;
57 import java.util.ArrayList;
58 import java.util.HashMap;
59 import java.util.Map;
60 import java.util.concurrent.TimeUnit;
61 import java.util.logging.Level;
62 import java.util.logging.Logger;
63 import java.util.stream.Collectors;
64 
65 /** The main class for gRPC Google Cloud Platform Observability features. */
66 public final class GcpObservability implements AutoCloseable {
67 
68   private static final Logger logger = Logger.getLogger(GcpObservability.class.getName());
69   private static final int METRICS_EXPORT_INTERVAL = 30;
70 
71   static final String DEFAULT_METRIC_CUSTOM_TAG_KEY = "opencensus_task";
72   @VisibleForTesting
73   static final ImmutableSet<String> SERVICES_TO_EXCLUDE = ImmutableSet.of(
74       "google.logging.v2.LoggingServiceV2", "google.monitoring.v3.MetricService",
75       "google.devtools.cloudtrace.v2.TraceService");
76 
77   private static GcpObservability instance = null;
78   private final Sink sink;
79   private final ObservabilityConfig config;
80   private final ArrayList<ClientInterceptor> clientInterceptors = new ArrayList<>();
81   private final ArrayList<ServerInterceptor> serverInterceptors = new ArrayList<>();
82   private final ArrayList<ServerStreamTracer.Factory> tracerFactories = new ArrayList<>();
83 
84   /**
85    * Initialize grpc-observability.
86    *
87    * @throws ProviderNotFoundException if no underlying channel/server provider is available.
88    */
grpcInit()89   public static synchronized GcpObservability grpcInit() throws IOException {
90     if (instance == null) {
91       ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance();
92       TraceLoggingHelper traceLoggingHelper = new TraceLoggingHelper(
93           observabilityConfig.getProjectId());
94       Sink sink = new GcpLogSink(observabilityConfig.getProjectId(), observabilityConfig,
95           SERVICES_TO_EXCLUDE, traceLoggingHelper);
96       LogHelper helper = new LogHelper(sink);
97       ConfigFilterHelper configFilterHelper = ConfigFilterHelper.getInstance(observabilityConfig);
98       instance = grpcInit(sink, observabilityConfig,
99           new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper),
100           new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper));
101       instance.registerStackDriverExporter(observabilityConfig.getProjectId(),
102           observabilityConfig.getCustomTags());
103     }
104     return instance;
105   }
106 
107   @VisibleForTesting
grpcInit( Sink sink, ObservabilityConfig config, InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, InternalLoggingServerInterceptor.Factory serverInterceptorFactory)108   static GcpObservability grpcInit(
109       Sink sink,
110       ObservabilityConfig config,
111       InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
112       InternalLoggingServerInterceptor.Factory serverInterceptorFactory)
113       throws IOException {
114     if (instance == null) {
115       instance = new GcpObservability(sink, config);
116       instance.setProducer(channelInterceptorFactory, serverInterceptorFactory);
117     }
118     return instance;
119   }
120 
121   /** Un-initialize/shutdown grpc-observability. */
122   @Override
close()123   public void close() {
124     synchronized (GcpObservability.class) {
125       if (instance == null) {
126         throw new IllegalStateException("GcpObservability already closed!");
127       }
128       sink.close();
129       if (config.isEnableCloudMonitoring() || config.isEnableCloudTracing()) {
130         try {
131           // Sleeping before shutdown to ensure all metrics and traces are flushed
132           Thread.sleep(
133               TimeUnit.MILLISECONDS.convert(2 * METRICS_EXPORT_INTERVAL, TimeUnit.SECONDS));
134         } catch (InterruptedException e) {
135           Thread.currentThread().interrupt();
136           logger.log(Level.SEVERE, "Caught exception during sleep", e);
137         }
138       }
139       instance = null;
140     }
141   }
142 
143   // TODO(dnvindhya): Remove <channel/server>InterceptorFactory and replace with respective
144   // interceptors
setProducer( InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, InternalLoggingServerInterceptor.Factory serverInterceptorFactory)145   private void setProducer(
146       InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
147       InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
148     if (config.isEnableCloudLogging()) {
149       clientInterceptors.add(channelInterceptorFactory.create());
150       serverInterceptors.add(serverInterceptorFactory.create());
151     }
152     if (config.isEnableCloudMonitoring()) {
153       clientInterceptors.add(getConditionalInterceptor(
154           InternalCensusStatsAccessor.getClientInterceptor(true, true, false, true)));
155       tracerFactories.add(
156           InternalCensusStatsAccessor.getServerStreamTracerFactory(true, true, false));
157     }
158     if (config.isEnableCloudTracing()) {
159       clientInterceptors.add(
160           getConditionalInterceptor(InternalCensusTracingAccessor.getClientInterceptor()));
161       tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory());
162     }
163 
164     InternalGlobalInterceptors.setInterceptorsTracers(
165         clientInterceptors, serverInterceptors, tracerFactories);
166   }
167 
getConditionalInterceptor(ClientInterceptor interceptor)168   static ConditionalClientInterceptor getConditionalInterceptor(ClientInterceptor interceptor) {
169     return new ConditionalClientInterceptor(interceptor,
170         (m, c) -> !SERVICES_TO_EXCLUDE.contains(m.getServiceName()));
171   }
172 
registerObservabilityViews()173   private static void registerObservabilityViews() {
174     ViewManager viewManager = Stats.getViewManager();
175 
176     // client views
177     viewManager.registerView(RpcViewConstants.GRPC_CLIENT_COMPLETED_RPC_VIEW);
178     viewManager.registerView(RpcViewConstants.GRPC_CLIENT_STARTED_RPC_VIEW);
179     viewManager.registerView(RpcViewConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY_VIEW);
180     viewManager.registerView(ObservabilityCensusConstants.GRPC_CLIENT_API_LATENCY_VIEW);
181     viewManager.registerView(
182         ObservabilityCensusConstants.GRPC_CLIENT_SENT_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW);
183     viewManager.registerView(
184         ObservabilityCensusConstants.GRPC_CLIENT_RECEIVED_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW);
185 
186     // server views
187     viewManager.registerView(RpcViewConstants.GRPC_SERVER_COMPLETED_RPC_VIEW);
188     viewManager.registerView(RpcViewConstants.GRPC_SERVER_STARTED_RPC_VIEW);
189     viewManager.registerView(RpcViewConstants.GRPC_SERVER_SERVER_LATENCY_VIEW);
190     viewManager.registerView(
191         ObservabilityCensusConstants.GRPC_SERVER_SENT_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW);
192     viewManager.registerView(
193         ObservabilityCensusConstants.GRPC_SERVER_RECEIVED_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW);
194   }
195 
196   @VisibleForTesting
registerStackDriverExporter(String projectId, Map<String, String> customTags)197   void registerStackDriverExporter(String projectId, Map<String, String> customTags)
198       throws IOException {
199     if (config.isEnableCloudMonitoring()) {
200       registerObservabilityViews();
201       StackdriverStatsConfiguration.Builder statsConfigurationBuilder =
202           StackdriverStatsConfiguration.builder();
203       if (projectId != null) {
204         statsConfigurationBuilder.setProjectId(projectId);
205       }
206       Map<LabelKey, LabelValue> constantLabels = new HashMap<>();
207       constantLabels.put(
208           LabelKey.create(DEFAULT_METRIC_CUSTOM_TAG_KEY, DEFAULT_METRIC_CUSTOM_TAG_KEY),
209           LabelValue.create(generateDefaultMetricTagValue()));
210       if (customTags != null) {
211         for (Map.Entry<String, String> mapEntry : customTags.entrySet()) {
212           constantLabels.putIfAbsent(LabelKey.create(mapEntry.getKey(), mapEntry.getKey()),
213               LabelValue.create(mapEntry.getValue()));
214         }
215       }
216       statsConfigurationBuilder.setConstantLabels(constantLabels);
217       statsConfigurationBuilder.setExportInterval(Duration.create(METRICS_EXPORT_INTERVAL, 0));
218       StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build());
219     }
220 
221     if (config.isEnableCloudTracing()) {
222       TraceConfig traceConfig = Tracing.getTraceConfig();
223       traceConfig.updateActiveTraceParams(
224           traceConfig.getActiveTraceParams().toBuilder().setSampler(config.getSampler()).build());
225       StackdriverTraceConfiguration.Builder traceConfigurationBuilder =
226           StackdriverTraceConfiguration.builder();
227       if (projectId != null) {
228         traceConfigurationBuilder.setProjectId(projectId);
229       }
230       if (customTags != null) {
231         Map<String, AttributeValue> fixedAttributes = customTags.entrySet().stream()
232             .collect(Collectors.toMap(e -> e.getKey(),
233                 e -> AttributeValue.stringAttributeValue(e.getValue())));
234         traceConfigurationBuilder.setFixedAttributes(fixedAttributes);
235       }
236       StackdriverTraceExporter.createAndRegister(traceConfigurationBuilder.build());
237     }
238   }
239 
generateDefaultMetricTagValue()240   private static String generateDefaultMetricTagValue() {
241     final String jvmName = ManagementFactory.getRuntimeMXBean().getName();
242     if (jvmName.indexOf('@') < 1) {
243       String hostname = "localhost";
244       try {
245         hostname = InetAddress.getLocalHost().getHostName();
246       } catch (UnknownHostException e) {
247         logger.log(Level.INFO, "Unable to get the hostname.", e);
248       }
249       return "java-" + new SecureRandom().nextInt() + "@" + hostname;
250     }
251     return "java-" + jvmName;
252   }
253 
GcpObservability( Sink sink, ObservabilityConfig config)254   private GcpObservability(
255       Sink sink,
256       ObservabilityConfig config) {
257     this.sink = checkNotNull(sink);
258     this.config = checkNotNull(config);
259   }
260 }
261