刘耀文

Java developer

The automatic refresh mechanism of the Spring Cloud [Boot] configuration center

Implementation Mechanism:

Principle of Automatic Refresh of Bean Properties:

Spring 2.x introduced custom scopes: in addition to the built-in singleton and prototype scopes, a scope interface and a scope annotation let an application control where bean instances are stored and how long they live. The related interfaces and classes are:

  • ConfigurableBeanFactory.registerScope
  • CustomScopeConfigurer
  • org.springframework.aop.scope.ScopedProxyFactoryBean
  • org.springframework.web.context.request.RequestScope
  • org.springframework.web.context.request.SessionScope

The basic principle of implementation is:

  • During package scanning, when a @Scope annotation that requests a scoped proxy is recognized, the bean definition is replaced by a definition of ScopedProxyFactoryBean. This happens in ClassPathBeanDefinitionScanner.doScan:
	protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
		Assert.notEmpty(basePackages, "At least one base package must be specified");
		Set<BeanDefinitionHolder> beanDefinitions = new LinkedHashSet<>();
		for (String basePackage : basePackages) {
			Set<BeanDefinition> candidates = findCandidateComponents(basePackage);
			for (BeanDefinition candidate : candidates) {
				ScopeMetadata scopeMetadata = this.scopeMetadataResolver.resolveScopeMetadata(candidate);
				candidate.setScope(scopeMetadata.getScopeName());
				String beanName = this.beanNameGenerator.generateBeanName(candidate, this.registry);
				if (candidate instanceof AbstractBeanDefinition abstractBeanDefinition) {
					postProcessBeanDefinition(abstractBeanDefinition, beanName);
				}
				if (candidate instanceof AnnotatedBeanDefinition annotatedBeanDefinition) {
					AnnotationConfigUtils.processCommonDefinitionAnnotations(annotatedBeanDefinition);
				}
				// Scope-annotated classes are handled here: applyScopedProxyMode swaps the definition for a ScopedProxyFactoryBean definition when a proxy mode is set
				if (checkCandidate(beanName, candidate)) {
					BeanDefinitionHolder definitionHolder = new BeanDefinitionHolder(candidate, beanName);
					definitionHolder =
							AnnotationConfigUtils.applyScopedProxyMode(scopeMetadata, definitionHolder, this.registry);
					beanDefinitions.add(definitionHolder);
					registerBeanDefinition(definitionHolder, this.registry);
				}
			}
		}
		return beanDefinitions;
	}

Following applyScopedProxyMode leads to ScopedProxyUtils.createScopedProxy:

    public static BeanDefinitionHolder createScopedProxy(BeanDefinitionHolder definition, BeanDefinitionRegistry registry, boolean proxyTargetClass) {
        String originalBeanName = definition.getBeanName();
        BeanDefinition targetDefinition = definition.getBeanDefinition();
        String targetBeanName = getTargetBeanName(originalBeanName);
        // It can be seen that the bean definition of ScopedProxyFactoryBean is created here
        RootBeanDefinition proxyDefinition = new RootBeanDefinition(ScopedProxyFactoryBean.class);
        // The original class is set here, which is the class annotated with scope
        proxyDefinition.setDecoratedDefinition(new BeanDefinitionHolder(targetDefinition, targetBeanName));
        proxyDefinition.setOriginatingBeanDefinition(targetDefinition);
        proxyDefinition.setSource(definition.getSource());
        proxyDefinition.setRole(targetDefinition.getRole());
        proxyDefinition.getPropertyValues().add("targetBeanName", targetBeanName);
        if (proxyTargetClass) {
            targetDefinition.setAttribute(AutoProxyUtils.PRESERVE_TARGET_CLASS_ATTRIBUTE, Boolean.TRUE);
        } else {
            proxyDefinition.getPropertyValues().add("proxyTargetClass", Boolean.FALSE);
        }

        proxyDefinition.setAutowireCandidate(targetDefinition.isAutowireCandidate());
        proxyDefinition.setPrimary(targetDefinition.isPrimary());
        if (targetDefinition instanceof AbstractBeanDefinition abd) {
            proxyDefinition.copyQualifiersFrom(abd);
        }

        targetDefinition.setAutowireCandidate(false);
        targetDefinition.setPrimary(false);
        registry.registerBeanDefinition(targetBeanName, targetDefinition);
        return new BeanDefinitionHolder(proxyDefinition, originalBeanName, definition.getAliases());
    }

So, what does ScopedProxyFactoryBean do? Why is it modified to this class? Let's look at the structure of this class:

// It implements FactoryBean (naturally, since it stands in for the original bean and hands out a wrapper around it)
// and BeanFactoryAware, which gives it the BeanFactory from which the target instance is fetched.
// AopInfrastructureBean is a marker interface: it identifies the class as part of the AOP infrastructure,
// so that it is never auto-proxied itself. The same marker is carried by the very important
// InfrastructureAdvisorAutoProxyCreator and AspectJAwareAdvisorAutoProxyCreator: the former belongs to Spring's
// built-in AOP, the latter enables AspectJ-style AOP. This part is also very interesting, and I will write an
// article about Spring AOP when I have time.
public class ScopedProxyFactoryBean extends ProxyConfig implements FactoryBean<Object>, BeanFactoryAware, AopInfrastructureBean

Since it implements FactoryBean, let's see what processing is done when obtaining the bean. It's simple; first, a proxy is generated when setting the BeanFactory:

    public void setBeanFactory(BeanFactory beanFactory) {
        if (beanFactory instanceof ConfigurableBeanFactory cbf) {
            this.scopedTargetSource.setBeanFactory(beanFactory);
            ProxyFactory pf = new ProxyFactory();
            pf.copyFrom(this);
            pf.setTargetSource(this.scopedTargetSource);
            Assert.notNull(this.targetBeanName, "Property 'targetBeanName' is required");
            Class<?> beanType = beanFactory.getType(this.targetBeanName);
            if (beanType == null) {
                throw new IllegalStateException("Cannot create scoped proxy for bean '" + this.targetBeanName + "': Target type could not be determined at the time of proxy creation.");
            } else {
                if (!this.isProxyTargetClass() || beanType.isInterface() || Modifier.isPrivate(beanType.getModifiers())) {
                    pf.setInterfaces(ClassUtils.getAllInterfacesForClass(beanType, cbf.getBeanClassLoader()));
                }

                ScopedObject scopedObject = new DefaultScopedObject(cbf, this.scopedTargetSource.getTargetBeanName());
                pf.addAdvice(new DelegatingIntroductionInterceptor(scopedObject));
                pf.addInterface(AopInfrastructureBean.class);
                this.proxy = pf.getProxy(cbf.getBeanClassLoader());
            }
        } else {
            throw new IllegalStateException("Not running in a ConfigurableBeanFactory: " + beanFactory);
        }
    }

Then, when obtaining the bean, it returns the proxy:

    public Object getObject() {
        if (this.proxy == null) {
            throw new FactoryBeanNotInitializedException();
        } else {
            return this.proxy;
        }
    }

What is the generated proxy for? Look at the logic of the proxy class: every method call on the proxy first enters the AOP callback (DynamicAdvisedInterceptor.intercept in CglibAopProxy for a CGLIB proxy), which asks the TargetSource for the current target object before invoking the method on it:


		public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
			Object oldProxy = null;
			boolean setProxyContext = false;
			Object target = null;
			TargetSource targetSource = this.advised.getTargetSource();
			try {
				if (this.advised.exposeProxy) {
					// Make invocation available if necessary.
					oldProxy = AopContext.setCurrentProxy(proxy);
					setProxyContext = true;
				}
				// This is where the target source asks the bean factory for the current target object
				target = targetSource.getTarget();
				Class<?> targetClass = (target != null ? target.getClass() : null);
				List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
				Object retVal;

				if (chain.isEmpty()) {

					Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
					retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse);
				}
				else {
					// We need to create a method invocation...
					retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
				}
				return processReturnType(proxy, target, method, args, retVal);
			}
			finally {
				if (target != null && !targetSource.isStatic()) {
					targetSource.releaseTarget(target);
				}
				if (setProxyContext) {
					// Restore old proxy.
					AopContext.setCurrentProxy(oldProxy);
				}
			}
		}
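
The per-call lookup can be reproduced without Spring. The following is a minimal sketch that swaps in a JDK dynamic proxy for the CGLIB proxy used above; Greeter, LateBindingProxy and the Supplier-based target source are hypothetical names invented for this illustration, not Spring APIs.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical service interface used only for this illustration.
interface Greeter {
    String greet();
}

final class LateBindingProxy {
    private LateBindingProxy() {
    }

    // Creates a JDK proxy that asks targetSource for the current target on every call,
    // analogous to targetSource.getTarget() in the interceptor above.
    static <T> T create(Class<T> type, Supplier<? extends T> targetSource) {
        InvocationHandler handler = (proxy, method, args) -> {
            T target = targetSource.get();
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                // Rethrow the exception raised by the target itself.
                throw e.getCause();
            }
        };
        Object proxy = Proxy.newProxyInstance(type.getClassLoader(), new Class<?>[] {type}, handler);
        return type.cast(proxy);
    }
}

final class LateBindingProxyDemo {
    public static void main(String[] args) {
        Greeter v1 = () -> "hello v1";
        Greeter v2 = () -> "hello v2";
        AtomicReference<Greeter> current = new AtomicReference<>(v1);

        // Callers keep this single proxy reference for the whole lifetime of the application.
        Greeter greeter = LateBindingProxy.create(Greeter.class, current::get);
        System.out.println(greeter.greet()); // prints "hello v1"

        // Replacing the target behind the proxy is enough; the proxy reference never changes.
        current.set(v2);
        System.out.println(greeter.greet()); // prints "hello v2"
    }
}
```

Because callers only ever hold the proxy, the object behind it can be discarded and recreated at any time, which is the property the refresh mechanism relies on.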

Returning to the original question: where does the dynamic refresh happen? If the cached bean is destroyed whenever the configuration in the configuration center changes, the next call through the proxy obtains a newly created bean built from the latest configuration. This is exactly what Spring Cloud does, using the @RefreshScope annotation together with the RefreshScope class. A bean annotated with @RefreshScope is wrapped in a ScopedProxyFactoryBean, and the RefreshScope class manages the lifecycle of the target bean. The target is therefore no longer served from the bean factory's singleton cache but from RefreshScope, as this excerpt from AbstractBeanFactory.doGetBean shows:

if (mbd.isSingleton()) {
    sharedInstance = getSingleton(beanName, () -> {
        try {
            return createBean(beanName, mbd, args);
        }
        catch (BeansException ex) {
            // Explicitly remove instance from singleton cache: It might have been put there
            // eagerly by the creation process, to allow for circular reference resolution.
            // Also remove any beans that received a temporary reference to the bean.
            destroySingleton(beanName);
            throw ex;
        }
    });
    beanInstance = getObjectForBeanInstance(sharedInstance, name, beanName, mbd);
}

else if (mbd.isPrototype()) {
    // It's a prototype -> create a new instance.
    Object prototypeInstance = null;
    try {
        beforePrototypeCreation(beanName);
        prototypeInstance = createBean(beanName, mbd, args);
    }
    finally {
        afterPrototypeCreation(beanName);
    }
    beanInstance = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd);
}

else {
    
    String scopeName = mbd.getScope();
    if (!StringUtils.hasLength(scopeName)) {
        throw new IllegalStateException("No scope name defined for bean '" + beanName + "'");
    }
    Scope scope = this.scopes.get(scopeName);
    if (scope == null) {
        throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'");
    }
    try {
        // Neither singleton nor prototype: the bean is obtained from the registered Scope
        Object scopedInstance = scope.get(beanName, () -> {
            beforePrototypeCreation(beanName);
            try {
                return createBean(beanName, mbd, args);
            }
            finally {
                afterPrototypeCreation(beanName);
            }
        });
        beanInstance = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
    }
    catch (IllegalStateException ex) {
        throw new ScopeNotActiveException(beanName, scopeName, ex);
    }
}
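
The three branches can be condensed into a small model. This is only a sketch under simplifying assumptions: MiniBeanFactory and MiniScope are hypothetical stand-ins written for this post, and the real doGetBean additionally handles dependencies, circular references and FactoryBean unwrapping.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical, simplified counterpart of Spring's Scope interface (get only).
interface MiniScope {
    Object get(String name, Supplier<Object> objectFactory);
}

final class MiniBeanFactory {
    private final Map<String, Object> singletons = new HashMap<>();
    private final Map<String, MiniScope> scopes = new HashMap<>();

    void registerScope(String scopeName, MiniScope scope) {
        scopes.put(scopeName, scope);
    }

    Object getBean(String beanName, String scopeName, Supplier<Object> creator) {
        if ("singleton".equals(scopeName)) {
            // Created once, then always served from the singleton cache.
            return singletons.computeIfAbsent(beanName, key -> creator.get());
        }
        if ("prototype".equals(scopeName)) {
            // A new instance on every request.
            return creator.get();
        }
        // Any other scope name: the registered scope decides whether to reuse or create.
        MiniScope scope = scopes.get(scopeName);
        if (scope == null) {
            throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'");
        }
        return scope.get(beanName, creator);
    }
}

final class MiniBeanFactoryDemo {
    public static void main(String[] args) {
        MiniBeanFactory factory = new MiniBeanFactory();
        Map<String, Object> refreshCache = new HashMap<>();
        factory.registerScope("refresh",
                (name, objectFactory) -> refreshCache.computeIfAbsent(name, key -> objectFactory.get()));

        Object first = factory.getBean("config", "refresh", Object::new);
        Object second = factory.getBean("config", "refresh", Object::new);
        System.out.println(first == second); // prints true: served from the scope's cache

        refreshCache.clear();
        Object third = factory.getBean("config", "refresh", Object::new);
        System.out.println(first == third);  // prints false: the scope had to create a new one
    }
}
```

The factory itself never caches a custom-scoped bean, so whoever owns the scope's cache also controls when a new instance is created.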

RefreshScope registers itself as a scope. It can do so because its parent class GenericScope implements BeanFactoryPostProcessor and BeanDefinitionRegistryPostProcessor:

@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
    this.beanFactory = beanFactory;
    // Register RefreshScope so that it can be easily obtained and called above
    beanFactory.registerScope(this.name, this);
    setSerializationId(beanFactory);
}
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
    for (String name : registry.getBeanDefinitionNames()) {
        BeanDefinition definition = registry.getBeanDefinition(name);
        if (definition instanceof RootBeanDefinition root) {
            if (root.getDecoratedDefinition() != null && root.hasBeanClass()
                    && root.getBeanClass() == ScopedProxyFactoryBean.class) {
                if (getName().equals(root.getDecoratedDefinition().getBeanDefinition().getScope())) {
                    // The ScopedProxyFactoryBean produced by package scanning is replaced with LockedScopedProxyFactoryBean (only for the refresh scope), which adds locking. This detail is not important here.
                    root.setBeanClass(LockedScopedProxyFactoryBean.class);
                    root.getConstructorArgumentValues().addGenericArgumentValue(this);
                    // Surprising that a scoped proxy bean definition is not already marked as synthetic?
                    root.setSynthetic(true);
                }
            }
        }
    }
}

Okay, let's pause here and review what we have established so far:

  • Beans annotated with @RefreshScope are wrapped in a LockedScopedProxyFactoryBean, so every method call fetches the target bean from RefreshScope.
  • RefreshScope itself is a bean declared by the auto-configuration class of spring-cloud-context, and as a post-processor it registers itself with the bean factory as a scope.

Now the question arises: why is the fetched object always the latest one? The natural answer is that every time the configuration is refreshed, the beans cached in RefreshScope are destroyed, so the next method call creates a new bean from the latest configuration. This is exactly what happens: on every refresh, RefreshScope is told to destroy its cached beans. The source code is as follows:
public void refreshAll() {
    super.destroy();
    this.context.publishEvent(new RefreshScopeRefreshedEvent());
}
@Override
public void destroy() {
    List<Throwable> errors = new ArrayList<>();
    // Calling refreshAll will clear all caches, so the next time it is fetched, it will be the latest
    Collection<BeanLifecycleWrapper> wrappers = this.cache.clear();
    for (BeanLifecycleWrapper wrapper : wrappers) {
        try {
            Lock lock = this.locks.get(wrapper.getName()).writeLock();
            lock.lock();
            try {
                wrapper.destroy();
            }
            finally {
                lock.unlock();
            }
        }
        catch (RuntimeException e) {
            errors.add(e);
        }
    }
    if (!errors.isEmpty()) {
        throw wrapIfNecessary(errors.get(0));
    }
    this.errors.clear();
}
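
The effect of clearing the cache can be seen in a small model. This is a sketch only: MiniRefreshScope is a hypothetical miniature written for this post, it leaves out the per-bean locks and error collection shown above, and the environment map stands in for the real Environment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical miniature of a refresh scope: a cache plus destruction callbacks.
final class MiniRefreshScope {
    private final Map<String, Object> cache = new ConcurrentHashMap<>();
    private final Map<String, Runnable> destructionCallbacks = new ConcurrentHashMap<>();

    // Returns the cached instance, or creates it through the factory on a cache miss.
    Object get(String name, Supplier<?> objectFactory) {
        return cache.computeIfAbsent(name, key -> objectFactory.get());
    }

    void registerDestructionCallback(String name, Runnable callback) {
        destructionCallbacks.put(name, callback);
    }

    // Drops every cached instance and runs its callback; the next get() rebuilds the bean.
    void refreshAll() {
        List<String> names = new ArrayList<>(cache.keySet());
        cache.clear();
        for (String name : names) {
            Runnable callback = destructionCallbacks.remove(name);
            if (callback != null) {
                callback.run();
            }
        }
    }
}

final class MiniRefreshScopeDemo {
    public static void main(String[] args) {
        Map<String, String> environment = new ConcurrentHashMap<>();
        environment.put("greeting", "hello");
        MiniRefreshScope scope = new MiniRefreshScope();
        // The "bean" is a snapshot of the configuration taken at creation time.
        Supplier<String> factory = () -> "bean(" + environment.get("greeting") + ")";

        System.out.println(scope.get("greeter", factory)); // prints bean(hello)
        environment.put("greeting", "hi");
        System.out.println(scope.get("greeter", factory)); // prints bean(hello): still cached
        scope.refreshAll();
        System.out.println(scope.get("greeter", factory)); // prints bean(hi)
    }
}
```

Changing the environment alone is not enough: the stale instance keeps being served until the cache is cleared, which is why the refresh has to be triggered explicitly.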

So when is refreshAll called? In the refresh method of ConfigDataContextRefresher (inherited from ContextRefresher), we can see the call:

public synchronized Set<String> refresh() {
    Set<String> keys = refreshEnvironment();
    this.scope.refreshAll();
    return keys;
}
// Spring Cloud registers ConfigDataContextRefresher here and injects the RefreshScope into it
@Bean
@ConditionalOnMissingBean
@ConditionalOnBootstrapDisabled
public ConfigDataContextRefresher configDataContextRefresher(ConfigurableApplicationContext context,
        RefreshScope scope, RefreshProperties properties) {
    return new ConfigDataContextRefresher(context, scope, properties);
}

Who calls ConfigDataContextRefresher? In RefreshEventListener, we can see:

@Override
public void onApplicationEvent(ApplicationEvent event) {
    if (event instanceof ApplicationReadyEvent) {
        handle((ApplicationReadyEvent) event);
    }
    // Upon receiving RefreshEvent, refresh
    else if (event instanceof RefreshEvent) {
        handle((RefreshEvent) event);
    }
}

public void handle(ApplicationReadyEvent event) {
    this.ready.compareAndSet(false, true);
}

public void handle(RefreshEvent event) {
    if (this.ready.get()) { // don't handle events before app is ready
        log.debug("Event received " + event.getEventDesc());
        Set<String> keys = this.refresh.refresh();
        log.info("Refresh keys changed: " + keys);
    }
}
// Spring Cloud injects the above ConfigDataContextRefresher
@Bean
public RefreshEventListener refreshEventListener(ContextRefresher contextRefresher) {
    return new RefreshEventListener(contextRefresher);
}

Finally, who publishes the RefreshEvent? Once the nacos-config dependency is added, one bean looks relevant. Let's take a look at its definition and at the listener it registers:

@Bean
public NacosContextRefresher nacosContextRefresher(
        NacosConfigManager nacosConfigManager,
        NacosRefreshHistory nacosRefreshHistory) {
    // Consider that it is not necessary to be compatible with the previous
    // configuration
    // and use the new configuration if necessary.
    return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);
}

private void registerNacosListener(final String groupKey, final String dataKey) {
    String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
    Listener listener = listenerMap.computeIfAbsent(key,
            lst -> new AbstractSharedListener() {
                @Override
                public void innerReceive(String dataId, String group,
                        String configInfo) {
                    refreshCountIncrement();
                    nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                    NacosSnapshotConfigManager.putConfigSnapshot(dataId, group,
                            configInfo);
                    // It can be seen that every time the configuration is updated, a RefreshEvent is published
                    applicationContext.publishEvent(
                            new RefreshEvent(this, null, "Refresh Nacos config"));
                    if (log.isDebugEnabled()) {
                        log.debug(String.format(
                                "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                                group, dataId, configInfo));
                    }
                }
            });
    try {
        configService.addListener(dataKey, groupKey, listener);
        log.info("[Nacos Config] Listening config: dataId={}, group={}", dataKey,
                groupKey);
    }
    catch (NacosException e) {
        log.warn(String.format(
                "register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
                groupKey), e);
    }
}

Okay, we have finally reached the end of the chain. Every time the Nacos configuration is updated, a RefreshEvent is published; RefreshEventListener receives the event and calls refresh on ConfigDataContextRefresher, which in turn calls refreshAll on RefreshScope. That clears the cache, so the next fetch creates a bean with the latest configuration.
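
The chain, including the guard that ignores refresh requests before the application is ready, can be modelled in a few lines. This is a sketch under stated assumptions: MiniEventBus, ReadySignal, RefreshSignal and MiniRefreshListener are hypothetical names, the bus is synchronous, and the Runnable stands in for the call to ContextRefresher.refresh().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical event types standing in for ApplicationReadyEvent and RefreshEvent.
final class ReadySignal {
}

final class RefreshSignal {
    final String description;

    RefreshSignal(String description) {
        this.description = description;
    }
}

// Minimal synchronous event bus: every listener sees every published event.
final class MiniEventBus {
    private final List<Consumer<Object>> listeners = new ArrayList<>();

    void addListener(Consumer<Object> listener) {
        listeners.add(listener);
    }

    void publish(Object event) {
        for (Consumer<Object> listener : new ArrayList<>(listeners)) {
            listener.accept(event);
        }
    }
}

// Mirrors the listener above: refresh requests are ignored until the ready signal arrives.
final class MiniRefreshListener implements Consumer<Object> {
    private final AtomicBoolean ready = new AtomicBoolean(false);
    private final Runnable refresher;

    MiniRefreshListener(Runnable refresher) {
        this.refresher = refresher;
    }

    @Override
    public void accept(Object event) {
        if (event instanceof ReadySignal) {
            ready.compareAndSet(false, true);
        } else if (event instanceof RefreshSignal && ready.get()) {
            refresher.run();
        }
    }
}

final class MiniEventChainDemo {
    public static void main(String[] args) {
        AtomicInteger refreshCount = new AtomicInteger();
        MiniEventBus bus = new MiniEventBus();
        bus.addListener(new MiniRefreshListener(refreshCount::incrementAndGet));

        bus.publish(new RefreshSignal("too early"));      // ignored: not ready yet
        bus.publish(new ReadySignal());
        bus.publish(new RefreshSignal("config changed")); // handled
        System.out.println(refreshCount.get());           // prints 1
    }
}
```

The publisher only needs to know the event type, not who reacts to it, which is how a config client such as Nacos can trigger a refresh without depending on RefreshScope directly.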

One more thing, let's look at the refresh process:

public synchronized Set<String> refresh() {
    Set<String> keys = refreshEnvironment();
    this.scope.refreshAll();
    return keys;
}
// When the environment is refreshed, an EnvironmentChangeEvent is also published. This is the event that
// ConfigurationPropertiesRebinder reacts to (Nacos supplies its own subclass, shown below); it rebinds all ConfigurationPropertiesBeans
public synchronized Set<String> refreshEnvironment() {
    Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
    updateEnvironment();
    Set<String> keys = changes(before, extract(this.context.getEnvironment().getPropertySources())).keySet();
    this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
    return keys;
}

// Nacos registers its rebinder here
@Bean
@ConditionalOnMissingBean(search = SearchStrategy.CURRENT)
@ConditionalOnNonDefaultBehavior
public ConfigurationPropertiesRebinder smartConfigurationPropertiesRebinder(
        ConfigurationPropertiesBeans beans) {
    // If using default behavior, not use SmartConfigurationPropertiesRebinder.
    // Minimize the possibility of making mistakes.
    return new SmartConfigurationPropertiesRebinder(beans);
}
Internally, it collects all ConfigurationPropertiesBeans and rebinds each of them:
private boolean rebind(String name, ApplicationContext appContext) {
    try {
        Object bean = appContext.getBean(name);
        if (AopUtils.isAopProxy(bean)) {
            bean = ProxyUtils.getTargetObject(bean);
        }
        if (bean != null) {
            // TODO: determine a more general approach to fix this.
            // see
            // https://github.com/spring-cloud/spring-cloud-commons/issues/571
            if (getNeverRefreshable().contains(bean.getClass().getName())) {
                return false; // ignore
            }
            appContext.getAutowireCapableBeanFactory().destroyBean(bean);
            appContext.getAutowireCapableBeanFactory().initializeBean(bean, name);
            return true;
        }
    }
    catch (RuntimeException e) {
        this.errors.put(name, e);
        throw e;
    }
    catch (Exception e) {
        this.errors.put(name, e);
        throw new IllegalStateException("Cannot rebind to " + name, e);
    }
    return false;
}
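
Unlike the refresh scope, rebinding keeps the same object and re-initializes it in place. The following sketch illustrates that difference; TimeoutProps and MiniRebinder are hypothetical classes, the key client.timeout is invented, and bind/reset only play the roles of initializeBean and destroyBean.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical properties holder standing in for a @ConfigurationProperties bean.
final class TimeoutProps {
    private int timeoutMillis;

    // Plays the role of initializeBean: reads the current values from the environment.
    void bind(Map<String, String> environment) {
        timeoutMillis = Integer.parseInt(environment.getOrDefault("client.timeout", "1000"));
    }

    // Plays the role of destroyBean: discards state derived from the old values.
    void reset() {
        timeoutMillis = 0;
    }

    int getTimeoutMillis() {
        return timeoutMillis;
    }
}

final class MiniRebinder {
    private final Map<String, TimeoutProps> beans = new HashMap<>();
    private final Map<String, String> environment;

    MiniRebinder(Map<String, String> environment) {
        this.environment = environment;
    }

    void register(String name, TimeoutProps bean) {
        bean.bind(environment);
        beans.put(name, bean);
    }

    // Destroys and re-initializes the same instance, so existing references stay valid.
    boolean rebind(String name) {
        TimeoutProps bean = beans.get(name);
        if (bean == null) {
            return false;
        }
        bean.reset();
        bean.bind(environment);
        return true;
    }
}

final class MiniRebinderDemo {
    public static void main(String[] args) {
        Map<String, String> environment = new HashMap<>();
        environment.put("client.timeout", "1000");
        MiniRebinder rebinder = new MiniRebinder(environment);
        TimeoutProps props = new TimeoutProps();
        rebinder.register("timeoutProps", props);

        environment.put("client.timeout", "2500");
        System.out.println(props.getTimeoutMillis()); // prints 1000: not rebound yet
        rebinder.rebind("timeoutProps");
        System.out.println(props.getTimeoutMillis()); // prints 2500: same object, new value
    }
}
```

Since the instance is never replaced, no proxy is needed for this path; code that already holds a reference to the properties bean sees the new values after the rebind.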

Conclusion

The above is the automatic refresh process of the refresh scope. One point remains: how does Nacos detect a configuration change and publish the event? This involves Netty. Specifically, the Nacos client starts a background loop that waits up to 5 seconds for a notification and then checks for configuration changes:

@Override
public void startInternal() {
    executor.schedule(() -> {
        while (!executor.isShutdown() && !executor.isTerminated()) {
            try {
                listenExecutebell.poll(5L, TimeUnit.SECONDS);
                if (executor.isShutdown() || executor.isTerminated()) {
                    continue;
                }
                // Check for configuration changes
                executeConfigListen();
            } catch (Throwable e) {
                LOGGER.error("[rpc listen execute] [rpc listen] exception", e);
                try {
                    Thread.sleep(50L);
                } catch (InterruptedException interruptedException) {
                    //ignore
                }
                notifyListenConfig();
            }
        }
    }, 0L, TimeUnit.MILLISECONDS);
    
}
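
The wait-then-check pattern in this loop can be isolated from Nacos. The sketch below is an assumption-laden simplification: BellLoop is a hypothetical class, the queue plays the role of listenExecutebell, ring() plays the role of notifyListenConfig(), and the retry sleep of the original is left out.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BellLoop {
    private final BlockingQueue<Object> bell = new ArrayBlockingQueue<>(1);
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Runnable check;
    private final long idleTimeoutMillis;

    BellLoop(Runnable check, long idleTimeoutMillis) {
        this.check = check;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    void start() {
        executor.execute(() -> {
            while (!executor.isShutdown()) {
                try {
                    // Wake up when the bell rings, or after the idle timeout at the latest.
                    bell.poll(idleTimeoutMillis, TimeUnit.MILLISECONDS);
                    if (executor.isShutdown()) {
                        break;
                    }
                    check.run();
                } catch (InterruptedException e) {
                    // stop() interrupts the wait; restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                } catch (RuntimeException e) {
                    // A failed check must not end the loop; the next round tries again.
                }
            }
        });
    }

    // offer() drops the signal when one is already pending, so repeated rings coalesce.
    void ring() {
        bell.offer(Boolean.TRUE);
    }

    void stop() {
        executor.shutdownNow();
    }
}

final class BellLoopDemo {
    // Returns true if a ring triggered the check well before the idle timeout expired.
    static boolean ringTriggersCheck() {
        CountDownLatch checked = new CountDownLatch(1);
        BellLoop loop = new BellLoop(checked::countDown, 60_000L);
        loop.start();
        loop.ring();
        try {
            return checked.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            loop.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(ringTriggersCheck());
    }
}
```

The timeout gives a periodic check as a fallback, while the bell lets other parts of the client trigger a check immediately.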

I haven't learned Netty yet, so I'll dig into that next time. How remote configuration is pulled during bootstrap and how configuration is obtained in EnvironmentPostProcessorApplicationListener also deserve their own write-up, since both relate to configuration pulling. Since Spring Boot 2.4, bootstrap is no longer enabled by default, and EnvironmentPostProcessorApplicationListener was introduced to make importing configuration easier.

This article is synchronized and updated to xLog by Mix Space. The original link is https://me.liuyaowen.club/posts/default/20240816and1
