[响应式编程] 如何优雅Exception异常处理

初识响应式编程的时候 , 除了从命令式的思维方式转变为函数式的编程方式外,其中有一个很大的不适应的地方就是在面对异常时该怎么处理 , 尤其是面对检查异常。其原因在于,当出现异常时,会先cancel掉原先的数据流 , 再调用onError处理(可以参见前面FluxMap的源码) 。所以,示例中,"Finally: [cancel]"会先被打印,然后才是onErrorReturn的执行,即进入onError阶段 。那为什么第二个doFinally虽然出现在onErrorReturn之前 , 但又是最后执行的呢?这是因为在实现doFinally的时候,先调用了下游的onError方法,再执行自身doFinally的方法,参见FluxDoFinally的实现:
public void onError(Throwable t) {try {actual.onError(t);}finally {runFinally(SignalType.ON_ERROR);}}
这样就能符合try-catch-finally的执行顺序了 。所以,doFinally出现的位置很重要,若出现在异常前面,就会优先执行(不会像finally那样最后执行) , 若出现在异常后面,则会最后执行(类似finally) 。
4. try-with-resource
对于try-with-resource,reactor也给了替代的实现,那就是using操作符:
public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extendsPublisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager) {return onAssembly(new FluxUsing<>(resourceSupplier,sourceSupplier,resourceCleanup,eager));}
其中resourceSupplier是创建生成资源的函数,sourceSupplier则是针对生成的resource进行操作并产生数据流 , resourceCleanup则是在结束后(不管成功还是失败)进行资源的释放 。以try-with-resource为例:
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {return disposableInstance.toString();}
利用using函数,则可以写成:
Flux.using(() -> new SomeAutoCloseable(),disposableInstance -> Flux.just(disposableInstance.toString()),AutoCloseable::close );5. 重试 – retry / retryWhen
除了以上方式处理异常时,还有一种常见的方式就是重试 。比如,我们调用某个接口超时时,通常会重试一次,这个时候可以使用retry方法,如:
Flux.interval(Duration.ofMillis(250)).map(input -> {if (input < 3) return "tick " + input;throw new RuntimeException("boom");}).retry(1).subscribe(System.out::println, System.err::println);
会对Flux流执行两次,其结果是:0 1 2 0 1 2,即当遇到data为3时,会重试一次 。其基本思想很简单,就是拦截onError流程 , 计算重试的次数,如果重试未超过,则重新订阅:
public void onError(Throwable t) {long r = remaining;if (r != Long.MAX_VALUE) {if (r == 0) {actual.onError(t);return;}remaining = r – 1;}resubscribe();}
这里的remaining就是可以重试的次数,直到重试为0,再一次进入actual.onError 。重新订阅的方法也很简单,就是把上游的source与下游的actual,再来一次subscribe:source.subscribe(actual) 。
除了retry外,还有一个高级版本retryWhen,它除了能像retry那样重试固定的次数外,还能支持各种重试策略,由于retryWhen的源码相对复杂,这里不再叙述(毕竟本文不是源码解读),但除了重试策略有区别外,其重试的机制还是一样的,把上游与下游重新订阅 。
6. 检查异常处理
在java中有一类异常是需要显示进行处理的,那就是检查异常(Checked Exception),如IOException 。在命令式编程中 , 可以通过throws关键字来声明,从而可以把异常往外抛,而不需要立即处理 。然而 , 遗憾的是,在reactor中 , 并没有类似的平替 , 不管任何情况,当遇到检查异常,reactor中都需要用try-catch来处理,这是唯一一个在reactor中没有找到命令式编程中的平替 。与命令式编程有throws关键字声明不同,reactor中处理检查异常都必须用try-catch来处理,处理的方式有以下三种:
捕获到异常并从中恢复 。序列继续正常的进行 。捕获异常,将其封装成一个 不检查 的异常,然后将其抛出(中断序列) 。如果你需要返回一个 Flux(例如 , 在 flatMap 中),那么就用一个产生错误的 Flux 来封装异常,如下所示:return Flux.error(checkedException) 。(这个序列也会终止 。)
这三种方式中,其中最常见也最常用的方式就是第二种,将检查异常转化为非检查异常,如throw new RuntimeException(e) 。但是reactor提供了辅助工具类Exceptions,进而可以相对优雅简洁的进行统一处理 。如以下这个例子
public String convert(int i) throws IOException {if (i > 3) {throw new IOException("boom " + i);}return "OK " + i;}Flux<String> converted = Flux.range(1, 10).map(i -> {try { return convert(i); }catch (IOException e) { throw Exceptions.propagate(e); }});converted.subscribe(v -> System.out.println("RECEIVED: " + v),e -> {if (Exceptions.unwrap(e) instanceof IOException) {System.out.println("Something bad happened with I/O");} else {System.out.println("Something bad happened");}});
由于convert声明了检查异常IOException,所以必须要try-catch?。倮肊xceptions.propagate来封装为非检查异常 。相比于直接用throw new RuntimeException(e),利用Exceptions的好处在onError处理阶段可以用Exceptions.unwrap()方法来获取内部真实抛出的异常,体现了利用工具类的好处——简洁明了 。
总结
【[响应式编程] 如何优雅Exception异常处理】以上就是朝夕生活(www.30zx.com)关于“[响应式编程] 如何优雅Exception异常处理”的详细内容,希望对大家有所帮助!

猜你喜欢