1. Source code interpretation is divided into three parts: initialization and running process, and extension points#
1.1 Sentinel's own initialization#
What does Sentinel do during Spring Boot startup?
What side effects does it have after introducing dependencies?
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
<version>2023.0.1.0</version>
</dependency>
Let's see what is included in the dependency.
There are many, and we can see several classic parts. Let's first look at the contents of the main body spring-cloud-starter-alibaba-sentinel, generally looking at three parts: first, the auto-configuration class, second, the SPI interface, and third, the extension points in Spring Boot spring.factories.
Let's see what we have.
We can see that only the auto-configuration class is imported. Let's see what configuration classes are imported inside.
com.alibaba.cloud.sentinel.SentinelWebAutoConfiguration
com.alibaba.cloud.sentinel.SentinelWebFluxAutoConfiguration
com.alibaba.cloud.sentinel.endpoint.SentinelEndpointAutoConfiguration
com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration
com.alibaba.cloud.sentinel.feign.SentinelFeignAutoConfiguration
The first two are adaptations for the Spring Web framework, the third provides external access ports for Sentinel, the fourth initializes and customizes Sentinel, and the fifth supports Feign. Let's look at them one by one, starting with Sentinel's own initialization.
1.2 Sentinel's own initialization, property initialization, data source initialization, aspect initialization, RestTemplate initialization, this part adapts to Spring projects without using MVC.#
@Configuration(proxyBeanMethods = false)
@ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelAutoConfiguration {
@Value("${project.name:${spring.application.name:}}")
private String projectName;
@Autowired
private SentinelProperties properties;
// Property initialization
@PostConstruct
public void init() {
if (StringUtils.isEmpty(System.getProperty(LogBase.LOG_DIR))
&& StringUtils.isNotBlank(properties.getLog().getDir())) {
// Log
System.setProperty(LogBase.LOG_DIR, properties.getLog().getDir());
}
if (StringUtils.isEmpty(System.getProperty(LogBase.LOG_NAME_USE_PID))
&& properties.getLog().isSwitchPid()) {
// Log
System.setProperty(LogBase.LOG_NAME_USE_PID,
String.valueOf(properties.getLog().isSwitchPid()));
}
if (StringUtils.isEmpty(System.getProperty(SentinelConfig.APP_NAME_PROP_KEY))
&& StringUtils.isNotBlank(projectName)) {
// Set project name or Spring application name, usually the application name ${spring.application.name:}
System.setProperty(SentinelConfig.APP_NAME_PROP_KEY, projectName);
}
if (StringUtils.isEmpty(System.getProperty(TransportConfig.SERVER_PORT))
&& StringUtils.isNotBlank(properties.getTransport().getPort())) {
// Sentinel's external control port, like the dashboard is accessed through this port, default is public static final String API_PORT = "8719";
System.setProperty(TransportConfig.SERVER_PORT,
properties.getTransport().getPort());
}
if (StringUtils.isEmpty(System.getProperty(TransportConfig.CONSOLE_SERVER))
&& StringUtils.isNotBlank(properties.getTransport().getDashboard())) {
// Dashboard port address
System.setProperty(TransportConfig.CONSOLE_SERVER,
properties.getTransport().getDashboard());
}
if (StringUtils.isEmpty(System.getProperty(TransportConfig.HEARTBEAT_INTERVAL_MS))
&& StringUtils
.isNotBlank(properties.getTransport().getHeartbeatIntervalMs())) {
// Heartbeat interval time, default is private static final long DEFAULT_INTERVAL = 1000 * 10;
System.setProperty(TransportConfig.HEARTBEAT_INTERVAL_MS,
properties.getTransport().getHeartbeatIntervalMs());
}
if (StringUtils.isEmpty(System.getProperty(TransportConfig.HEARTBEAT_CLIENT_IP))
&& StringUtils.isNotBlank(properties.getTransport().getClientIp())) {
// Heartbeat client's IP, default is local IP
System.setProperty(TransportConfig.HEARTBEAT_CLIENT_IP,
properties.getTransport().getClientIp());
}
if (StringUtils.isEmpty(System.getProperty(SentinelConfig.CHARSET))
&& StringUtils.isNotBlank(properties.getMetric().getCharset())) {
System.setProperty(SentinelConfig.CHARSET,
properties.getMetric().getCharset());
}
if (StringUtils
.isEmpty(System.getProperty(SentinelConfig.SINGLE_METRIC_FILE_SIZE))
&& StringUtils.isNotBlank(properties.getMetric().getFileSingleSize())) {
System.setProperty(SentinelConfig.SINGLE_METRIC_FILE_SIZE,
properties.getMetric().getFileSingleSize());
}
if (StringUtils
.isEmpty(System.getProperty(SentinelConfig.TOTAL_METRIC_FILE_COUNT))
&& StringUtils.isNotBlank(properties.getMetric().getFileTotalCount())) {
System.setProperty(SentinelConfig.TOTAL_METRIC_FILE_COUNT,
properties.getMetric().getFileTotalCount());
}
if (StringUtils.isEmpty(System.getProperty(SentinelConfig.COLD_FACTOR))
&& StringUtils.isNotBlank(properties.getFlow().getColdFactor())) {
System.setProperty(SentinelConfig.COLD_FACTOR,
properties.getFlow().getColdFactor());
}
if (StringUtils.isNotBlank(properties.getBlockPage())) {
setConfig(BLOCK_PAGE_URL_CONF_KEY, properties.getBlockPage());
}
// earlier initialize
// Whether to initialize at the beginning, default is false, instead wait until the first call to initialize
if (properties.isEager()) {
InitExecutor.doInit();
}
}
// This is the aspect class initialization that supports the SentinelResource annotation
@Bean
@ConditionalOnMissingBean
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
// Initialization of SentinelRestTemplate, initializes a post-processor and adds interceptors to it
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(name = "org.springframework.web.client.RestTemplate")
@ConditionalOnProperty(name = "resttemplate.sentinel.enabled", havingValue = "true",
matchIfMissing = true)
public static SentinelBeanPostProcessor sentinelBeanPostProcessor(
ApplicationContext applicationContext) {
return new SentinelBeanPostProcessor(applicationContext);
}
// Handling of external property sources, initialization
// After all singleton beans are initialized
/*
public void postRegister(AbstractDataSource dataSource) {
switch (this.getRuleType()) {
case FLOW -> FlowRuleManager.register2Property(dataSource.getProperty());
case DEGRADE -> DegradeRuleManager.register2Property(dataSource.getProperty());
case PARAM_FLOW -> ParamFlowRuleManager.register2Property(dataSource.getProperty());
case SYSTEM -> SystemRuleManager.register2Property(dataSource.getProperty());
case AUTHORITY -> AuthorityRuleManager.register2Property(dataSource.getProperty());
case GW_FLOW -> GatewayRuleManager.register2Property(dataSource.getProperty());
case GW_API_GROUP -> GatewayApiDefinitionManager.register2Property(dataSource.getProperty());
}
}
*/
@Bean
@ConditionalOnMissingBean
public SentinelDataSourceHandler sentinelDataSourceHandler(
DefaultListableBeanFactory beanFactory, SentinelProperties sentinelProperties,
Environment env) {
return new SentinelDataSourceHandler(beanFactory, sentinelProperties, env);
}
// Some converters, for example, setting converters when configuring external property sources
@ConditionalOnClass(ObjectMapper.class)
@Configuration(proxyBeanMethods = false)
protected static class SentinelConverterConfiguration {
// json
@Configuration(proxyBeanMethods = false)
protected static class SentinelJsonConfiguration {
private ObjectMapper objectMapper = new ObjectMapper();
public SentinelJsonConfiguration() {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
}
@Bean("sentinel-json-flow-converter")
public JsonConverter jsonFlowConverter() {
return new JsonConverter(objectMapper, FlowRule.class);
}
@Bean("sentinel-json-degrade-converter")
public JsonConverter jsonDegradeConverter() {
return new JsonConverter(objectMapper, DegradeRule.class);
}
@Bean("sentinel-json-system-converter")
public JsonConverter jsonSystemConverter() {
return new JsonConverter(objectMapper, SystemRule.class);
}
@Bean("sentinel-json-authority-converter")
public JsonConverter jsonAuthorityConverter() {
return new JsonConverter(objectMapper, AuthorityRule.class);
}
@Bean("sentinel-json-param-flow-converter")
public JsonConverter jsonParamFlowConverter() {
return new JsonConverter(objectMapper, ParamFlowRule.class);
}
}
// xml
@ConditionalOnClass(XmlMapper.class)
@Configuration(proxyBeanMethods = false)
protected static class SentinelXmlConfiguration {
private XmlMapper xmlMapper = new XmlMapper();
public SentinelXmlConfiguration() {
xmlMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
}
@Bean("sentinel-xml-flow-converter")
public XmlConverter xmlFlowConverter() {
return new XmlConverter(xmlMapper, FlowRule.class);
}
@Bean("sentinel-xml-degrade-converter")
public XmlConverter xmlDegradeConverter() {
return new XmlConverter(xmlMapper, DegradeRule.class);
}
@Bean("sentinel-xml-system-converter")
public XmlConverter xmlSystemConverter() {
return new XmlConverter(xmlMapper, SystemRule.class);
}
@Bean("sentinel-xml-authority-converter")
public XmlConverter xmlAuthorityConverter() {
return new XmlConverter(xmlMapper, AuthorityRule.class);
}
@Bean("sentinel-xml-param-flow-converter")
public XmlConverter xmlParamFlowConverter() {
return new XmlConverter(xmlMapper, ParamFlowRule.class);
}
}
}
}
1.3 Adaptation initialization for Spring MVC#
@Configuration(proxyBeanMethods = false)
@ConditionalOnWebApplication(type = Type.SERVLET)
@ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
@ConditionalOnClass(SentinelWebInterceptor.class)
@EnableConfigurationProperties(SentinelProperties.class)
public class SentinelWebAutoConfiguration implements WebMvcConfigurer {
private static final Logger log = LoggerFactory
.getLogger(SentinelWebAutoConfiguration.class);
@Autowired
private SentinelProperties properties;
@Autowired
private Optional<UrlCleaner> urlCleanerOptional;
@Autowired
private Optional<BlockExceptionHandler> blockExceptionHandlerOptional;
@Autowired
private Optional<RequestOriginParser> requestOriginParserOptional;
// Here, a global interceptor is initialized
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
matchIfMissing = true)
public SentinelWebInterceptor sentinelWebInterceptor(
SentinelWebMvcConfig sentinelWebMvcConfig) {
return new SentinelWebInterceptor(sentinelWebMvcConfig);
}
// Some configurations for the above interceptor
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
matchIfMissing = true)
public SentinelWebMvcConfig sentinelWebMvcConfig() {
SentinelWebMvcConfig sentinelWebMvcConfig = new SentinelWebMvcConfig();
// Whether to include the request method in the resource name
sentinelWebMvcConfig.setHttpMethodSpecify(properties.getHttpMethodSpecify());
sentinelWebMvcConfig.setWebContextUnify(properties.getWebContextUnify());
// Exception handling after rate limiting
if (blockExceptionHandlerOptional.isPresent()) {
blockExceptionHandlerOptional
.ifPresent(sentinelWebMvcConfig::setBlockExceptionHandler);
}
else {
if (StringUtils.hasText(properties.getBlockPage())) {
sentinelWebMvcConfig.setBlockExceptionHandler(((request, response,
e) -> response.sendRedirect(properties.getBlockPage())));
}
else {
// Default value for exception handling after rate limiting
sentinelWebMvcConfig
.setBlockExceptionHandler(new DefaultBlockExceptionHandler());
}
}
urlCleanerOptional.ifPresent(sentinelWebMvcConfig::setUrlCleaner);
// Source name resolution
requestOriginParserOptional.ifPresent(sentinelWebMvcConfig::setOriginParser);
return sentinelWebMvcConfig;
}
// Register the interceptor
@Bean
@ConditionalOnProperty(name = "spring.cloud.sentinel.filter.enabled",
matchIfMissing = true)
public SentinelWebMvcConfigurer sentinelWebMvcConfigurer() {
return new SentinelWebMvcConfigurer();
}
}
2 Running Process#
2.1 Spring MVC Interceptor Running Logic#
Interceptors generally have two methods: one for pre-request and one for post-request. Let's first look at the pre-request method.
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
try {
// First, get the resource name
String resourceName = getResourceName(request);
if (StringUtil.isEmpty(resourceName)) {
return true;
}
// If the requests attribute has $$sentinel_spring_web_entry_attr-rc, count and pass
if (increaseReferece(request, this.baseWebMvcConfig.getRequestRefName(), 1) != 1) {
return true;
}
// Parse the request origin using registered origin parser.
// Generate origin based on HTTP, default is empty, used in StatisticSlot
String origin = parseOrigin(request);
// Get the name of the monitoring container, default is sentinel_spring_web_context
String contextName = getContextName(request);
// Key code, initialize the call context
ContextUtil.enter(contextName, origin);
// Enter the resource, enter the slot plugin module, create
Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
return true;
} catch (BlockException e) {
try {
handleBlockException(request, response, e);
} finally {
ContextUtil.exit();
}
return false;
}
}
// Slot plugin module creation process
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// Resolve the slot chain builder SPI.
// Sentinel default ProcessorSlots
// com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
// com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
// com.alibaba.csp.sentinel.slots.logger.LogSlot
// com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
// com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
// com.alibaba.csp.sentinel.slots.system.SystemSlot
// com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
// com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
// com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
// All default slots, executed in the same order as above
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
2.2 Entering chain.entry(context, resourceWrapper, null, count, prioritized, args) to see the actual running process#
Let's first look at the slot interface for better understanding.
public interface ProcessorSlot<T> {
// The running logic of this slot
void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized,
Object... args) throws Throwable;
// What to do after this slot completes its execution? The abstract class implementation casts the parameters and enters the next slot, forming a unidirectional linked list of their relationships
void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized,
Object... args) throws Throwable;
// Similarly
void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
}
Let's look at the abstract class.
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
// Save the next slot to be executed
private AbstractLinkedProcessorSlot<?> next = null;
// The abstract class implementation casts the parameters and enters the next slot, forming a unidirectional list of their relationships
@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
@Override
public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
if (next != null) {
next.exit(context, resourceWrapper, count, args);
}
}
public AbstractLinkedProcessorSlot<?> getNext() {
return next;
}
public void setNext(AbstractLinkedProcessorSlot<?> next) {
this.next = next;
}
}
Now we are entering the process of calling the slot, let's look at them one by one in the order above.#
// Resolve the slot chain builder SPI.
// Sentinel default ProcessorSlots
// com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
// com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
// com.alibaba.csp.sentinel.slots.logger.LogSlot
// com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
// com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
// com.alibaba.csp.sentinel.slots.system.SystemSlot
// com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot
// com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
// com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
// All default slots, executed in the same order as above
NodeSelectorSlot, resource node selector#
We must clarify one point: the node is bound to the slot chain, each unique resource has a unique slot chain, and the slot chain saves the node (defaultNode), as well as different nodes for different calling contexts (defaultNode). The official website has an explanation.
Each resource has a unique slot chain;
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
Let's see how NodeSelectorSlot is implemented.
DefaultNode node = map.get(context.getName());
// You can see that it looks up the node using context.getName() as the key, meaning that a resource may have multiple nodes, but it will actually only be counted once, as can be seen in the following ClusterBuilderSlot call
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
// Set the call chain, as it may be nested into different resources,
// context saves the entry node and call chain (doubly linked list), here the current called node is added to the child of the previous node's CTNode, of course, if it is the first entry, it is added to the entry node
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
// Set the current call, set to the current call CTNODE's CurNode, added twice, once to the upper call context, and once to the current
context.setCurNode(node);
ClusterBuilderSlot, resource statistics node construction, also known as ClusterNode#
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
// First check if the current slot chain has
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
// You can see that each slot chain will only generate one clusterNode, shared among different context names' nodes
node.setClusterNode(clusterNode);
/*
* if context origin is set, we should get or create a new {@link Node} of
* the specific origin.
*/
// Set the origin, default is none, if there is, maintain a map in the clusterNode
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
Okay, the logic here is simple: create a clusterNode to collect entry information.
Next, enter LogSlot#
The logic here is also quite simple, record BlockException.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count, boolean prioritized, Object... args)
throws Throwable {
try {
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), e.getRule().getId(), count);
throw e;
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exception", e);
}
}
Okay, we have reached the core StatisticSlot#
First, check if you can enter. Here, first execute AuthoritySlot, SystemSlot, ParamFlowSlot, FlowSlot, so let's first look at the logic of the other several slots, skipping this section for now.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// First check if you can enter, here first execute AuthoritySlot, SystemSlot, ParamFlowSlot, FlowSlot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
node.increaseThreadNum();
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
AuthoritySlot#
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
// Check authorization information
checkBlackWhiteAuthority(resourceWrapper, context);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
// It just retrieves the implemented authorization rules to see if it can pass
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
SystemSlot#
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// Check system health status
// We won't go into this, it basically adjusts the requests that pass based on system metrics, explained on the official website https://sentinelguard.io/zh-cn/docs/system-adaptive-protection.html
SystemRuleManager.checkSystem(resourceWrapper, count);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
ParamFlowSlot#
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// Check if there are rules
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
// Limit access based on parameter position https://sentinelguard.io/zh-cn/docs/parameter-flow-control.html
checkFlow(resourceWrapper, count, args);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
FlowSlot#
Similar to ParamFlowSlot, it decides whether to pass based on FlowRule flow-control | Sentinel (sentinelguard.io)
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
DegradeSlot#
- Slow call ratio (
SLOW_REQUEST_RATIO
): Selects the slow call ratio as the threshold, requires setting the allowed slow call RT (i.e., maximum response time). If the response time of the request exceeds this value, it is counted as a slow call. When the number of requests during the unit statistical duration (statIntervalMs
) exceeds the minimum request number set, and the ratio of slow calls exceeds the threshold, the requests will be automatically degraded during the subsequent degradation duration. After the degradation duration, the circuit breaker will enter a probe recovery state (HALF-OPEN state). If the response time of the next request is less than the set slow call RT, the degradation ends; if it exceeds the set slow call RT, it will be degraded again. - Error ratio (
ERROR_RATIO
): When the number of requests during the unit statistical duration (statIntervalMs
) exceeds the minimum request number set, and the ratio of errors exceeds the threshold, the requests will be automatically degraded during the subsequent degradation duration. After the degradation duration, the circuit breaker will enter a probe recovery state (HALF-OPEN state). If the next request is successfully completed (no errors), the degradation ends; otherwise, it will be degraded again. The threshold range for the error ratio is[0.0, 1.0]
, representing 0% - 100%. - Error count (
ERROR_COUNT
): When the number of errors during the unit statistical duration exceeds the threshold, it will automatically degrade. After the degradation duration, the circuit breaker will enter a probe recovery state (HALF-OPEN state). If the next request is successfully completed (no errors), the degradation ends; otherwise, it will be degraded again. - circuit-breaking | Sentinel (sentinelguard.io)
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
If all the previous checks pass, then return to StatisticSlot.
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Now executing here
// Increase thread count, there will be two types: one is transient, and the other is cluster
node.increaseThreadNum();
// Increase access count
node.addPassRequest(count);
// Increase source access count
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
// Count the access times and threads of the entry node
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Callback function after passing the rate limit
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
// This indicates that during default processing, waiting a while allows entry, still passing
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Similarly count the access times and threads of the entry node
Constants.ENTRY_NODE.increaseThreadNum();
}
// Callback function after passing the rate limit
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
context.getCurEntry().setBlockError(e);
// Add block count.
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Callback function when not passing the rate limit
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
context.getCurEntry().setError(e);
throw e;
}
}
3. Extension Points#
Sentinel has many extension points.
3.1 Initialization process extension InitExecutor#
Execution timing: the first call to entry, or not in Eager mode.
public static void doInit() {
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
// Find all InitFunc
List<InitFunc> initFuncs = SpiLoader.of(InitFunc.class).loadInstanceListSorted();
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : initFuncs) {
RecordLog.info("[InitExecutor] Found init func: {}", initFunc.getClass().getCanonicalName());
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
w.func.init();
RecordLog.info("[InitExecutor] Executing {} with order {}",
w.func.getClass().getCanonicalName(), w.order);
}
} catch (Exception ex) {
RecordLog.warn("[InitExecutor] WARN: Initialization failed", ex);
ex.printStackTrace();
} catch (Error error) {
RecordLog.warn("[InitExecutor] ERROR: Initialization failed with fatal error", error);
error.printStackTrace();
}
}
By default, it will find the initialization of clients and servers in cluster mode as well as the initialization of metric callback functions.
public class MetricCallbackInit implements InitFunc {
@Override
public void init() throws Exception {
StatisticSlotCallbackRegistry.addEntryCallback(MetricEntryCallback.class.getCanonicalName(),
new MetricEntryCallback());
StatisticSlotCallbackRegistry.addExitCallback(MetricExitCallback.class.getCanonicalName(),
new MetricExitCallback());
}
}
3.2 Slot/Slot Chain Extension#
Calling timing: when no matching slot chain is found.
public final class SlotChainProvider {
private static volatile SlotChainBuilder slotChainBuilder = null;
/**
* The load and pick process is not thread-safe, but it's okay since the method should be only invoked
* via {@code lookProcessChain} in {@link com.alibaba.csp.sentinel.CtSph} under lock.
*
* @return new created slot chain
*/
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// Resolve the slot chain builder SPI.
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}
private SlotChainProvider() {}
}
3.3 Transport Extension#
It is actually the interface exposed to the outside by the client, and by default, it will also expose some interfaces for checking the client's situation.
First, there is an API center responsible for receiving external information.
public class SimpleHttpCommandCenter implements CommandCenter {
private static final int PORT_UNINITIALIZED = -1;
private static final int DEFAULT_SERVER_SO_TIMEOUT = 3000;
private static final int DEFAULT_PORT = 8719;
@SuppressWarnings("rawtypes")
private static final Map<String, CommandHandler> handlerMap = new ConcurrentHashMap<String, CommandHandler>();
@SuppressWarnings("PMD.ThreadPoolCreationRule")
private ExecutorService executor = Executors.newSingleThreadExecutor(
new NamedThreadFactory("sentinel-command-center-executor", true));
private ExecutorService bizExecutor;
private ServerSocket socketReference;
@Override
@SuppressWarnings("rawtypes")
public void beforeStart() throws Exception {
// Register handlers
Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
registerCommands(handlers);
}
@Override
public void start() throws Exception {
int nThreads = Runtime.getRuntime().availableProcessors();
this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(10),
new NamedThreadFactory("sentinel-command-center-service-executor", true),
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
CommandCenterLog.info("EventTask rejected");
throw new RejectedExecutionException();
}
});
Runnable serverInitTask = new Runnable() {
int port;
{
try {
port = Integer.parseInt(TransportConfig.getPort());
} catch (Exception e) {
port = DEFAULT_PORT;
}
}
@Override
public void run() {
boolean success = false;
ServerSocket serverSocket = getServerSocketFromBasePort(port);
if (serverSocket != null) {
CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
socketReference = serverSocket;
executor.submit(new ServerThread(serverSocket));
success = true;
port = serverSocket.getLocalPort();
} else {
CommandCenterLog.info("[CommandCenter] chooses port fail, http command center will not work");
}
if (!success) {
port = PORT_UNINITIALIZED;
}
TransportConfig.setRuntimePort(port);
executor.shutdown();
}
};
new Thread(serverInitTask).start();
}
/**
* Get a server socket from an available port from a base port.<br>
* Increasing on port number will occur when the port has already been used.
*
* @param basePort base port to start
* @return new socket with available port
*/
private static ServerSocket getServerSocketFromBasePort(int basePort) {
int tryCount = 0;
while (true) {
try {
ServerSocket server = new ServerSocket(basePort + tryCount / 3, 100);
server.setReuseAddress(true);
return server;
} catch (IOException e) {
tryCount++;
try {
TimeUnit.MILLISECONDS.sleep(30);
} catch (InterruptedException e1) {
break;
}
}
}
return null;
}
@Override
public void stop() throws Exception {
if (socketReference != null) {
try {
socketReference.close();
} catch (IOException e) {
CommandCenterLog.warn("Error when releasing the server socket", e);
}
}
if (bizExecutor != null) {
bizExecutor.shutdownNow();
}
executor.shutdownNow();
TransportConfig.setRuntimePort(PORT_UNINITIALIZED);
handlerMap.clear();
}
/**
* Get the name set of all registered commands.
*/
public static Set<String> getCommands() {
return handlerMap.keySet();
}
class ServerThread extends Thread {
private ServerSocket serverSocket;
ServerThread(ServerSocket s) {
this.serverSocket = s;
setName("sentinel-courier-server-accept-thread");
}
@Override
public void run() {
while (true) {
Socket socket = null;
try {
socket = this.serverSocket.accept();
setSocketSoTimeout(socket);
HttpEventTask eventTask = new HttpEventTask(socket);
bizExecutor.submit(eventTask);
} catch (Exception e) {
CommandCenterLog.info("Server error", e);
if (socket != null) {
try {
socket.close();
} catch (Exception e1) {
CommandCenterLog.info("Error when closing an opened socket", e1);
}
}
try {
// In case of infinite log.
Thread.sleep(10);
} catch (InterruptedException e1) {
// Indicates the task should stop.
break;
}
}
}
}
}
}
After receiving it, find the matching handler.
public void run() {
if (socket == null) {
return;
}
PrintWriter printWriter = null;
InputStream inputStream = null;
try {
long start = System.currentTimeMillis();
inputStream = new BufferedInputStream(socket.getInputStream());
OutputStream outputStream = socket.getOutputStream();
printWriter = new PrintWriter(
new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));
String firstLine = readLine(inputStream);
CommandCenterLog.info("[SimpleHttpCommandCenter] Socket income: " + firstLine
+ ", addr: " + socket.getInetAddress());
CommandRequest request = processQueryString(firstLine);
if (firstLine.length() > 4 && StringUtil.equalsIgnoreCase("POST", firstLine.substring(0, 4))) {
// Deal with post method
processPostRequest(inputStream, request);
}
// Validate the target command.
String commandName = HttpCommandUtils.getTarget(request);
if (StringUtil.isBlank(commandName)) {
writeResponse(printWriter, StatusCode.BAD_REQUEST, INVALID_COMMAND_MESSAGE);
return;
}
// Find the matching command handler.
CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
if (commandHandler != null) {
CommandResponse<?> response = commandHandler.handle(request);
handleResponse(response, printWriter);
} else {
// No matching command handler.
writeResponse(printWriter, StatusCode.BAD_REQUEST, "Unknown command `" + commandName + '`');
}
long cost = System.currentTimeMillis() - start;
CommandCenterLog.info("[SimpleHttpCommandCenter] Deal a socket task: " + firstLine
+ ", address: " + socket.getInetAddress() + ", time cost: " + cost + " ms");
} catch (RequestException e) {
writeResponse(printWriter, e.getStatusCode(), e.getMessage());
} catch (Throwable e) {
CommandCenterLog.warn("[SimpleHttpCommandCenter] CommandCenter error", e);
try {
if (printWriter != null) {
String errorMessage = SERVER_ERROR_MESSAGE;
e.printStackTrace();
if (!writtenHead) {
writeResponse(printWriter, StatusCode.INTERNAL_SERVER_ERROR, errorMessage);
} else {
printWriter.println(errorMessage);
}
printWriter.flush();
}
} catch (Exception e1) {
CommandCenterLog.warn("Failed to write error response", e1);
}
} finally {
closeResource(inputStream);
closeResource(printWriter);
closeResource(socket);
}
}
By default, some handlers will be defined.
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerFlowConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterServerTransportConfigHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyServerNamespaceSetHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.ModifyClusterParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterServerInfoCommandHandler
com.alibaba.csp.sentinel.cluster.server.command.handler.FetchClusterMetricCommandHandler
com.alibaba.csp.sentinel.command.handler.GetParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyParamFlowRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyClusterClientConfigHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterClientConfigHandler
3.4 Cluster Flow Control Extension#
Cluster Flow Control · alibaba/Sentinel Wiki (github.com)
This article was synchronized to xLog by Mix Space. The original link is https://me.liuyaowen.club/posts/default/springcloud-sentinel