Szhangbiao's blog

记录一些让自己可以回忆的东西

0%

RxJava 实用案例

个人使用RxJava接近十年,是国内比较早的一批把RxJava做为主要技术栈的Android开发者,一些在当时看来比较复杂的业务逻辑,最后都通过RxJava的操作符解决,包括后面开发语言切换到kotlin后使用协程做异步开发,我个人也比较偏向于使用Flow这个跟RxJava很像的数据流式操作框架。现如今开发语言又切回到Java,对Rxjava的使用场景也更丰富了,下面简单介绍一下我用RxJavaAndroid开发中的一些实用案例。

结合 Retrofit

RxJava异步处理 + Retrofit网络请求 + Gson数据解析是Android开发曾经比较流行的开发组合,其中RxJava是可选的也是最难的部分,数据流式的链式调用、简单的异步操作和简洁优雅的代码处理是其优势,同样其操作符的丰富度与复杂度也是门槛之一,熟练地在业务中使用也需要一定的积累。

这里简单介绍一下RxJavaRetrofit网络请求中的使用,代码示例如下:

引入依赖:

1
implementation 'com.squareup.retrofit2:adapter-rxjava3:2.9.0'

构建 Retrofit 对象:

1
2
3
4
5
6
7
8
// CallAdapter<R, ?> callAdapter -> RxJava的观察者类型,支持Observable、Single、Maybe、Flowable和Completable
RxJava2CallAdapterFactory rxJava3CallAdapterFactory = RxJava3CallAdapterFactory.createWithScheduler(Schedulers.io())
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(Constants.BASE_URL)
.client(okHttpClient)
.addCallAdapterFactory(rxJava3CallAdapterFactory)
.addConverterFactory(gsonConverterFactory)
.build();

定义接口:

我们在定义Retrofit支持的网络接口请求类时,请求方法的返回类型直接使用RxJava支持的被观察者类型包裹,这种处理正是上面添加的CallAdapter类支持Call<T>RxJava观察者类型的转换,下面是示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

public interface ApiService {

// 原始的网络请求方法定义
@GET("api/{someApi}")
Call<ApiResponse<T>> getServerData();

@GET("api/someApi")
Single<ApiResponse<T>> getServerData();

@GET("api/someApi")
Observable<ApiResponse<T>> getServerData();

@GET("api/someApi")
Flowable<ApiResponse<T>> getServerData();

@GET("api/someApi")
Maybe<ApiResponse<T>> getServerData();

@GET("api/someApi")
Completable getServerData();

...
}

网络请求方法的返回类型可以根据具体的情况做合适的选择,我目前比较常用的是SingleCompletable这两类。

通用逻辑复用

现在的接口请求返回的数据格式一般都是json并且是restful风格,这样的数据风格非常方便进行统一的封装与业务处理。

统一数据泛型类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ApiResponse<T> {
@SerializedName(value = "code", alternate = {"status"})
private int code;
@SerializedName(value = "message", alternate = {"msg"})
private String message;
private T data;

public ApiResponse() {
}

public ApiResponse(int code, String message, T data) {
this.code = code;
this.message = message;
this.data = data;
}

// getters
...

public boolean isSuccess() {
// 有的是code == 0
return code == 200;
}
}

对于网络请求返回的数据,界面上需要显示的数据都在data字段里,从ApiResponse<T>->T的转换对每一个接口来说是一个重复的过程,利用RxJavaTransformer类可以把转换的逻辑封装起来,下面是示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private <T> ObservableTransformer<ApiResponse<T>,T> observableTransform() {
return new ObservableTransformer<ApiResponse<T>, T>() {
@Override
public @NonNull ObservableSource<T> apply(@NonNull Observable<ApiResponse<T>> upstream) {
// 示例1
return upstream
.flatMap(new Function<ApiResponse<T>, ObservableSource<? extends T>>() {
@Override
public ObservableSource<? extends T> apply(ApiResponse<T> dataWithApiResponse) throws Throwable {
if(dataWithApiResponse.isSuccess())
return Observable.just(dataWithApiResponse.getData());
// 这里体现了RxJava的另一个好处,就是把异常传递下去,然后在onError或RxJavaPlugins.setErrorHandler中统一处理
ApiException customException = ...;//根据业务码code返回不同的自定义异常
return Observable.error(customException);
}
});
// 示例2
return upstream
.map(new Function<ApiResponse<T>, T>() {
@Override
public T apply(ApiResponse<T> dataWithApiResponse) throws Throwable {
if(dataWithApiResponse.isSuccess())
return dataWithApiResponse.getData();
// 这里体现了RxJava的另一个好处,就是把异常传递下去,然后在onError或RxJavaPlugins.setErrorHandler中统一处理
ApiException customException = ...;//根据业务码code返回不同的自定义异常
throw customException;
}
});
}
}
}

