Resilience4j是一个轻量级、易于使用的容错库,其灵感来自Netflix Hystrix,但专为Java 8和函数式编程设计。轻量级,由于库只使用Vavr,它没有任何其他外部库依赖项。相比之下,Netflix Hystrix对ArchaIUs有一个编译依赖关系,Archaius有更多的外部库依赖关系,如Guava和Apache Commons。

Resilience4j提供高阶函数(decorators)来增强任何功效接口、lambda表达式或方式引用,包罗断路器、速率限制器、重试或舱壁。可以在任何函数接口、lambda表达式或方式引用上使用多个装饰器。优点是您可以选择所需的装饰器,而无需其他任何东西。

有了Resilience4j,你不必全力以赴,你可以选择你需要的。

https://resilience4j.readme.io/docs/getting-started

概览

Resilience4j提供了两种舱壁模式(Bulkhead),可用于限制并发执行的次数:

  • SemaphoreBulkhead(信号量舱壁,默认),基于Java并发库中的Semaphore实现。
  • FixedThreadPoolBulkhead(牢固线程池舱壁),它使用一个有界行列和一个牢固线程池。

本文将演示在Spring Boot2中集成Resilience4j库,以及在多并发情形下实现如上两种舱壁模式。

引入依赖

在Spring Boot2项目中引入Resilience4j相关依赖

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-spring-boot2</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-bulkhead</artifactId>
    <version>1.4.0</version>
</dependency>

由于Resilience4j的Bulkhead依赖于Spring AOP,以是我们需要引入Spring Boot AOP相关依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

我们可能还希望领会Resilience4j在程序中的运行时状态,以是需要通过Spring Boot Actuator将其露出出来

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

实现SemaphoreBulkhead(信号量舱壁)

resilience4j-spring-boot2实现了对resilience4j的自动设置,因此我们仅需在项目中的yml/properties文件中编写设置即可。

SemaphoreBulkhead的设置项如下:

属性设置 默认值 寄义
maxConcurrentCalls 25 舱壁允许的最大并行执行量
maxWaitDuration 0 实验进入饱和舱壁时,应壅闭线程的最长时间

添加设置

示例(使用yml):

resilience4j.bulkhead:
  configs:
    default:
      maxConcurrentCalls: 5
      maxWaitDuration: 20ms
  instances:
    backendA:
      baseConfig: default
    backendB:
      maxWaitDuration: 10ms
      maxConcurrentCalls: 20

如上,我们设置了SemaphoreBulkhead的默认设置为maxConcurrentCalls: 5,maxWaitDuration: 20ms。并在backendA实例上应用了默认设置,而在backendB实例上使用自界说的设置。这里的实例可以理解为一个方式/lambda表达式等等的可执行单元。

编写Bulkhead逻辑

界说一个受SemaphoreBulkhead治理的Service类:

@Service
public class BulkheadService {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private BulkheadRegistry bulkheadRegistry;

    @Bulkhead(name = "backendA")
    public JsonNode getJsonObject() throws InterruptedException {
        io.github.resilience4j.bulkhead.Bulkhead.Metrics metrics = bulkheadRegistry.bulkhead("backendA").getMetrics();
        logger.info("now i enter the method!!!,{}<<<<<<{}", metrics.getAvailableConcurrentCalls(), metrics.getMaxAlloweDConcurrentCalls());
        Thread.sleep(1000L);
        logger.info("now i exist the method!!!");
        return new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis());
    }
}

如上,我们将@Bulkhead注解放到需要治理的方式上面。而且通过name属性指定该方式对应的Bulkhead实例名字(这里我们指定的实例名字为backendA,以是该方式将会行使默认的设置)。

界说接口类:

@RestController
public class BulkheadResource {
    @Autowired
    private BulkheadService bulkheadService;

    @GetMapping("/json-object")
    public ResponseEntity<JsonNode> getJsonObject() throws InterruptedException {
        return ResponseEntity.ok(bulkheadService.getJsonObject());
    }
}

编写测试:

首先添加测试相关依赖

