1 /* 2 * Copyright 2020 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.rls; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 import static com.google.common.base.Preconditions.checkNotNull; 21 import static com.google.common.base.Preconditions.checkState; 22 23 import com.google.common.annotations.VisibleForTesting; 24 import com.google.common.base.MoreObjects; 25 import io.grpc.ChannelLogger.ChannelLogLevel; 26 import io.grpc.ConnectivityState; 27 import io.grpc.LoadBalancer; 28 import io.grpc.LoadBalancer.Helper; 29 import io.grpc.LoadBalancer.Subchannel; 30 import io.grpc.LoadBalancer.SubchannelPicker; 31 import io.grpc.LoadBalancerProvider; 32 import io.grpc.LoadBalancerRegistry; 33 import io.grpc.NameResolver.ConfigOrError; 34 import io.grpc.internal.ObjectPool; 35 import io.grpc.rls.ChildLoadBalancerHelper.ChildLoadBalancerHelperProvider; 36 import io.grpc.rls.RlsProtoData.RouteLookupConfig; 37 import io.grpc.util.ForwardingLoadBalancerHelper; 38 import java.util.ArrayList; 39 import java.util.Collections; 40 import java.util.HashMap; 41 import java.util.List; 42 import java.util.Map; 43 import java.util.Objects; 44 import java.util.concurrent.atomic.AtomicLong; 45 import javax.annotation.Nullable; 46 47 /** Configuration for RLS load balancing policy. */ 48 final class LbPolicyConfiguration { 49 50 private final RouteLookupConfig routeLookupConfig; 51 @Nullable 52 private final Map<String, ?> routeLookupChannelServiceConfig; 53 private final ChildLoadBalancingPolicy policy; 54 LbPolicyConfiguration( RouteLookupConfig routeLookupConfig, @Nullable Map<String, ?> routeLookupChannelServiceConfig, ChildLoadBalancingPolicy policy)55 LbPolicyConfiguration( 56 RouteLookupConfig routeLookupConfig, @Nullable Map<String, ?> routeLookupChannelServiceConfig, 57 ChildLoadBalancingPolicy policy) { 58 this.routeLookupConfig = checkNotNull(routeLookupConfig, "routeLookupConfig"); 59 this.routeLookupChannelServiceConfig = routeLookupChannelServiceConfig; 60 this.policy = checkNotNull(policy, "policy"); 61 } 62 getRouteLookupConfig()63 RouteLookupConfig getRouteLookupConfig() { 64 return routeLookupConfig; 65 } 66 67 @Nullable getRouteLookupChannelServiceConfig()68 Map<String, ?> getRouteLookupChannelServiceConfig() { 69 return routeLookupChannelServiceConfig; 70 } 71 getLoadBalancingPolicy()72 ChildLoadBalancingPolicy getLoadBalancingPolicy() { 73 return policy; 74 } 75 76 @Override equals(Object o)77 public boolean equals(Object o) { 78 if (this == o) { 79 return true; 80 } 81 if (o == null || getClass() != o.getClass()) { 82 return false; 83 } 84 LbPolicyConfiguration that = (LbPolicyConfiguration) o; 85 return Objects.equals(routeLookupConfig, that.routeLookupConfig) 86 && Objects.equals(routeLookupChannelServiceConfig, that.routeLookupChannelServiceConfig) 87 && Objects.equals(policy, that.policy); 88 } 89 90 @Override hashCode()91 public int hashCode() { 92 return Objects.hash(routeLookupConfig, routeLookupChannelServiceConfig, policy); 93 } 94 95 @Override toString()96 public String toString() { 97 return MoreObjects.toStringHelper(this) 98 .add("routeLookupConfig", routeLookupConfig) 99 .add("routeLookupChannelServiceConfig", routeLookupChannelServiceConfig) 100 .add("policy", policy) 101 .toString(); 102 } 103 104 /** ChildLoadBalancingPolicy is an elected child policy to delegate requests. */ 105 static final class ChildLoadBalancingPolicy { 106 107 private final Map<String, Object> effectiveRawChildPolicy; 108 private final LoadBalancerProvider effectiveLbProvider; 109 private final String targetFieldName; 110 111 @VisibleForTesting ChildLoadBalancingPolicy( String targetFieldName, Map<String, Object> effectiveRawChildPolicy, LoadBalancerProvider effectiveLbProvider)112 ChildLoadBalancingPolicy( 113 String targetFieldName, 114 Map<String, Object> effectiveRawChildPolicy, 115 LoadBalancerProvider effectiveLbProvider) { 116 checkArgument( 117 targetFieldName != null && !targetFieldName.isEmpty(), 118 "targetFieldName cannot be empty or null"); 119 this.targetFieldName = targetFieldName; 120 this.effectiveRawChildPolicy = 121 checkNotNull(effectiveRawChildPolicy, "effectiveRawChildPolicy"); 122 this.effectiveLbProvider = checkNotNull(effectiveLbProvider, "effectiveLbProvider"); 123 } 124 125 /** Creates ChildLoadBalancingPolicy. */ 126 @SuppressWarnings("unchecked") create( String childPolicyConfigTargetFieldName, List<Map<String, ?>> childPolicies)127 static ChildLoadBalancingPolicy create( 128 String childPolicyConfigTargetFieldName, List<Map<String, ?>> childPolicies) 129 throws InvalidChildPolicyConfigException { 130 Map<String, Object> effectiveChildPolicy = null; 131 LoadBalancerProvider effectiveLbProvider = null; 132 List<String> policyTried = new ArrayList<>(); 133 134 LoadBalancerRegistry lbRegistry = LoadBalancerRegistry.getDefaultRegistry(); 135 for (Map<String, ?> childPolicy : childPolicies) { 136 if (childPolicy.isEmpty()) { 137 continue; 138 } 139 if (childPolicy.size() != 1) { 140 throw 141 new InvalidChildPolicyConfigException( 142 "childPolicy should have exactly one loadbalancing policy"); 143 } 144 String policyName = childPolicy.keySet().iterator().next(); 145 LoadBalancerProvider provider = lbRegistry.getProvider(policyName); 146 if (provider != null) { 147 effectiveLbProvider = provider; 148 effectiveChildPolicy = Collections.unmodifiableMap(childPolicy); 149 break; 150 } 151 policyTried.add(policyName); 152 } 153 if (effectiveChildPolicy == null) { 154 throw 155 new InvalidChildPolicyConfigException( 156 String.format("no valid childPolicy found, policy tried: %s", policyTried)); 157 } 158 return 159 new ChildLoadBalancingPolicy( 160 childPolicyConfigTargetFieldName, 161 (Map<String, Object>) effectiveChildPolicy.values().iterator().next(), 162 effectiveLbProvider); 163 } 164 165 /** Creates a child load balancer config for given target from elected raw child policy. */ getEffectiveChildPolicy(String target)166 Map<String, ?> getEffectiveChildPolicy(String target) { 167 Map<String, Object> childPolicy = new HashMap<>(effectiveRawChildPolicy); 168 childPolicy.put(targetFieldName, target); 169 return childPolicy; 170 } 171 172 /** Returns the elected child {@link LoadBalancerProvider}. */ getEffectiveLbProvider()173 LoadBalancerProvider getEffectiveLbProvider() { 174 return effectiveLbProvider; 175 } 176 177 @Override equals(Object o)178 public boolean equals(Object o) { 179 if (this == o) { 180 return true; 181 } 182 if (o == null || getClass() != o.getClass()) { 183 return false; 184 } 185 ChildLoadBalancingPolicy that = (ChildLoadBalancingPolicy) o; 186 return Objects.equals(effectiveRawChildPolicy, that.effectiveRawChildPolicy) 187 && Objects.equals(effectiveLbProvider, that.effectiveLbProvider) 188 && Objects.equals(targetFieldName, that.targetFieldName); 189 } 190 191 @Override hashCode()192 public int hashCode() { 193 return Objects.hash(effectiveRawChildPolicy, effectiveLbProvider, targetFieldName); 194 } 195 196 @Override toString()197 public String toString() { 198 return MoreObjects.toStringHelper(this) 199 .add("effectiveRawChildPolicy", effectiveRawChildPolicy) 200 .add("effectiveLbProvider", effectiveLbProvider) 201 .add("childPolicyConfigTargetFieldName", targetFieldName) 202 .toString(); 203 } 204 } 205 206 /** Factory for {@link ChildPolicyWrapper}. */ 207 static final class RefCountedChildPolicyWrapperFactory { 208 // GuardedBy CachingRlsLbClient.lock 209 @VisibleForTesting 210 final Map<String /* target */, RefCountedChildPolicyWrapper> childPolicyMap = 211 new HashMap<>(); 212 213 private final ChildLoadBalancerHelperProvider childLbHelperProvider; 214 private final ChildLbStatusListener childLbStatusListener; 215 private final ChildLoadBalancingPolicy childPolicy; 216 private final ResolvedAddressFactory childLbResolvedAddressFactory; 217 RefCountedChildPolicyWrapperFactory( ChildLoadBalancingPolicy childPolicy, ResolvedAddressFactory childLbResolvedAddressFactory, ChildLoadBalancerHelperProvider childLbHelperProvider, ChildLbStatusListener childLbStatusListener)218 public RefCountedChildPolicyWrapperFactory( 219 ChildLoadBalancingPolicy childPolicy, 220 ResolvedAddressFactory childLbResolvedAddressFactory, 221 ChildLoadBalancerHelperProvider childLbHelperProvider, 222 ChildLbStatusListener childLbStatusListener) { 223 this.childPolicy = checkNotNull(childPolicy, "childPolicy"); 224 this.childLbResolvedAddressFactory = 225 checkNotNull(childLbResolvedAddressFactory, "childLbResolvedAddressFactory"); 226 this.childLbHelperProvider = checkNotNull(childLbHelperProvider, "childLbHelperProvider"); 227 this.childLbStatusListener = checkNotNull(childLbStatusListener, "childLbStatusListener"); 228 } 229 230 // GuardedBy CachingRlsLbClient.lock createOrGet(String target)231 ChildPolicyWrapper createOrGet(String target) { 232 // TODO(creamsoup) check if the target is valid or not 233 RefCountedChildPolicyWrapper pooledChildPolicyWrapper = childPolicyMap.get(target); 234 if (pooledChildPolicyWrapper == null) { 235 ChildPolicyWrapper childPolicyWrapper = new ChildPolicyWrapper( 236 target, childPolicy, childLbResolvedAddressFactory, childLbHelperProvider, 237 childLbStatusListener); 238 pooledChildPolicyWrapper = RefCountedChildPolicyWrapper.of(childPolicyWrapper); 239 childPolicyMap.put(target, pooledChildPolicyWrapper); 240 return pooledChildPolicyWrapper.getObject(); 241 } else { 242 ChildPolicyWrapper childPolicyWrapper = pooledChildPolicyWrapper.getObject(); 243 if (childPolicyWrapper.getPicker() != null) { 244 childPolicyWrapper.refreshState(); 245 } 246 return childPolicyWrapper; 247 } 248 } 249 250 // GuardedBy CachingRlsLbClient.lock createOrGet(List<String> targets)251 List<ChildPolicyWrapper> createOrGet(List<String> targets) { 252 List<ChildPolicyWrapper> retVal = new ArrayList<>(); 253 for (String target : targets) { 254 retVal.add(createOrGet(target)); 255 } 256 return retVal; 257 } 258 259 // GuardedBy CachingRlsLbClient.lock release(ChildPolicyWrapper childPolicyWrapper)260 void release(ChildPolicyWrapper childPolicyWrapper) { 261 checkNotNull(childPolicyWrapper, "childPolicyWrapper"); 262 String target = childPolicyWrapper.getTarget(); 263 RefCountedChildPolicyWrapper existing = childPolicyMap.get(target); 264 checkState(existing != null, "Cannot access already released object"); 265 existing.returnObject(childPolicyWrapper); 266 if (existing.isReleased()) { 267 childPolicyMap.remove(target); 268 } 269 } 270 } 271 272 /** 273 * ChildPolicyWrapper is a wrapper class for child load balancing policy with associated helper / 274 * utility classes to manage the child policy. 275 */ 276 static final class ChildPolicyWrapper { 277 278 private final String target; 279 private final ChildPolicyReportingHelper helper; 280 private final LoadBalancer lb; 281 private volatile SubchannelPicker picker; 282 private ConnectivityState state; 283 ChildPolicyWrapper( String target, ChildLoadBalancingPolicy childPolicy, final ResolvedAddressFactory childLbResolvedAddressFactory, ChildLoadBalancerHelperProvider childLbHelperProvider, ChildLbStatusListener childLbStatusListener)284 public ChildPolicyWrapper( 285 String target, 286 ChildLoadBalancingPolicy childPolicy, 287 final ResolvedAddressFactory childLbResolvedAddressFactory, 288 ChildLoadBalancerHelperProvider childLbHelperProvider, 289 ChildLbStatusListener childLbStatusListener) { 290 this.target = target; 291 this.helper = 292 new ChildPolicyReportingHelper(childLbHelperProvider, childLbStatusListener); 293 LoadBalancerProvider lbProvider = childPolicy.getEffectiveLbProvider(); 294 final ConfigOrError lbConfig = 295 lbProvider 296 .parseLoadBalancingPolicyConfig( 297 childPolicy.getEffectiveChildPolicy(target)); 298 this.lb = lbProvider.newLoadBalancer(helper); 299 helper.getChannelLogger().log( 300 ChannelLogLevel.DEBUG, "RLS child lb created. config: {0}", lbConfig.getConfig()); 301 helper.getSynchronizationContext().execute( 302 new Runnable() { 303 @Override 304 public void run() { 305 if (!lb.acceptResolvedAddresses( 306 childLbResolvedAddressFactory.create(lbConfig.getConfig()))) { 307 helper.refreshNameResolution(); 308 } 309 lb.requestConnection(); 310 } 311 }); 312 } 313 getTarget()314 String getTarget() { 315 return target; 316 } 317 getPicker()318 SubchannelPicker getPicker() { 319 return picker; 320 } 321 getHelper()322 ChildPolicyReportingHelper getHelper() { 323 return helper; 324 } 325 getState()326 public ConnectivityState getState() { 327 return state; 328 } 329 refreshState()330 void refreshState() { 331 helper.getSynchronizationContext().execute( 332 new Runnable() { 333 @Override 334 public void run() { 335 helper.updateBalancingState(state, picker); 336 } 337 } 338 ); 339 } 340 shutdown()341 void shutdown() { 342 helper.getSynchronizationContext().execute( 343 new Runnable() { 344 @Override 345 public void run() { 346 lb.shutdown(); 347 } 348 } 349 ); 350 } 351 352 @Override toString()353 public String toString() { 354 return MoreObjects.toStringHelper(this) 355 .add("target", target) 356 .add("picker", picker) 357 .add("state", state) 358 .toString(); 359 } 360 361 /** 362 * A delegating {@link io.grpc.LoadBalancer.Helper} maintains status of {@link 363 * ChildPolicyWrapper} when {@link Subchannel} status changed. This helper is used between child 364 * policy and parent load-balancer where each picker in child policy is governed by a governing 365 * picker (RlsPicker). The governing picker will be reported back to the parent load-balancer. 366 */ 367 final class ChildPolicyReportingHelper extends ForwardingLoadBalancerHelper { 368 369 private final ChildLoadBalancerHelper delegate; 370 private final ChildLbStatusListener listener; 371 ChildPolicyReportingHelper( ChildLoadBalancerHelperProvider childHelperProvider, ChildLbStatusListener listener)372 ChildPolicyReportingHelper( 373 ChildLoadBalancerHelperProvider childHelperProvider, 374 ChildLbStatusListener listener) { 375 checkNotNull(childHelperProvider, "childHelperProvider"); 376 this.delegate = childHelperProvider.forTarget(getTarget()); 377 this.listener = checkNotNull(listener, "listener"); 378 } 379 380 @Override delegate()381 protected Helper delegate() { 382 return delegate; 383 } 384 385 @Override updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker)386 public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) { 387 picker = newPicker; 388 state = newState; 389 super.updateBalancingState(newState, newPicker); 390 listener.onStatusChanged(newState); 391 } 392 } 393 } 394 395 /** Listener for child lb status change events. */ 396 interface ChildLbStatusListener { 397 398 /** Notifies when child lb status changes. */ onStatusChanged(ConnectivityState newState)399 void onStatusChanged(ConnectivityState newState); 400 } 401 402 private static final class RefCountedChildPolicyWrapper 403 implements ObjectPool<ChildPolicyWrapper> { 404 405 private final AtomicLong refCnt = new AtomicLong(); 406 @Nullable 407 private ChildPolicyWrapper childPolicyWrapper; 408 RefCountedChildPolicyWrapper(ChildPolicyWrapper childPolicyWrapper)409 private RefCountedChildPolicyWrapper(ChildPolicyWrapper childPolicyWrapper) { 410 this.childPolicyWrapper = checkNotNull(childPolicyWrapper, "childPolicyWrapper"); 411 } 412 413 @Override getObject()414 public ChildPolicyWrapper getObject() { 415 checkState(!isReleased(), "ChildPolicyWrapper is already released"); 416 refCnt.getAndIncrement(); 417 return childPolicyWrapper; 418 } 419 420 @Override 421 @Nullable returnObject(Object object)422 public ChildPolicyWrapper returnObject(Object object) { 423 checkState( 424 !isReleased(), 425 "cannot return already released ChildPolicyWrapper, this is possibly a bug."); 426 checkState( 427 childPolicyWrapper == object, 428 "returned object doesn't match the pooled childPolicyWrapper"); 429 long newCnt = refCnt.decrementAndGet(); 430 checkState(newCnt != -1, "Cannot return never pooled childPolicyWrapper"); 431 if (newCnt == 0) { 432 childPolicyWrapper.shutdown(); 433 childPolicyWrapper = null; 434 } 435 return null; 436 } 437 isReleased()438 boolean isReleased() { 439 return childPolicyWrapper == null; 440 } 441 of(ChildPolicyWrapper childPolicyWrapper)442 static RefCountedChildPolicyWrapper of(ChildPolicyWrapper childPolicyWrapper) { 443 return new RefCountedChildPolicyWrapper(childPolicyWrapper); 444 } 445 446 @Override toString()447 public String toString() { 448 return MoreObjects.toStringHelper(this) 449 .add("object", childPolicyWrapper) 450 .add("refCnt", refCnt.get()) 451 .toString(); 452 } 453 } 454 455 /** Exception thrown when attempting to parse child policy encountered parsing issue. */ 456 static final class InvalidChildPolicyConfigException extends Exception { 457 458 private static final long serialVersionUID = 0L; 459 InvalidChildPolicyConfigException(String message)460 InvalidChildPolicyConfigException(String message) { 461 super(message); 462 } 463 464 @Override fillInStackTrace()465 public synchronized Throwable fillInStackTrace() { 466 // no stack trace above this point 467 return this; 468 } 469 } 470 } 471