// 使用示例
Observable<T> serverData = apiService.getServerData()
.compose(observableTransform());

XXXTransformer类内可以把一些通用的逻辑像网络请求错误重试、Result包裹类转换和Token过期刷新等业务逻辑统一处理,然后使用compose操作符进行业务逻辑复用,这样既可以把复杂的通用逻辑通过compose操作符组合应用,又能让代码更加简洁,逻辑更加清晰。

节流与防抖

节流的定义是在连续高频事件触发时,保证在固定时间间隔内只执行一次操作,常用的案例是View的点击事件在固定时间间隔内防止触发多次。

常规实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 自定义节流点击事件
public abstract class ThrottleClickListener implements View.OnClickListener {
private static final long DEFAULT_THROTTLE_INTERVAL = 500; // 默认节流间隔(毫秒)
private final long throttleInterval; // 节流间隔(毫秒)
private long lastClickTime = 0;

public ThrottleClickListener() {
this(DEFAULT_THROTTLE_INTERVAL);
}

public ThrottleClickListener(long throttleInterval) {
this.throttleInterval = throttleInterval;
}

@Override
public void onClick(View v) {
long currentTime = System.currentTimeMillis();
if (currentTime - lastClickTime > throttleInterval) {
lastClickTime = currentTime;
onThrottleClick(v);
}
// 否则忽略本次点击
}

/**
* 节流后的点击回调,由子类实现
*/
public abstract void onThrottleClick(View v);
}

// 使用示例
button.setOnClickListener(new ThrottleClickListener() { // 使用默认节流间隔
@Override
public void onThrottleClick(View v) {
// 点击事件处理逻辑
doSomething();
}
});

项目里如果不使用RxJava的话,可以通过自定义点击事件的方式实现节流的效果,但是如果使用RxJava的话,可以使用RxBinding配合throttleFirst操作符处理。

RxJava操作符实现:

1
2
3
4
5
6
7
8
private static final long DEFAULT_THROTTLE_INTERVAL = 500; // 默认节流间隔(毫秒)

RxView.clicks(button)
.throttleFirst(DEFAULT_THROTTLE_INTERVAL, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread()) // 第三个参数指定在哪个线程执行
.subscribe(v -> {
// DEFAULT_THROTTLE_INTERVAL间隔内只响应一次点击
doSomething();
});

节流的时间间隔要选取合适的值,时间太长可能造成用户点击没有响应,太短则起不到优化的效果。

防抖的定义是用于控制高频率触发的事件,只在事件停止触发一段时间后才执行一次操作,避免短时间内多次重复执行的操作,常用的案例根据EditText的文本变化事件在固定时间间隔后只触发最后一次事件进行实时搜索操作。

常规实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 自定义防抖事件
public abstract class DebounceTextWatcher implements TextWatcher {
private static final long DEFAULT_DEBOUNCE_INTERVAL = 500; // 默认防抖间隔(毫秒)
private final long debounceInterval; // 毫秒
private final Handler handler = new Handler(Looper.getMainLooper());
private Runnable workRunnable;

public DebounceTextWatcher() {
this(DEFAULT_DEBOUNCE_INTERVAL);
}

public DebounceTextWatcher(long debounceInterval) {
this.debounceInterval = debounceInterval;
}

@Override
public void afterTextChanged(final Editable s) {
if (workRunnable != null) {
handler.removeCallbacks(workRunnable);
}
workRunnable = new Runnable() {
@Override
public void run() {
onDebounceTextChanged(s.toString());
}
};
handler.postDelayed(workRunnable, debounceInterval);
}

// 可选实现
@Override public void beforeTextChanged(CharSequence s, int start, int count, int after) {}
@Override public void onTextChanged(CharSequence s, int start, int before, int count) {}

public void clear() {
if (workRunnable != null) {
handler.removeCallbacks(workRunnable);
}
}
/**
* 用户停止输入debounceInterval时间后会回调
*/
public abstract void onDebounceTextChanged(String text);
}