<dependency>
    <groupId>io.rest-assured</groupId>
    <artifactId>rest-assured</artifactId>
    <version>3.0.5</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <version>4.0.2</version>
    <scope>test</scope>
</dependency>

这里我们使用rest-assured和awaitility编写多并发情形下的API测试

public class SemaphoreBulkheadTests extends Resilience4jDemoApplicationTests {
    @LocalServerPort
    private int port;
    @BeforeEach
    public void init() {
        RestAssured.baseURI = "http://localhost";
        RestAssured.port = port;
    }

    @Test
    public void 多并发接见情形下的SemaphoreBulkhead测试() {
        CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
        IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
                statusList.add(given().get("/json-object").statusCode());
            }
        ));
        await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.sIZe() == 8);
        System.out.println(statusList);
        assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(5);
        assertThat(statusList.stream().filter(i -> i == 500).count()).isEqualTo(3);
    }
}

可以看到所有请求中只有前五个顺遂通过了,其余三个都由于超时而导致接口报500异常。我们可能并不希望这种不友好的提醒,因此Resilience4j提供了自界说的失败回退方式。当请求并发量过大时,无法正常执行的请求将进入回退方式。

首先我们界说一个回退方式

private JsonNode fallback(BulkheadFullException exception) {
        return new ObjectMapper().createObjectNode().put("errorFile", System.currentTimeMillis());
    }

注重:回退方式应该和挪用方式放置在统一类中,而且必须具有相同的方式署名,而且仅带有一个分外的目的异常参数。

然后在@Bulkhead注解中指定回退方式:@Bulkhead(name = "backendA", fallbackMethod = "fallback")

最后修改API测试代码:

@Test
public void 多并发接见情形下的SemaphoreBulkhead测试使用回退方式() {
    CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
    IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
            statusList.add(given().get("/json-object").statusCode());
        }
    ));
    await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
    System.out.println(statusList);
    assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(8);
}

运行单元测试,乐成!可以看到,我们界说的回退方式,在请求过量时起作用了。

实现FixedThreadPoolBulkhead(牢固线程池舱壁)

FixedThreadPoolBulkhead的设置项如下:

设置名称 默认值 寄义
maxThreadPoolSize Runtime.getRuntime().availableProcessors() 设置最大线程池巨细
coreThreadPoolSize Runtime.getRuntime().availableProcessors() - 1 设置焦点线程池巨细
queueCapacity 100 设置行列的容量
keepAliveDuration 20ms 当线程数大于焦点时,这是多余空闲线程在终止前守候新任务的最长时间

添加设置

示例(使用yml):

resilience4j.thread-pool-bulkhead:
  configs:
    default:
      maxThreadPoolSize: 4
      coreThreadPoolSize: 2
      queueCapacity: 2
  instances:
    backendA:
      baseConfig: default
    backendB:
      maxThreadPoolSize: 1
      coreThreadPoolSize: 1
      queueCapacity: 1

如上,我们界说了一段简朴的FixedThreadPoolBulkhead设置,我们指定的默认设置为:maxThreadPoolSize: 4,coreThreadPoolSize: 2,queueCapacity: 2,而且指定了两个实例,其中backendA使用了默认设置而backendB使用了自界说的设置。

编写Bulkhead逻辑

界说一个受FixedThreadPoolBulkhead治理的方式:

@Bulkhead(name = "backendA", type = Bulkhead.Type.THREADPOOL)
public CompletableFuture<JsonNode> getJsonObjectByThreadPool() throws InterruptedException {
    io.github.resilience4j.bulkhead.ThreadPoolBulkhead.Metrics metrics = threadPoolBulkheadRegistry.bulkhead("backendA").getMetrics();
    logger.info("now i enter the method!!!,{}", metrics);
    Thread.sleep(1000L);
    logger.info("now i exist the method!!!");
    return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis()));
}

