1 /* 2 * Copyright 2022 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.gcp.observability; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import com.google.common.annotations.VisibleForTesting; 22 import com.google.common.collect.ImmutableSet; 23 import io.grpc.ClientInterceptor; 24 import io.grpc.InternalGlobalInterceptors; 25 import io.grpc.ManagedChannelProvider.ProviderNotFoundException; 26 import io.grpc.ServerInterceptor; 27 import io.grpc.ServerStreamTracer; 28 import io.grpc.census.InternalCensusStatsAccessor; 29 import io.grpc.census.InternalCensusTracingAccessor; 30 import io.grpc.census.internal.ObservabilityCensusConstants; 31 import io.grpc.gcp.observability.interceptors.ConditionalClientInterceptor; 32 import io.grpc.gcp.observability.interceptors.ConfigFilterHelper; 33 import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor; 34 import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor; 35 import io.grpc.gcp.observability.interceptors.LogHelper; 36 import io.grpc.gcp.observability.logging.GcpLogSink; 37 import io.grpc.gcp.observability.logging.Sink; 38 import io.grpc.gcp.observability.logging.TraceLoggingHelper; 39 import io.opencensus.common.Duration; 40 import io.opencensus.contrib.grpc.metrics.RpcViewConstants; 41 import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration; 42 import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter; 43 import io.opencensus.exporter.trace.stackdriver.StackdriverTraceConfiguration; 44 import io.opencensus.exporter.trace.stackdriver.StackdriverTraceExporter; 45 import io.opencensus.metrics.LabelKey; 46 import io.opencensus.metrics.LabelValue; 47 import io.opencensus.stats.Stats; 48 import io.opencensus.stats.ViewManager; 49 import io.opencensus.trace.AttributeValue; 50 import io.opencensus.trace.Tracing; 51 import io.opencensus.trace.config.TraceConfig; 52 import java.io.IOException; 53 import java.lang.management.ManagementFactory; 54 import java.net.InetAddress; 55 import java.net.UnknownHostException; 56 import java.security.SecureRandom; 57 import java.util.ArrayList; 58 import java.util.HashMap; 59 import java.util.Map; 60 import java.util.concurrent.TimeUnit; 61 import java.util.logging.Level; 62 import java.util.logging.Logger; 63 import java.util.stream.Collectors; 64 65 /** The main class for gRPC Google Cloud Platform Observability features. */ 66 public final class GcpObservability implements AutoCloseable { 67 68 private static final Logger logger = Logger.getLogger(GcpObservability.class.getName()); 69 private static final int METRICS_EXPORT_INTERVAL = 30; 70 71 static final String DEFAULT_METRIC_CUSTOM_TAG_KEY = "opencensus_task"; 72 @VisibleForTesting 73 static final ImmutableSet<String> SERVICES_TO_EXCLUDE = ImmutableSet.of( 74 "google.logging.v2.LoggingServiceV2", "google.monitoring.v3.MetricService", 75 "google.devtools.cloudtrace.v2.TraceService"); 76 77 private static GcpObservability instance = null; 78 private final Sink sink; 79 private final ObservabilityConfig config; 80 private final ArrayList<ClientInterceptor> clientInterceptors = new ArrayList<>(); 81 private final ArrayList<ServerInterceptor> serverInterceptors = new ArrayList<>(); 82 private final ArrayList<ServerStreamTracer.Factory> tracerFactories = new ArrayList<>(); 83 84 /** 85 * Initialize grpc-observability. 86 * 87 * @throws ProviderNotFoundException if no underlying channel/server provider is available. 88 */ grpcInit()89 public static synchronized GcpObservability grpcInit() throws IOException { 90 if (instance == null) { 91 ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance(); 92 TraceLoggingHelper traceLoggingHelper = new TraceLoggingHelper( 93 observabilityConfig.getProjectId()); 94 Sink sink = new GcpLogSink(observabilityConfig.getProjectId(), observabilityConfig, 95 SERVICES_TO_EXCLUDE, traceLoggingHelper); 96 LogHelper helper = new LogHelper(sink); 97 ConfigFilterHelper configFilterHelper = ConfigFilterHelper.getInstance(observabilityConfig); 98 instance = grpcInit(sink, observabilityConfig, 99 new InternalLoggingChannelInterceptor.FactoryImpl(helper, configFilterHelper), 100 new InternalLoggingServerInterceptor.FactoryImpl(helper, configFilterHelper)); 101 instance.registerStackDriverExporter(observabilityConfig.getProjectId(), 102 observabilityConfig.getCustomTags()); 103 } 104 return instance; 105 } 106 107 @VisibleForTesting grpcInit( Sink sink, ObservabilityConfig config, InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, InternalLoggingServerInterceptor.Factory serverInterceptorFactory)108 static GcpObservability grpcInit( 109 Sink sink, 110 ObservabilityConfig config, 111 InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, 112 InternalLoggingServerInterceptor.Factory serverInterceptorFactory) 113 throws IOException { 114 if (instance == null) { 115 instance = new GcpObservability(sink, config); 116 instance.setProducer(channelInterceptorFactory, serverInterceptorFactory); 117 } 118 return instance; 119 } 120 121 /** Un-initialize/shutdown grpc-observability. */ 122 @Override close()123 public void close() { 124 synchronized (GcpObservability.class) { 125 if (instance == null) { 126 throw new IllegalStateException("GcpObservability already closed!"); 127 } 128 sink.close(); 129 if (config.isEnableCloudMonitoring() || config.isEnableCloudTracing()) { 130 try { 131 // Sleeping before shutdown to ensure all metrics and traces are flushed 132 Thread.sleep( 133 TimeUnit.MILLISECONDS.convert(2 * METRICS_EXPORT_INTERVAL, TimeUnit.SECONDS)); 134 } catch (InterruptedException e) { 135 Thread.currentThread().interrupt(); 136 logger.log(Level.SEVERE, "Caught exception during sleep", e); 137 } 138 } 139 instance = null; 140 } 141 } 142 143 // TODO(dnvindhya): Remove <channel/server>InterceptorFactory and replace with respective 144 // interceptors setProducer( InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, InternalLoggingServerInterceptor.Factory serverInterceptorFactory)145 private void setProducer( 146 InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, 147 InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { 148 if (config.isEnableCloudLogging()) { 149 clientInterceptors.add(channelInterceptorFactory.create()); 150 serverInterceptors.add(serverInterceptorFactory.create()); 151 } 152 if (config.isEnableCloudMonitoring()) { 153 clientInterceptors.add(getConditionalInterceptor( 154 InternalCensusStatsAccessor.getClientInterceptor(true, true, false, true))); 155 tracerFactories.add( 156 InternalCensusStatsAccessor.getServerStreamTracerFactory(true, true, false)); 157 } 158 if (config.isEnableCloudTracing()) { 159 clientInterceptors.add( 160 getConditionalInterceptor(InternalCensusTracingAccessor.getClientInterceptor())); 161 tracerFactories.add(InternalCensusTracingAccessor.getServerStreamTracerFactory()); 162 } 163 164 InternalGlobalInterceptors.setInterceptorsTracers( 165 clientInterceptors, serverInterceptors, tracerFactories); 166 } 167 getConditionalInterceptor(ClientInterceptor interceptor)168 static ConditionalClientInterceptor getConditionalInterceptor(ClientInterceptor interceptor) { 169 return new ConditionalClientInterceptor(interceptor, 170 (m, c) -> !SERVICES_TO_EXCLUDE.contains(m.getServiceName())); 171 } 172 registerObservabilityViews()173 private static void registerObservabilityViews() { 174 ViewManager viewManager = Stats.getViewManager(); 175 176 // client views 177 viewManager.registerView(RpcViewConstants.GRPC_CLIENT_COMPLETED_RPC_VIEW); 178 viewManager.registerView(RpcViewConstants.GRPC_CLIENT_STARTED_RPC_VIEW); 179 viewManager.registerView(RpcViewConstants.GRPC_CLIENT_ROUNDTRIP_LATENCY_VIEW); 180 viewManager.registerView(ObservabilityCensusConstants.GRPC_CLIENT_API_LATENCY_VIEW); 181 viewManager.registerView( 182 ObservabilityCensusConstants.GRPC_CLIENT_SENT_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW); 183 viewManager.registerView( 184 ObservabilityCensusConstants.GRPC_CLIENT_RECEIVED_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW); 185 186 // server views 187 viewManager.registerView(RpcViewConstants.GRPC_SERVER_COMPLETED_RPC_VIEW); 188 viewManager.registerView(RpcViewConstants.GRPC_SERVER_STARTED_RPC_VIEW); 189 viewManager.registerView(RpcViewConstants.GRPC_SERVER_SERVER_LATENCY_VIEW); 190 viewManager.registerView( 191 ObservabilityCensusConstants.GRPC_SERVER_SENT_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW); 192 viewManager.registerView( 193 ObservabilityCensusConstants.GRPC_SERVER_RECEIVED_COMPRESSED_MESSAGE_BYTES_PER_RPC_VIEW); 194 } 195 196 @VisibleForTesting registerStackDriverExporter(String projectId, Map<String, String> customTags)197 void registerStackDriverExporter(String projectId, Map<String, String> customTags) 198 throws IOException { 199 if (config.isEnableCloudMonitoring()) { 200 registerObservabilityViews(); 201 StackdriverStatsConfiguration.Builder statsConfigurationBuilder = 202 StackdriverStatsConfiguration.builder(); 203 if (projectId != null) { 204 statsConfigurationBuilder.setProjectId(projectId); 205 } 206 Map<LabelKey, LabelValue> constantLabels = new HashMap<>(); 207 constantLabels.put( 208 LabelKey.create(DEFAULT_METRIC_CUSTOM_TAG_KEY, DEFAULT_METRIC_CUSTOM_TAG_KEY), 209 LabelValue.create(generateDefaultMetricTagValue())); 210 if (customTags != null) { 211 for (Map.Entry<String, String> mapEntry : customTags.entrySet()) { 212 constantLabels.putIfAbsent(LabelKey.create(mapEntry.getKey(), mapEntry.getKey()), 213 LabelValue.create(mapEntry.getValue())); 214 } 215 } 216 statsConfigurationBuilder.setConstantLabels(constantLabels); 217 statsConfigurationBuilder.setExportInterval(Duration.create(METRICS_EXPORT_INTERVAL, 0)); 218 StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build()); 219 } 220 221 if (config.isEnableCloudTracing()) { 222 TraceConfig traceConfig = Tracing.getTraceConfig(); 223 traceConfig.updateActiveTraceParams( 224 traceConfig.getActiveTraceParams().toBuilder().setSampler(config.getSampler()).build()); 225 StackdriverTraceConfiguration.Builder traceConfigurationBuilder = 226 StackdriverTraceConfiguration.builder(); 227 if (projectId != null) { 228 traceConfigurationBuilder.setProjectId(projectId); 229 } 230 if (customTags != null) { 231 Map<String, AttributeValue> fixedAttributes = customTags.entrySet().stream() 232 .collect(Collectors.toMap(e -> e.getKey(), 233 e -> AttributeValue.stringAttributeValue(e.getValue()))); 234 traceConfigurationBuilder.setFixedAttributes(fixedAttributes); 235 } 236 StackdriverTraceExporter.createAndRegister(traceConfigurationBuilder.build()); 237 } 238 } 239 generateDefaultMetricTagValue()240 private static String generateDefaultMetricTagValue() { 241 final String jvmName = ManagementFactory.getRuntimeMXBean().getName(); 242 if (jvmName.indexOf('@') < 1) { 243 String hostname = "localhost"; 244 try { 245 hostname = InetAddress.getLocalHost().getHostName(); 246 } catch (UnknownHostException e) { 247 logger.log(Level.INFO, "Unable to get the hostname.", e); 248 } 249 return "java-" + new SecureRandom().nextInt() + "@" + hostname; 250 } 251 return "java-" + jvmName; 252 } 253 GcpObservability( Sink sink, ObservabilityConfig config)254 private GcpObservability( 255 Sink sink, 256 ObservabilityConfig config) { 257 this.sink = checkNotNull(sink); 258 this.config = checkNotNull(config); 259 } 260 } 261