// 使用示例
editText.addTextChangedListener(new DebounceTextWatcher() { // 使用默认防抖间隔
@Override
public void onDebounceTextChanged(String text) {
// 因为有延时的处理,这里一定要判断页面是否处于resume状态
// 这里执行防抖后的操作,例如发起搜索请求
searchSomething();
}
});

同样的Rxbinding配合debounce操作符处理也可以达到相同的效果

RxJava 操作符实现

1
2
3
4
5
6
7
8
9

private static final long DEFAULT_DEBOUNCE_INTERVAL = 500; // 默认防抖间隔(毫秒)

RxTextView.afterTextChangeEvents(editText)
.debounce(DEFAULT_DEBOUNCE_INTERVAL, TimeUnit.MILLISECONDS, AndroidSchedulers.mainThread())
.subscribe(v -> {
// 这里执行防抖后的操作,例如发起搜索请求
searchSomething();
});

防抖的时间间隔同样要根据场景设置合适的值,如果是网络请求进行搜索的场景下则需要在Repository层对前一次的搜索请求进行取消,因为网络请求时间的不确定性会造成前一次的搜索结果覆盖后续的搜索结果的情况出现,虽然出现这种情况的概率很低,但也要做一些防范。

使用Rxbinding结合RxJava操作符的方式需要注意的是如果中间操作符的Function参数或者onNext参数方法体里发生了异常,RxJava 会自动将异常传递给subscribeonError参数,没有设置的话则会传递给全局的RxJavaPlugins.onErrorHandler,下面有一个章节专门说RxJava异常方面的细节。

注意无论是RxView.clicks还是RxTextView.afterTextChangeEvents,一定要把在subscribe后返回的Disposable对象管理起来,不然很容易造成内存泄露。

串行/并行请求

图片上传功能涉及到图片本地压缩、使用SDK请求临时Token(可选)、图片上传,其中图片本地压缩和请求临时Token可以并行处理,图片上传则依赖前一步的结果,把前面的并行处理看做一个个体的话,那么这个整体就是一个串行的请求,使用RxJavazipflatMap操作符可以实现这个效果。

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Observable<File> compressObservable = ...; // 图片压缩Observable
Observable<String> tokenObservable = ...; // 获取Token Observable

Observable.zip(compressObservable, tokenObservable, new BiFunction<File, String, Pair<File, String>>() {
@Override
public Pair<File, String> apply(File file, String token) throws Exception {
return new Pair<>(file, token);
}
})
.flatMap(new Function<Pair<File, String>, ObservableSource<ApiResponse<UploadResult>>() {
@Override
public ObservableSource<ApiResponse<UploadResult>> apply(Pair<File, String> fileTokenPair) throws Exception {
File compressedFile = pair.first;
String token = pair.second;
// 返回图片上传的Observable
return mRepository.uploadFile(compressedFile, token);
}
})
.compose(observableTransform())
// 线程调度
.subscribe(uploadResult -> {
// 上传成功处理
}, throwable -> {
// 错误处理
});

日常的开发过程中这种串行/并行的异步处理是比较常见的,使用RxJava的操作符把多个异步操作连接起来,使其一系列的的操作看上去像一个整体,代码看起来也比较简单整洁,这正是使用RxJava开发所带来的好处。

磁盘/内存缓存

数据缓存是比较常见的业务场景之一,在数据的有效期内对数据进行缓存,可以提高数据的访问效率、降低网络请求的次数,使用RxJavaconcatfirstElement操作符可以实现这个效果,下面是一个简单的例子:

RxJava 伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 内存缓存Observable
Observable<Data> mermoryData = Observable.create(new ObservableOnSubscribe<Data>() {
@Override
public void subscribe(ObservableEmitter<Data> emitter) throws Exception {
//1. 获取内存缓存String类型数据
//2. 使用Gson解析数据
//3. 发送数据
emitter.onNext(data);
emitter.onComplete();
}
});
// 磁盘缓存Observable
Observable<Data> diskData = Observable.create(new ObservableOnSubscribe<Data>() {
@Override
public void subscribe(ObservableEmitter<Data> emitter) throws Exception {
//1. 获取磁盘缓存String类型数据
//2. 使用Gson解析数据
//3. 发送数据
emitter.onNext(data);
emitter.onComplete();
}
}).map(data -> {
//1. 使用Gson转换数据为String类型
//2. 保存数据到内存
return data;
});
// 网络请求Observable
Observable<Data> networkData = ...;// 从网络获取数据
.map(data -> {
//1. 使用Gson转换数据为String类型
//2. 保存数据到内存和磁盘
return data;
});