如上界说和SemaphoreBulkhead的方式大同小异,其中@Bulkhead显示指定了type的属性为Bulkhead.Type.THREADPOOL,解释其方式受FixedThreadPoolBulkhead治理。由于@Bulkhead默认的BulkheadSemaphoreBulkhead,以是在未指定type的情形下为SemaphoreBulkhead。另外,FixedThreadPoolBulkhead只对CompletableFuture方式有用,以是我们必建立返回CompletableFuture类型的方式。

界说接口类方式

@GetMapping("/json-object-with-threadpool")
public ResponseEntity<JsonNode> getJsonObjectWithThreadPool() throws InterruptedException, ExecutionException {
    return ResponseEntity.ok(bulkheadService.getJsonObjectByThreadPool().get());
}

编写测试代码

@Test
public void 多并发接见情形下的ThreadPoolBulkhead测试() {
    CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
    IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
            statusList.add(given().get("/json-object-with-threadpool").statusCode());
        }
    ));
    await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
    System.out.println(statusList);
    assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(6);
    assertThat(statusList.stream().filter(i -> i == 500).count()).isEqualTo(2);
}

测试中我们并行请求了8次,其中6次请求乐成,2次失败。凭据FixedThreadPoolBulkhead的默认设置,最多能容纳maxThreadPoolSize+queueCapacity次请求(凭据我们上面的设置为6次)。

同样,我们可能并不希望这种不友好的提醒,那么我们可以指定回退方式,在请求无法正常执行时使用回退方式。

private CompletableFuture<JsonNode> fallbackByThreadPool(BulkheadFullException exception) {
    return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("errorFile", System.currentTimeMillis()));
}
@Bulkhead(name = "backendA", type = Bulkhead.Type.THREADPOOL, fallbackMethod = "fallbackByThreadPool")
public CompletableFuture<JsonNode> getJsonObjectByThreadPoolWithFallback() throws InterruptedException {
    io.github.resilience4j.bulkhead.ThreadPoolBulkhead.Metrics metrics = threadPoolBulkheadRegistry.bulkhead("backendA").getMetrics();
    logger.info("now i enter the method!!!,{}", metrics);
    Thread.sleep(1000L);
    logger.info("now i exist the method!!!");
    return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis()));
}

编写测试代码

@Test
public void 多并发接见情形下的ThreadPoolBulkhead测试使用回退方式() {
    CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
    IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
            statusList.add(given().get("/json-object-by-threadpool-with-fallback").statusCode());
        }
    ));
    await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
    System.out.println(statusList);
    assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(8);
}

由于指定了回退方式,所有请求的响应状态都为正常了。

总结

本文首先简朴先容了Resilience4j的功效及使用场景,然后详细先容了Resilience4j中的Bulkhead。演示了如何在Spring Boot2项目中引入Resilience4j库,使用代码示例演示了如何在Spring Boot2项目中实现Resilience4j中的两种Bulkhead(SemaphoreBulkhead和FixedThreadPoolBulkhead),并编写API测试验证我们的示例。

本文示例代码地址:https://github.com/cg837718548/resilience4j-demo

迎接接见笔者博客:blog.dongxishaonian.tech

关注笔者民众号,推送各种原创/优质技术文章 ⬇️

,

欧博亚洲网址

欢迎进入欧博亚洲网址(Allbet Game):www.aLLbetgame.us,欧博官网是欧博集团的官方网站。欧博官网开放Allbet注册、Allbe代理、Allbet电脑客户端、Allbet手机版下载等业务。

发布评论

分享到:

apple developer enterprise account for rent:除非有突破新闻,否则股市暂时难有一个明确偏向。(资料图片),科技股上升乏力,就连今年强势的美团股价亦回软。(官方微博图片)
1 条回复
  1. 环球UG官网开户网址
    环球UG官网开户网址
    (2020-08-24 00:00:57) 1#

    欧博allbet客户端欢迎进入欧博allbet客户端(Allbet Game):www.aLLbetgame.us,欧博官网是欧博集团的官方网站。欧博官网开放Allbet注册、Allbe代理、Allbet电脑客户端、Allbet手机版下载等业务。心情阳光明媚啊

发表评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。