1 /* 2 * Copyright 2019 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.xds.orca; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import com.github.xds.data.orca.v3.OrcaLoadReport; 22 import com.google.common.annotations.VisibleForTesting; 23 import io.grpc.CallOptions; 24 import io.grpc.ClientStreamTracer; 25 import io.grpc.ClientStreamTracer.StreamInfo; 26 import io.grpc.ExperimentalApi; 27 import io.grpc.LoadBalancer; 28 import io.grpc.Metadata; 29 import io.grpc.internal.ForwardingClientStreamTracer; 30 import io.grpc.protobuf.ProtoUtils; 31 import io.grpc.services.InternalCallMetricRecorder; 32 import io.grpc.services.MetricReport; 33 import java.util.ArrayList; 34 import java.util.List; 35 36 /** 37 * Utility class that provides method for {@link LoadBalancer} to install listeners to receive 38 * per-request backend cost metrics in the format of Open Request Cost Aggregation (ORCA). 39 */ 40 @ExperimentalApi("https://github.com/grpc/grpc-java/issues/9128") 41 public abstract class OrcaPerRequestUtil { 42 private static final ClientStreamTracer NOOP_CLIENT_STREAM_TRACER = new ClientStreamTracer() {}; 43 private static final ClientStreamTracer.Factory NOOP_CLIENT_STREAM_TRACER_FACTORY = 44 new ClientStreamTracer.Factory() { 45 @Override 46 public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { 47 return NOOP_CLIENT_STREAM_TRACER; 48 } 49 }; 50 private static final OrcaPerRequestUtil DEFAULT_INSTANCE = 51 new OrcaPerRequestUtil() { 52 @Override 53 public ClientStreamTracer.Factory newOrcaClientStreamTracerFactory( 54 OrcaPerRequestReportListener listener) { 55 return newOrcaClientStreamTracerFactory(NOOP_CLIENT_STREAM_TRACER_FACTORY, listener); 56 } 57 58 @Override 59 public ClientStreamTracer.Factory newOrcaClientStreamTracerFactory( 60 ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener) { 61 return new OrcaReportingTracerFactory(delegate, listener); 62 } 63 }; 64 65 /** 66 * Gets an {@code OrcaPerRequestUtil} instance that provides actual implementation of 67 * {@link #newOrcaClientStreamTracerFactory}. 68 */ getInstance()69 public static OrcaPerRequestUtil getInstance() { 70 return DEFAULT_INSTANCE; 71 } 72 73 /** 74 * Creates a new {@link io.grpc.ClientStreamTracer.Factory} with provided {@link 75 * OrcaPerRequestReportListener} installed to receive callback when a per-request ORCA report is 76 * received. 77 * 78 * <p>Example usages for leaf level policy (e.g., WRR policy) 79 * 80 * <pre> 81 * {@code 82 * class WrrPicker extends SubchannelPicker { 83 * 84 * public PickResult pickSubchannel(PickSubchannelArgs args) { 85 * Subchannel subchannel = ... // WRR picking logic 86 * return PickResult.withSubchannel( 87 * subchannel, 88 * OrcaPerRequestReportUtil.getInstance().newOrcaClientStreamTracerFactory(listener)); 89 * } 90 * } 91 * } 92 * </pre> 93 * 94 * @param listener contains the callback to be invoked when a per-request ORCA report is received. 95 */ newOrcaClientStreamTracerFactory( OrcaPerRequestReportListener listener)96 public abstract ClientStreamTracer.Factory newOrcaClientStreamTracerFactory( 97 OrcaPerRequestReportListener listener); 98 99 /** 100 * Creates a new {@link io.grpc.ClientStreamTracer.Factory} with provided {@link 101 * OrcaPerRequestReportListener} installed to receive callback when a per-request ORCA report is 102 * received. 103 * 104 * <p>Example usages: 105 * 106 * <ul> 107 * <li> Delegating policy (e.g., xDS) 108 * <pre> 109 * {@code 110 * class XdsPicker extends SubchannelPicker { 111 * 112 * public PickResult pickSubchannel(PickSubchannelArgs args) { 113 * SubchannelPicker perLocalityPicker = ... // locality picking logic 114 * Result result = perLocalityPicker.pickSubchannel(args); 115 * return PickResult.withSubchannel( 116 * result.getSubchannel(), 117 * OrcaPerRequestReportUtil.getInstance().newOrcaClientTracerFactory( 118 * result.getStreamTracerFactory(), listener)); 119 * 120 * } 121 * } 122 * } 123 * </pre> 124 * </li> 125 * <li> Delegating policy with additional tracing logic 126 * <pre> 127 * {@code 128 * class WrappingPicker extends SubchannelPicker { 129 * 130 * public PickResult pickSubchannel(PickSubchannelArgs args) { 131 * Result result = delegate.pickSubchannel(args); 132 * return PickResult.withSubchannel( 133 * result.getSubchannel(), 134 * new ClientStreamTracer.Factory() { 135 * public ClientStreamTracer newClientStreamTracer( 136 * StreamInfo info, Metadata metadata) { 137 * ClientStreamTracer.Factory orcaTracerFactory = 138 * OrcaPerRequestReportUtil.getInstance().newOrcaClientStreamTracerFactory( 139 * result.getStreamTracerFactory(), listener); 140 * 141 * // Wrap the tracer from the delegate factory if you need to trace the 142 * // stream for your own. 143 * final ClientStreamTracer orcaTracer = 144 * orcaTracerFactory.newClientStreamTracer(info, metadata); 145 * 146 * return ForwardingClientStreamTracer() { 147 * protected ClientStreamTracer delegate() { 148 * return orcaTracer; 149 * } 150 * 151 * public void inboundMessage(int seqNo) { 152 * // Handle this event. 153 * ... 154 * } 155 * }; 156 * } 157 * }); 158 * } 159 * } 160 * } 161 * </pre> 162 * </li> 163 * </ul> 164 * 165 * @param delegate the delegate factory to produce other client stream tracing. 166 * @param listener contains the callback to be invoked when a per-request ORCA report is received. 167 */ newOrcaClientStreamTracerFactory( ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener)168 public abstract ClientStreamTracer.Factory newOrcaClientStreamTracerFactory( 169 ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener); 170 171 /** 172 * The listener interface for receiving per-request ORCA reports from backends. The class that is 173 * interested in processing backend cost metrics implements this interface, and the object created 174 * with that class is registered with a component, using methods in {@link OrcaPerRequestUtil}. 175 * When an ORCA report is received, that object's {@code onLoadReport} method is invoked. 176 */ 177 public interface OrcaPerRequestReportListener { 178 179 /** 180 * Invoked when a per-request ORCA report is received. 181 * 182 * <p>Note this callback will be invoked from the network thread as the RPC finishes, 183 * implementations should not block. 184 * 185 * @param report load report in the format of grpc {@link MetricReport}. 186 */ onLoadReport(MetricReport report)187 void onLoadReport(MetricReport report); 188 } 189 190 /** 191 * An {@link OrcaReportingTracerFactory} wraps a delegated {@link ClientStreamTracer.Factory} with 192 * additional functionality to produce {@link ClientStreamTracer} instances that extract 193 * per-request ORCA reports and push to registered listeners for calls they trace. 194 */ 195 @VisibleForTesting 196 static final class OrcaReportingTracerFactory extends 197 ClientStreamTracer.Factory { 198 199 @VisibleForTesting 200 static final Metadata.Key<OrcaLoadReport> ORCA_ENDPOINT_LOAD_METRICS_KEY = 201 Metadata.Key.of( 202 "endpoint-load-metrics-bin", 203 ProtoUtils.metadataMarshaller(OrcaLoadReport.getDefaultInstance())); 204 205 private static final CallOptions.Key<OrcaReportBroker> ORCA_REPORT_BROKER_KEY = 206 CallOptions.Key.create("internal-orca-report-broker"); 207 private final ClientStreamTracer.Factory delegate; 208 private final OrcaPerRequestReportListener listener; 209 OrcaReportingTracerFactory( ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener)210 OrcaReportingTracerFactory( 211 ClientStreamTracer.Factory delegate, OrcaPerRequestReportListener listener) { 212 this.delegate = checkNotNull(delegate, "delegate"); 213 this.listener = checkNotNull(listener, "listener"); 214 } 215 216 @Override newClientStreamTracer(StreamInfo info, Metadata headers)217 public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) { 218 OrcaReportBroker broker = info.getCallOptions().getOption(ORCA_REPORT_BROKER_KEY); 219 boolean augmented = false; 220 if (broker == null) { 221 broker = new OrcaReportBroker(); 222 info = 223 info.toBuilder() 224 .setCallOptions(info.getCallOptions().withOption(ORCA_REPORT_BROKER_KEY, broker)) 225 .build(); 226 augmented = true; 227 } 228 broker.addListener(listener); 229 ClientStreamTracer tracer = delegate.newClientStreamTracer(info, headers); 230 if (augmented) { 231 final ClientStreamTracer currTracer = tracer; 232 final OrcaReportBroker currBroker = broker; 233 // The actual tracer that performs ORCA report deserialization. 234 tracer = 235 new ForwardingClientStreamTracer() { 236 @Override 237 protected ClientStreamTracer delegate() { 238 return currTracer; 239 } 240 241 @Override 242 public void inboundTrailers(Metadata trailers) { 243 OrcaLoadReport report = trailers.get(ORCA_ENDPOINT_LOAD_METRICS_KEY); 244 if (report != null) { 245 currBroker.onReport(report); 246 } 247 delegate().inboundTrailers(trailers); 248 } 249 }; 250 } 251 return tracer; 252 } 253 } 254 fromOrcaLoadReport(OrcaLoadReport loadReport)255 static MetricReport fromOrcaLoadReport(OrcaLoadReport loadReport) { 256 return InternalCallMetricRecorder.createMetricReport(loadReport.getCpuUtilization(), 257 loadReport.getApplicationUtilization(), loadReport.getMemUtilization(), 258 loadReport.getRpsFractional(), loadReport.getEps(), loadReport.getRequestCostMap(), 259 loadReport.getUtilizationMap()); 260 } 261 262 /** 263 * A container class to hold registered {@link OrcaPerRequestReportListener}s and invoke all of 264 * them when an {@link OrcaLoadReport} is received. 265 */ 266 private static final class OrcaReportBroker { 267 268 private final List<OrcaPerRequestReportListener> listeners = new ArrayList<>(); 269 addListener(OrcaPerRequestReportListener listener)270 void addListener(OrcaPerRequestReportListener listener) { 271 listeners.add(listener); 272 } 273 onReport(OrcaLoadReport report)274 void onReport(OrcaLoadReport report) { 275 MetricReport metricReport = fromOrcaLoadReport(report); 276 for (OrcaPerRequestReportListener listener : listeners) { 277 listener.onLoadReport(metricReport); 278 } 279 } 280 } 281 } 282