Maybe<Data> maybeData = Observable.concat(mermoryData, diskData, networkData)
.filter(data -> data.isNotEmpty())
.firstElement() // Maybe<Data>
.subscribe();

或者

Observable<Data> data = Observable.concat(mermoryData, diskData, networkData)
.filter(data -> data.isNotEmpty())
.firstElement()// Maybe<Data>
.toObservable()
//如果最后网络数据请求失败或其它原因没有通过filter过滤,那么会抛出NoSuchElementException异常
.subscribe();

内存缓存推荐使用LruCache,为了统一处理所有数据的内存缓存,这里我们使用LruCache<String, String>封装底层数据存取,实际使用过程中需要进行具体类型数据到String数据的序列化与反序列化。磁盘缓存也是同样的处理,只不过磁盘缓存的底层可以使用databasefile的存储方式,无论采用何种方式都要进行一定程度的封装。

轮询请求接口

需求场景:

  • 正常搜索,根据返回结果是否开启深度搜索
  • code0或者200,正常搜索结束,返回搜索结果
  • code为特定业务码,则使用同级别的searchId字段值进行深度(轮询)搜索
  • 轮询间隔为1s,每次接口请求都会返回code决定是否继续轮询。

这个业务场景并不多见,轮询请求接口一般是在等待后端的某个耗时操作处理完成,轮询虽然不是最优的方式,但却是实现成本比较低的方式。

RxJava实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// 首次搜索
Observable<SearchResult> searchObservable = apiService.search(keyword)
.compose(observableTransform());

// 深度(轮询)搜索,轮询间隔为 1s,直到接口返回搜索完成或接口请求出错为止
Observable<SearchResult> deepSearchWithPolling(String searchId) {
return apiservice.deepSearch(searchId)
.compose(observableTransform());
.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> upstream) throws Throwable {
return upstream.delay(1, TimeUnit.SECONDS);
// 或者
return upstream
.flatMap(new Function<Object, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Object o) throws Throwable {
// 根据条件来决定是否继续轮询
return needRepeat() ? Observable.timer(1, TimeUnit.SECONDS) : Observable.empty();
}
});
}
})
.takeUntil(new Predicate<SearchResult>() {
@Override
public boolean test(SearchResult searchResult) throws Exception {
return searchResult.isSuccess(); // 根据返回的数据判断师傅要继续轮询
}
})
.lastElement()
.toObservable();
}
// 关联搜索
Disposable disposable = searchObservable
.flatMap(new Function<SearchResult, ObservableSource<SearchResult>>() {
@Override
public ObservableSource<SearchResult> apply(SearchResult searchResult) throws Exception {
if (searchResult.isSuccess()) {
return Observable.just(searchResult);
} else {
return deepSearchWithPolling(searchResult.getSearchId());
}
}
})
// 线程调度
.subscribe();

深度(轮询)搜索停止的条件是needRepeat()返回false或者takeUntil返回trueneedRepeat()方法是根据特定条件判断是否需要继续轮询,takeUntil是根据返回的数据判断是否需要结束轮询。

业务重试

网络错误重试

需求场景:

  • 在弱网或者高并发的场景下请求接口容易发生Timeout等异常,需要进行网络请求重试
  • 重试有次数限制,且每次重试的间隔时间随次数递增,重试次数跟相应的重试间隔时间可配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//重试次数是retryConditions的长度,重试间隔时间是retryConditions的值
public static <T> ObservableTransformer<T, T> observableApiRetry(List<Long> retryConditions) { // Arrays.asList(1, 2, 4, 8)
return new ObservableTransformer<T, T>() {
@Override
public @NonNull ObservableSource<T> apply(@NonNull Observable<T> upstream) {
return upstream
//retryWhen有条件的进行重试,根据 apply 方法返回的Observable类型决定是否自动重新订阅上游Observable
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Throwable {
return throwableObservable
.zipWith(retryConditions, new BiFunction<Throwable, Long, Pair<Throwable, Long>>() {
@Override
public Pair<Throwable, Long> apply(Throwable throwable, Long aLong) throws Throwable {
return new Pair<>(throwable, aLong);
}
})
.flatMap(new Function<Pair<Throwable, Long>, ObservableSource<Long>>() {
@Override
public ObservableSource<Long> apply(Pair<Throwable, Long> throwableLongPair) throws Throwable {
Throwable throwable = throwableLongPair.first;
long time = throwableLongPair.second;
if (needRetry(throwable)) {
// condition ? 返回Observable.error,流终止,不再重试 : 返回一个延迟Observable,表示需要重试
return time == retryConditions.get(retryConditions.size() - 1) ? Observable.error(new RetryFailException(throwable)) : Observable.timer(time, TimeUnit.SECONDS);
}
// 返回Observable.error,流终止,不再重试
return Observable.error(throwable);
}
});
}
});
}
};
}

