Sentinel源码中,如何解析入口类与SlotChain构建流程详细解析?

2026-05-25 21:171阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2672个文字,预计阅读时间需要11分钟。

Sentinel源码中,如何解析入口类与SlotChain构建流程详细解析?

目录

1.测试用例

1.1 流程控制测试

2.注释版源码分析

2.1 默认Context创建 2.2 查找并创建SlotChain 2.1 创建slotChainBuilder 2.2 slotChainBuilder.build() 参考文章 1. 测试用例 我们以sentinel-demo中的senti为例进行测试。

目录
  • 1. 测试用例
    • 1.1 流控测试
  • 2. 注解版源码分析
    • 2.1 默认Context创建
    • 2.2 查找并创建SlotChain
      • 2.2.1 创建slotChainBuilder
      • 2.2.2 slotChainBuilder.build()
  • 参考文章

    1. 测试用例

    我们以sentinel-demo中的sentinel-annotation-spring-aop为例,分析sentinel的源码。核心代码如下:

    DemoController:

    @RestController public class DemoController { @Autowired private TestService service; @GetMapping("/foo") public String apiFoo(@RequestParam(required = false) Long t) throws Exception { if (t == null) { t = System.currentTimeMillis(); } service.test(); return service.hello(t); } @GetMapping("/baz/{name}") public String apiBaz(@PathVariable("name") String name) { return service.helloAnother(name); } }

    TestServiceImpl:

    @Service public class TestServiceImpl implements TestService { @Override @SentinelResource(value = "test", blockHandler = "handleException", blockHandlerClass = {ExceptionUtil.class}) public void test() { System.out.println("Test"); } @Override @SentinelResource(value = "hello", fallback = "helloFallback") public String hello(long s) { if (s < 0) { throw new IllegalArgumentException("invalid arg"); } return String.format("Hello at %d", s); } @Override @SentinelResource(value = "helloAnother", defaultFallback = "defaultFallback", exceptionsToIgnore = {IllegalStateException.class}) public String helloAnother(String name) { if (name == null || "bad".equals(name)) { throw new IllegalArgumentException("oops"); } if ("foo".equals(name)) { throw new IllegalStateException("oops"); } return "Hello, " + name; } public String helloFallback(long s, Throwable ex) { // Do some log here. ex.printStackTrace(); return "Oops, error occurred at " + s; } public String defaultFallback() { System.out.println("Go to default fallback"); return "default_fallback"; } }

    启动类DemoApplication

    @SpringBootApplication public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }

    在启动这个工程上增加参数:

    -Dcsp.sentinel.dashboard.server=localhost:8081 -Dproject.name=annotation-aspectj

    如图:

    Sentinel源码中,如何解析入口类与SlotChain构建流程详细解析?

    打开localhost:8081/#/dashboard 地址,可以看到应用已经注册到sentinel管理后台:

    1.1 流控测试

    访问 localhost:19966/foo?t=188 这个链接,多访问几次,在实时监控页面可以看到:

    然后,我们先简单配置一个流控规则,如下:

    其中,资源名为:

    然后我们在快速刷新localhost:19966/foo?t=188 接口,会出现限流的情况,返回如下:

    Oops, error occurred at 188

    实时监控为:

    2. 注解版源码分析

    使用注解@SentinelResource 核心原理就是 利用AOP切入到方法中,我们直接看SentinelResourceAspect类,这是一个切面类:

    @Aspect // 切面 public class SentinelResourceAspect extends AbstractSentinelAspectSupport { // 指定切入点为@SentinelResource 注解 @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut() { } // 环绕通知 @Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { Method originMethod = resolveMethod(pjp); SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); if (annotation == null) { // Should not go through here. throw new IllegalStateException("Wrong state for SentinelResource annotation"); } String resourceName = getResourceName(annotation.value(), originMethod); EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); Entry entry = null; try { // 要织入的,增强的功能 entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); // 调用目标方法 return pjp.proceed(); } catch (BlockException ex) { return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) { Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore(); // The ignore list will be checked first. if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex); return handleFallback(pjp, annotation, ex); } // No fallback function can handle the exception, so throw it out. throw ex; } finally { if (entry != null) { entry.exit(1, pjp.getArgs()); } } } }

    核心方法SphU.entry():

    public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args) throws BlockException { // 注意 第4个参数值为 1 return Env.sph.entryWithType(name, resourceType, trafficType, 1, args); } @Override public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args) throws BlockException { // count 参数:表示当前请求可以增加多少个计数 // 注意 第5个参数为false return entryWithType(name, resourceType, entryType, count, false, args); } @Override public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized, Object[] args) throws BlockException { // 将信息封装为一个资源对象 StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType); // 返回一个资源操作对象entry // prioritized 为true 表示当前访问必须等待"根据其优先级计算出的时间"后才通过 // prioritized 为 false 则当前请求无需等待 return entryWithPriority(resource, count, prioritized, args); }

    我们重点看一下CtSph#entryWithPriority

    /** * @param resourceWrapper * @param count 默认为1 * @param prioritized 默认为false * @param args * @return * @throws BlockException */ private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { // 从ThreadLocal中获取Context // 一个请求会占用一个线程,一个线程会绑定一个context Context context = ContextUtil.getContext(); // 若context是 NullContext类型,则表示当前系统中的context数量已经超过阈值 // 即访问的请求的数量已经超出了阈值,此时直接返回一个无需做规则检测的资源操作对象 if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); } // 当前线程中没有绑定context,则创建一个context并将其放入到Threadlocal if (context == null) { // todo Using default context. context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } // Global switch is close, no rule checking will do. // 若全局开关是关闭的,直接返回一个无需做规则检测的资源操作对象 if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); } // todo 查找SlotChain ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */ // 若没有知道chain,则意味着chain数量超出了阈值 if (chain == null) { return new CtEntry(resourceWrapper, null, context); } // 创建一个资源操作对象 Entry e = new CtEntry(resourceWrapper, chain, context); try { // todo 对资源进行操作 chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e; }

    2.1 默认Context创建

    当前线程没有绑定Context,则创建一个context并将其放入到Threadlocal。核心方法为 InternalContextUtil.internalEnter

    public static Context enter(String name, String origin) { if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) { throw new ContextNameDefineException( "The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!"); } return trueEnter(name, origin); } protected static Context trueEnter(String name, String origin) { // 尝试从ThreadLocal中获取context Context context = contextHolder.get(); // 若Threadlocal中没有,则尝试从缓存map中获取 if (context == null) { // 缓存map的key为context名称,value为EntranceNode Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap; // DCL 双重检测锁,防止并发创建对象 DefaultNode node = localCacheNameMap.get(name); if (node == null) { // 若缓存map的size 大于 context数量的最大阈值,则直接返回NULL_CONTEXT if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { LOCK.lock(); try { node = contextNameNodeMap.get(name); if (node == null) { if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { // 创建一个EntranceNode node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); // Add entrance node. // 将新建的node添加到Root Constants.ROOT.addChild(node); // 将新建的node写入到缓存map // 为了防止"迭代稳定性问题"-iterate stable 对于共享集合的写操作 Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1); newMap.putAll(contextNameNodeMap); newMap.put(name, node); contextNameNodeMap = newMap; } } } finally { LOCK.unlock(); } } } // 将context的name与entranceNode 封装成context context = new Context(node, name); // 初始化context的来源 context.setOrigin(origin); // 将context写入到ThreadLocal contextHolder.set(context); } return context; }

    注意:因为 private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();是 HashMap结构,所以存在并发安全问题,采用 代码中方式进行添加操作。

    2.2 查找并创建SlotChain

    构建调用链lookProcessChain(resourceWrapper)

    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { // 缓存map的key为资源 value为其相关的SlotChain ProcessorSlotChain chain = chainMap.get(resourceWrapper); // DCL // 若缓存中没有相关的SlotChain 则创建一个并放入到缓存中 if (chain == null) { synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit. // 缓存map的size 大于 chain数量的最大阈值,则直接返回null,不在创建新的chain if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } // todo 创建新的chain chain = SlotChainProvider.newSlotChain(); // 防止 迭代稳定性问题 Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }

    我们直接看核心方法SlotChainProvider.newSlotChain();

    public static ProcessorSlotChain newSlotChain() { // 若builder不为null,则直接使用builder构建一个chain // 否则先创建一个builder if (slotChainBuilder != null) { return slotChainBuilder.build(); } // Resolve the slot chain builder SPI. // 通过SPI方式创建builder slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault(); // 若通过SPI未能创建builder,则创建一个默认的DefaultSlotChainBuilder if (slotChainBuilder == null) { // Should not go through here. RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); slotChainBuilder = new DefaultSlotChainBuilder(); } else { RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", slotChainBuilder.getClass().getCanonicalName()); } // todo 构建一个chain return slotChainBuilder.build(); } private SlotChainProvider() {} }

    2.2.1 创建slotChainBuilder

    // 通过SPI方式创建builder slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

    通过SPI方法创建slotChainBuilder,去项目中META-INF.service中获取:

    2.2.2 slotChainBuilder.build()

    @Spi(isDefault = true) public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); // 通过SPI方式构建Slot List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted(); for (ProcessorSlot slot : sortedSlotList) { if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } chain.addLast((AbstractLinkedProcessorSlot<?>) slot); } return chain; } }

    通过SPI机制,去项目中META-INF.service中获取,在sentinel-core项目中:

    还有一个ParamFlowSlot,在sentinel-extension/sentinel-parameter-flow-control下:

    我们点击 NodeSelectorSlot, 类上面是有 优先级order,数字越小,优先级越高。

    @Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT) public class NodeSelectorSlot extends AbstractLinkedProcessorSlot&lt;Object&gt; {

    优先级常量为:

    public static final int ORDER_NODE_SELECTOR_SLOT = -10000; public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000; public static final int ORDER_LOG_SLOT = -8000; public static final int ORDER_STATISTIC_SLOT = -7000; public static final int ORDER_AUTHORITY_SLOT = -6000; public static final int ORDER_SYSTEM_SLOT = -5000; public static final int ORDER_FLOW_SLOT = -2000; public static final int ORDER_DEGRADE_SLOT = -1000;

    我们看代码中的变量sortedSlotList,已经按照优先级排序好了:

    我们看一下构建的ProcessorSlotChain,类似一个单链表结构,如下:

    我们看一下相关的类结构:DefaultProcessorSlotChain:

    // 这是一个单向链表,默认包含一个接节点,且有两个指针first 和end同时指向这个节点 public class DefaultProcessorSlotChain extends ProcessorSlotChain { AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); } }; AbstractLinkedProcessorSlot<?> end = first; @Override public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) { protocolProcessor.setNext(first.getNext()); first.setNext(protocolProcessor); if (end == first) { end = protocolProcessor; } } @Override public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) { end.setNext(protocolProcessor); end = protocolProcessor; } }

    AbstractLinkedProcessorSlot:

    public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> { // 声明一个同类型的变量,则可以指向下一个Slot节点 private AbstractLinkedProcessorSlot<?> next = null; @Override public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { if (next != null) { next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } } @SuppressWarnings("unchecked") void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); } @Override public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { if (next != null) { next.exit(context, resourceWrapper, count, args); } } public AbstractLinkedProcessorSlot<?> getNext() { return next; } public void setNext(AbstractLinkedProcessorSlot<?> next) { this.next = next; } }

    构建完成后的SlotChain和工作原理图一样:

    接下来,对资源进行操作的核心方法为chain.entry(context, resourceWrapper, null, count, prioritized, args);,这个我们下篇文章分析。

    参考文章

    Sentinel1.8.5源码github地址(注释)

    Sentinel官网

    以上就是Sentinel源码解析入口类和SlotChain构建过程详解的详细内容,更多关于Sentinel入口类SlotChain构建的资料请关注自由互联其它相关文章!

    本文共计2672个文字,预计阅读时间需要11分钟。

    Sentinel源码中,如何解析入口类与SlotChain构建流程详细解析?

    目录

    1.测试用例

    1.1 流程控制测试

    2.注释版源码分析

    2.1 默认Context创建 2.2 查找并创建SlotChain 2.1 创建slotChainBuilder 2.2 slotChainBuilder.build() 参考文章 1. 测试用例 我们以sentinel-demo中的senti为例进行测试。

    目录
    • 1. 测试用例
      • 1.1 流控测试
    • 2. 注解版源码分析
      • 2.1 默认Context创建
      • 2.2 查找并创建SlotChain
        • 2.2.1 创建slotChainBuilder
        • 2.2.2 slotChainBuilder.build()
    • 参考文章

      1. 测试用例

      我们以sentinel-demo中的sentinel-annotation-spring-aop为例,分析sentinel的源码。核心代码如下:

      DemoController:

      @RestController public class DemoController { @Autowired private TestService service; @GetMapping("/foo") public String apiFoo(@RequestParam(required = false) Long t) throws Exception { if (t == null) { t = System.currentTimeMillis(); } service.test(); return service.hello(t); } @GetMapping("/baz/{name}") public String apiBaz(@PathVariable("name") String name) { return service.helloAnother(name); } }

      TestServiceImpl:

      @Service public class TestServiceImpl implements TestService { @Override @SentinelResource(value = "test", blockHandler = "handleException", blockHandlerClass = {ExceptionUtil.class}) public void test() { System.out.println("Test"); } @Override @SentinelResource(value = "hello", fallback = "helloFallback") public String hello(long s) { if (s < 0) { throw new IllegalArgumentException("invalid arg"); } return String.format("Hello at %d", s); } @Override @SentinelResource(value = "helloAnother", defaultFallback = "defaultFallback", exceptionsToIgnore = {IllegalStateException.class}) public String helloAnother(String name) { if (name == null || "bad".equals(name)) { throw new IllegalArgumentException("oops"); } if ("foo".equals(name)) { throw new IllegalStateException("oops"); } return "Hello, " + name; } public String helloFallback(long s, Throwable ex) { // Do some log here. ex.printStackTrace(); return "Oops, error occurred at " + s; } public String defaultFallback() { System.out.println("Go to default fallback"); return "default_fallback"; } }

      启动类DemoApplication

      @SpringBootApplication public class DemoApplication { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }

      在启动这个工程上增加参数:

      -Dcsp.sentinel.dashboard.server=localhost:8081 -Dproject.name=annotation-aspectj

      如图:

      Sentinel源码中,如何解析入口类与SlotChain构建流程详细解析?

      打开localhost:8081/#/dashboard 地址,可以看到应用已经注册到sentinel管理后台:

      1.1 流控测试

      访问 localhost:19966/foo?t=188 这个链接,多访问几次,在实时监控页面可以看到:

      然后,我们先简单配置一个流控规则,如下:

      其中,资源名为:

      然后我们在快速刷新localhost:19966/foo?t=188 接口,会出现限流的情况,返回如下:

      Oops, error occurred at 188

      实时监控为:

      2. 注解版源码分析

      使用注解@SentinelResource 核心原理就是 利用AOP切入到方法中,我们直接看SentinelResourceAspect类,这是一个切面类:

      @Aspect // 切面 public class SentinelResourceAspect extends AbstractSentinelAspectSupport { // 指定切入点为@SentinelResource 注解 @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)") public void sentinelResourceAnnotationPointcut() { } // 环绕通知 @Around("sentinelResourceAnnotationPointcut()") public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable { Method originMethod = resolveMethod(pjp); SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class); if (annotation == null) { // Should not go through here. throw new IllegalStateException("Wrong state for SentinelResource annotation"); } String resourceName = getResourceName(annotation.value(), originMethod); EntryType entryType = annotation.entryType(); int resourceType = annotation.resourceType(); Entry entry = null; try { // 要织入的,增强的功能 entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()); // 调用目标方法 return pjp.proceed(); } catch (BlockException ex) { return handleBlockException(pjp, annotation, ex); } catch (Throwable ex) { Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore(); // The ignore list will be checked first. if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) { throw ex; } if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) { traceException(ex); return handleFallback(pjp, annotation, ex); } // No fallback function can handle the exception, so throw it out. throw ex; } finally { if (entry != null) { entry.exit(1, pjp.getArgs()); } } } }

      核心方法SphU.entry():

      public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args) throws BlockException { // 注意 第4个参数值为 1 return Env.sph.entryWithType(name, resourceType, trafficType, 1, args); } @Override public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args) throws BlockException { // count 参数:表示当前请求可以增加多少个计数 // 注意 第5个参数为false return entryWithType(name, resourceType, entryType, count, false, args); } @Override public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized, Object[] args) throws BlockException { // 将信息封装为一个资源对象 StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType); // 返回一个资源操作对象entry // prioritized 为true 表示当前访问必须等待"根据其优先级计算出的时间"后才通过 // prioritized 为 false 则当前请求无需等待 return entryWithPriority(resource, count, prioritized, args); }

      我们重点看一下CtSph#entryWithPriority

      /** * @param resourceWrapper * @param count 默认为1 * @param prioritized 默认为false * @param args * @return * @throws BlockException */ private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { // 从ThreadLocal中获取Context // 一个请求会占用一个线程,一个线程会绑定一个context Context context = ContextUtil.getContext(); // 若context是 NullContext类型,则表示当前系统中的context数量已经超过阈值 // 即访问的请求的数量已经超出了阈值,此时直接返回一个无需做规则检测的资源操作对象 if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); } // 当前线程中没有绑定context,则创建一个context并将其放入到Threadlocal if (context == null) { // todo Using default context. context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } // Global switch is close, no rule checking will do. // 若全局开关是关闭的,直接返回一个无需做规则检测的资源操作对象 if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); } // todo 查找SlotChain ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */ // 若没有知道chain,则意味着chain数量超出了阈值 if (chain == null) { return new CtEntry(resourceWrapper, null, context); } // 创建一个资源操作对象 Entry e = new CtEntry(resourceWrapper, chain, context); try { // todo 对资源进行操作 chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e; }

      2.1 默认Context创建

      当前线程没有绑定Context,则创建一个context并将其放入到Threadlocal。核心方法为 InternalContextUtil.internalEnter

      public static Context enter(String name, String origin) { if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) { throw new ContextNameDefineException( "The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!"); } return trueEnter(name, origin); } protected static Context trueEnter(String name, String origin) { // 尝试从ThreadLocal中获取context Context context = contextHolder.get(); // 若Threadlocal中没有,则尝试从缓存map中获取 if (context == null) { // 缓存map的key为context名称,value为EntranceNode Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap; // DCL 双重检测锁,防止并发创建对象 DefaultNode node = localCacheNameMap.get(name); if (node == null) { // 若缓存map的size 大于 context数量的最大阈值,则直接返回NULL_CONTEXT if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { LOCK.lock(); try { node = contextNameNodeMap.get(name); if (node == null) { if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { // 创建一个EntranceNode node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); // Add entrance node. // 将新建的node添加到Root Constants.ROOT.addChild(node); // 将新建的node写入到缓存map // 为了防止"迭代稳定性问题"-iterate stable 对于共享集合的写操作 Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1); newMap.putAll(contextNameNodeMap); newMap.put(name, node); contextNameNodeMap = newMap; } } } finally { LOCK.unlock(); } } } // 将context的name与entranceNode 封装成context context = new Context(node, name); // 初始化context的来源 context.setOrigin(origin); // 将context写入到ThreadLocal contextHolder.set(context); } return context; }

      注意:因为 private static volatile Map<String, DefaultNode> contextNameNodeMap = new HashMap<>();是 HashMap结构,所以存在并发安全问题,采用 代码中方式进行添加操作。

      2.2 查找并创建SlotChain

      构建调用链lookProcessChain(resourceWrapper)

      ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { // 缓存map的key为资源 value为其相关的SlotChain ProcessorSlotChain chain = chainMap.get(resourceWrapper); // DCL // 若缓存中没有相关的SlotChain 则创建一个并放入到缓存中 if (chain == null) { synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit. // 缓存map的size 大于 chain数量的最大阈值,则直接返回null,不在创建新的chain if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } // todo 创建新的chain chain = SlotChainProvider.newSlotChain(); // 防止 迭代稳定性问题 Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }

      我们直接看核心方法SlotChainProvider.newSlotChain();

      public static ProcessorSlotChain newSlotChain() { // 若builder不为null,则直接使用builder构建一个chain // 否则先创建一个builder if (slotChainBuilder != null) { return slotChainBuilder.build(); } // Resolve the slot chain builder SPI. // 通过SPI方式创建builder slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault(); // 若通过SPI未能创建builder,则创建一个默认的DefaultSlotChainBuilder if (slotChainBuilder == null) { // Should not go through here. RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); slotChainBuilder = new DefaultSlotChainBuilder(); } else { RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}", slotChainBuilder.getClass().getCanonicalName()); } // todo 构建一个chain return slotChainBuilder.build(); } private SlotChainProvider() {} }

      2.2.1 创建slotChainBuilder

      // 通过SPI方式创建builder slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

      通过SPI方法创建slotChainBuilder,去项目中META-INF.service中获取:

      2.2.2 slotChainBuilder.build()

      @Spi(isDefault = true) public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); // 通过SPI方式构建Slot List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted(); for (ProcessorSlot slot : sortedSlotList) { if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } chain.addLast((AbstractLinkedProcessorSlot<?>) slot); } return chain; } }

      通过SPI机制,去项目中META-INF.service中获取,在sentinel-core项目中:

      还有一个ParamFlowSlot,在sentinel-extension/sentinel-parameter-flow-control下:

      我们点击 NodeSelectorSlot, 类上面是有 优先级order,数字越小,优先级越高。

      @Spi(isSingleton = false, order = Constants.ORDER_NODE_SELECTOR_SLOT) public class NodeSelectorSlot extends AbstractLinkedProcessorSlot&lt;Object&gt; {

      优先级常量为:

      public static final int ORDER_NODE_SELECTOR_SLOT = -10000; public static final int ORDER_CLUSTER_BUILDER_SLOT = -9000; public static final int ORDER_LOG_SLOT = -8000; public static final int ORDER_STATISTIC_SLOT = -7000; public static final int ORDER_AUTHORITY_SLOT = -6000; public static final int ORDER_SYSTEM_SLOT = -5000; public static final int ORDER_FLOW_SLOT = -2000; public static final int ORDER_DEGRADE_SLOT = -1000;

      我们看代码中的变量sortedSlotList,已经按照优先级排序好了:

      我们看一下构建的ProcessorSlotChain,类似一个单链表结构,如下:

      我们看一下相关的类结构:DefaultProcessorSlotChain:

      // 这是一个单向链表,默认包含一个接节点,且有两个指针first 和end同时指向这个节点 public class DefaultProcessorSlotChain extends ProcessorSlotChain { AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); } }; AbstractLinkedProcessorSlot<?> end = first; @Override public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) { protocolProcessor.setNext(first.getNext()); first.setNext(protocolProcessor); if (end == first) { end = protocolProcessor; } } @Override public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) { end.setNext(protocolProcessor); end = protocolProcessor; } }

      AbstractLinkedProcessorSlot:

      public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> { // 声明一个同类型的变量,则可以指向下一个Slot节点 private AbstractLinkedProcessorSlot<?> next = null; @Override public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { if (next != null) { next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } } @SuppressWarnings("unchecked") void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); } @Override public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { if (next != null) { next.exit(context, resourceWrapper, count, args); } } public AbstractLinkedProcessorSlot<?> getNext() { return next; } public void setNext(AbstractLinkedProcessorSlot<?> next) { this.next = next; } }

      构建完成后的SlotChain和工作原理图一样:

      接下来,对资源进行操作的核心方法为chain.entry(context, resourceWrapper, null, count, prioritized, args);,这个我们下篇文章分析。

      参考文章

      Sentinel1.8.5源码github地址(注释)

      Sentinel官网

      以上就是Sentinel源码解析入口类和SlotChain构建过程详解的详细内容,更多关于Sentinel入口类SlotChain构建的资料请关注自由互联其它相关文章!