Token过期刷新

需求场景:

  • 用户登录成功后返回refreshTokenaccessTokenrefreshToken过期时间较长一般以年为单位,accessToken过期时间较短一般以分钟为单位
  • accessToken用于一般的接口请求时放到请求头里用于后端鉴权使用,refreshToken用于accessToken过期后调用刷新接口获取新的accessToken使用
  • accessToken过期时在用户层面是无感知的,需要在逻辑层自动做刷新然后业务接口重试,refreshToken过期需要弹出重新登录确认框。

自动刷新accessToken后并重试网络请求的需求,这里实现方式有两种,分别是在OkHttp的自定义Interceptor里处理和RxJavaretryWhen操作符配合处理

自定义Interceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class TokenAuthenticatorInterceptor implements Interceptor {

private static final int HTTP_EXPIRED_CODE = ...;// http 状态码,代表 token 过期
private static final int TOKEN_EXPIRED_CODE = ...;// token 过期错误码

private final TokenManager tokenManager;

public TokenAuthenticatorInterceptor(TokenManager tokenManager) {
this.tokenManager = tokenManager;
}

@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
// 1. 给请求加上当前 AccessToken
Request authRequest = request.newBuilder()
.header("Authorization", "Bearer " + tokenManager.getAccessToken())
.build();
Response response = chain.proceed(authRequest);
// 2. 检查是否token过期(如 401 未授权)
if (isTokenExpired(response)) {
response.close(); // 关闭前一个响应
synchronized (TokenManager.class) {
return retryWithRefreshedToken(chain, request);
}
}
return response;
}

private boolean isTokenExpired(Response response) {
if (response.code() == 200) {
return false;
}
// restful风格的json数据
int code = ...//读取body里的code字段
return response.code() == HTTP_EXPIRED_CODE && code == TOKEN_EXPIRED_CODE;
}

private Response retryWithRefreshedToken(Chain chain, Request request) throws IOException {
// double check,避免多线程重复刷新
String newToken = tokenManager.getAccessToken();
if (newToken.equals(tokenManager.getLastTriedToken())) {
// 触发刷新
newToken = tokenManager.refreshTokenBlocking();
}
tokenManager.setLastTriedToken(newToken);
// 3. 用新 token 重试一次
Request newRequest = request.newBuilder()
.header("Authorization", "Bearer " + newToken)
.build();
return chain.proceed(newRequest);
}
}

注意事项:

  • 刷新token要同步阻塞,否则多个线程会重复刷新(用synchronized保证)。
  • 只重试一次,避免陷入死循环,若刷新失败,应返回失败,不要死循环。
  • 刷新token的网络请求不要用当前带InterceptorOkHttpClient,避免递归调用。

retryWhen操作符:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

Observable<Data> requestObservable = apiService.getServerData()
.compose(observableTransform());

Observable<Data> refreshObservable = apiService.refreshToken()
.compose(observableTransform())
// 该接口也可以根据情况进行重试
.compose(observableApiRetry(Arrays.asList(1,2)))
.doOnNext(new Consumer<Token> (){
@Override
public void accept(Token token) throws Exception {
tokenManager.setAccessToken(token.getAccessToken());
}
})
.onErrorResumeNext(new Function<Throwable, ObservableSource<Data>>() {
@Override
public ObservableSource<Data> apply(Throwable throwable) throws Exception {
//在这里可以根据具体异常做一些处理,一般都是返回一个Observable.error终止retryWhen的重试操作
return Observable.error(throwable);
}
});

requestObservable
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable
.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
// TokenExpiredException 如果是http状态码401则需要在自定义Interceptor里抛出该异常,如果是Token过期业务码则由ApiResponse<T> -> T 转换时传递出该异常
if(throwable instanceof TokenExpiredException) {
return refreshObservable;
}
return Observable.error(throwable);
}
});
}
});

这两种方式都可以实现token过期自动刷新,但是在全局高并发的场景下并不能保证刷新Token操作同一时间只执行一次,需要做一些额外的处理,基本思想就是把refreshToken方法进行封装,然后用锁(synchronized/Mutex)保护刷新的唯一性

统一错误处理

一些用户体验比较友好的应用会把应用使用过程中出现的错误体现在应用交互上,比如ToastDialog弹框或者特别的错误界面,这里的错误在数据层看来是发生了异常且大部分场景是在跟后台的交互过程中出现的,这时候需要做一些错误的统一处理,包括200的网络状态码网络请求失败服务器特定错误码数据解析错误本地数据库异常自定义业务异常等等,使用RxJava的另一个好处就是对异常的处理可以更加灵活,可以根据不同的情况做一些不同的处理,我做了如下举例:

  • 在自定义Interceptor里根据Http状态码或者json数据里的业务码throw自定义异常
  • 在创建Observable的方法体里使用emitter.onError向下游传递异常
  • ApiResponse<T> -> T转换时根据json数据里的业务码传递或者throw自定义异常
  • 针对个别Observable使用onErrorXXX系列接口处理异常
  • Observer实现onError处理异常,比如在ViewModel里使用LiveDataUI层传递显示对应状态的数据
  • Observer未实现onError,或者使用RxJavaPlugins.onError(throwable)都会向全局的RxJavaPlugins.onErrorHandler传递异常

了解这些异常的发送、传递和处理的场景后我们接下来实现一个refreshToken过期后弹出重新登录弹框的案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 在自定义`Interceptor`里抛出异常:
private static final int USER_EXPIRED_CODE = ...;// 用户登录过期错误码

@Override
public Response intercept(Interceptor.Chain chain) throws IOException {
Response response = chain.proceed(chain.request());
if (response.code() == 401) {
int code = ...//读取body里的code字段
if(code == USER_EXPIRED_CODE) {
throw new LoginExpiredException();
}
}
return response;
}

// 或者 ApiResposne<T> -> T 转换时识别并传递该异常
new Transformer<ApiResponse<T>, T>() {
@Override
public Observable<T> call(Observable<ApiResponse<T>> observable) {
return observable.flatMap(new Function<ApiResponse<T>, Observable<T>>() {
@Override
public Observable<T> apply(ApiResponse<T> apiResponse) throws Exception {
if (apiResponse.getCode() == USER_EXPIRED_CODE) {
return Observable.error(new LoginExpiredException());
}
return Observable.just(apiResponse.getData());
}
});
}
}

在封装的Observer中统一处理,或者Result包裹类转换时处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 定义Result类
public class Result<T> {
@NonNull
public final Status status;
@Nullable
public final T data;
@Nullable
public final Error error;

private Result(@NonNull Status status, @Nullable T data, @Nullable Error error) {
this.status = status;
this.data = data;
this.error = error;
}

public boolean isSuccess() {
return status == Status.SUCCESS;
}

public static <T> Result<T> success(@NonNull T data) {
return new Result<>(Status.SUCCESS, data, null);
}

public static <T> Result<T> error(Error error) {
return new Result<>(Status.ERROR, null, error);
}
}

Transformer中处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
new ObservableTransformer<T, Result<T>>() {
@Override
public Observable<Result<T>> call(Observable<T> observable) {
return observable
.map(new Function<T, Result<T>>() {
@Override
public Result<T> apply(T t) throws Exception {
return Result.success(t);
}
})
.onErrorReturn(new Function<Throwable, Result<T>>() {
@Override
public Result<T> apply(Throwable throwable) throws Exception {
if(throwable instanceof LoginExpiredException) {
RxJavaPlugins.onError(throwable);
}
return Result.error(throwableMappingToError(throwable));
}
});
}
}

Application中设置全局setErrorHandler

1
2
3
4
5
6
7
8
RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
if (throwable instanceof LoginExpiredException) {
// 弹出重新登录弹框, 同一时间可能会有多个异常到来,需要做防抖/节流处理和线程切换
}
}
});

总结

ReactiveX函数式编程是一种思想,RxJava只是这种思想在Java语言方面的实现,其它语言上也有体现像 RxSwiftRxJs RxDart等,想要掌握RxJava除了熟悉它的基本概念、常用操作符以及这些操作符的组合外,更要了解它的编程思想,这也是我对RxJava接下来的学习的主要目标。
RxJava用的好当然就是神兵利器,效率和简洁代码上都会有很大的提升,但相对的缺点是入门比较难,使用不当会引发一些比较严重的问题,如果链式调用比较长同样也会增加出现问题后的排查难度,在使用的过程中要有意识的去规避一些潜在的问题。