摘要
在上一篇文章《并发编排与响应式初步》,我们已经对CompletableFuture
的功能进行了深入的探讨,并简要介绍了Reactor
响应式流相关的概念。然而,要熟练掌握异步任务编排并非一朝一夕之事,尤其是在需要处理 I/O 密集型应用或者一些特殊场景,如:任务间无顺序依赖关系,或者需要在所有任务完成后一次性处理所有返回结果。
在本篇文章中,我们将以后端异步获取和风天气 API 的例子来详细展示CompletableFuture
和Reactor
的异步编排任务如何在实战中应用。通过本章的学习,我希望读者能够对异步编排任务有更深的理解,以及对于编写异步任务时需要考虑的问题有更清晰的认识。
需求分析
任何具备目的性的开发都需要一个完备的需求分析报告,我们的目的是利用和风天气API为后端实现一个获取访问者所在地的天气状况。那么可以将这个过程大致拆分成如下的几个步骤:
graph LR
A[拉取 GeoLite2] --> B[获取请求IP]
B --> C[查询GeoLite2经纬度]
C --> D[请求和风API获取城市ID]
D --> E[使用城市ID请求天气信息]
E --> F[返回城市天气和名称]
整个过程看似简单,并且这些操作也不能被视为“传统意义上”的I/O密集型任务(传统意义的IO密集型任务一般用于描述频繁读写磁盘或进行网络数据传输的任务),那么未接触过异步任务开发的开发者会想当然地使用同步阻塞的方法来实现它。但是,我们要知道这整个任务的过程是不开一定量的网络I/O的,这也是整个过程中耗时最多的部分,因为我们无法估量网络波动或任务阻塞对主线程任务带来的严重影响和后果,使用异步编排来实现这个功能也显得尤为重要。尽管它并不算严格的I/O密集型任务,但使用异步编程实现仍然能收获颇多的优势:
- 提高资源利用率:如之前的文章所述,异步编程可以避免CPU在等待I/O操作完成时处于空闲状态,使得CPU可以用来处理其他任务。
- 提高响应速度:在这个过程中,考虑到需要同时处理多个用户的请求,使用异步编程,可以并发处理多个请求,提高整体系统的响应性。
- 异常问题处理:在异步编程中,我们可以更好地处理异常问题。在同步代码中遇到异常,通常需要阻塞整个过程。但在异步代码中,开发者可以只在出现异常的部分进行预处理,并在异步任务的最后做统一处理,其余的任务仍然可以继续正常执行。
- 提高可扩展性:异步编程可以更好地支持大量并发的用户请求,对于高并发的系统,响应式流等异步框架可以提供更方便的编排过程与任务拼接。
正式开发
远程拉取GeoLite2
阅读和风天气的城市实时天气API文档内容我们可以知道,要想获取用户所在城市的天气则需要先获取该城市ID。而城市ID的获取可以通过传递经纬网信息给和风GeoAPI来获取,那么我们的任务在于只要获取用户IP地址所在的大致经纬网信息即可。
使用GeoLite2.mmdb
数据库是一个不错的选择,它可以帮助我们直接分析出用户IP地址所在的经度和纬度。而我们所需要做的第一步就是就是在应用启动时从https://cdn.jsdelivr.net/npm/geolite2-city@1.0.0/GeoLite2-City.mmdb.gz
上将这个文件拉取下来,拉取成功后进行解压操作,最后将压缩包删除。我们将这些方法封装到FutureDownloader
类中,并先定义下载方法和一些内部属性。
构建下载方法
@Slf4j
public class FutureDownloader {
private final String FILE_URL; // 将文件下载到该位置
private final String DIRECTION; // 目标下载地址
private FutureDownloader(String FILE_URL, String DIRECTION) {
this.FILE_URL = FILE_URL;
this.DIRECTION = DIRECTION;
}
// 远程异步拉取mmdb数据库到本地
private String downloadFile(String filePath) {
HttpClient client = HttpClient.newBuilder()
.followRedirects(HttpClient.Redirect.ALWAYS) // 允许重定向
.build();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(FILE_URL))
.timeout(Duration.ofMinutes(1)) // Http请求超时时间
.GET()
.build();
Path file = Path.of(filePath);
try {
// 第一次拉取资源
log.info("start downloading gz file GeoLite2-City.mmdb.gz from server");
client.sendAsync(request, HttpResponse.BodyHandlers.ofFile(file))
.orTimeout(2, TimeUnit.MINUTES)
.exceptionally(ex -> {
throw new CompletionException("Failed to download file.", ex);
}).join();
} catch (CompletionException ex) {
// 第一次拉取失败的重试策略
for (int i = 0; true; i++) {
try {
log.info("download failed, retrying for the {} time.", (i+1));
client.sendAsync(request, HttpResponse.BodyHandlers.ofFile(file))
.orTimeout(2, TimeUnit.MINUTES)
.join();
break;
} catch (CompletionException retryEx) {
if (i == 2) { // 重试3次失败抛出异常
throw new CompletionException("Failed to download file after 3 attempts.", retryEx);
}
}
}
}
return filePath;
}
}
简单地看一下这个方法做了什么:
- 在这个方法中通过
HttpClient
类来实现一个CompletableFuture
的异步任务链路,以sendAsync()
方法为开头定义相关的超时策略为抛出一个CompletionException
异常给最外层的try catch
。 - 当最外层的
try catch
接受到异常后会开始最多3次的重试下载,并在全部失败后将CompletionException
异常抛出到异步任务外。 - 当下载成功后则将存储路径
filePath
以字符串的形式返回到异步任务外。
解压文件
我们希望在这个任务完成后再对下载下来的文件进行解压处理,因而需要设计一个专门的解压文件方法:
private String unzipFile(String filePath, String unzipPath) {
// try-with-resource打开一个IO流
try (GZIPInputStream gzipInputStream = new GZIPInputStream(new FileInputStream(filePath));
FileChannel outputChannel = FileChannel.open(Path.of(unzipPath), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
log.info("gz file GeoLite2-City.mmdb.gz has been successfully downloaded, start unpacking");
byte[] buffer = new byte[1024];
ByteBuffer byteBuffer = ByteBuffer.wrap(buffer);
int length;
while ((length = gzipInputStream.read(buffer)) != -1) {
byteBuffer.clear();
byteBuffer.limit(length);
outputChannel.write(byteBuffer);
}
} catch (IOException e) {
throw new CompletionException("Unable to unzip file", e);
}
return unzipPath;
}
解压文件的过程是比较简单且容易理解的,同样的它也负责了一个I/O操作,并在失败时抛出了一个CompletionException
异常,在成功时返回了解压文件所在的位置。
构建完成的异步任务
现在我们已经构建好了downloadFile
远程下载任务和unzipFile
解压文件任务,并且也处理好了在发生错误时抛出一个CompletionException
类型的异常。所以最后需要做的就是通过CompletableFuture
将这两个任务结合起来,并在它们都完成后统一处理由这两个方法返回的最终结果或是异常。
// 暴露给外部调用
public static void call(String direction, String requestUrl) {
FutureDownloader futureDownloader = new FutureDownloader(requestUrl, direction);
futureDownloader.downloadAndUnzipFileAsync();
}
// 总任务流
private void downloadAndUnzipFileAsync() {
String filePath = DIRECTION + File.separator + "GeoLite2-City.mmdb.gz";
String unzipPath = DIRECTION + File.separator + "GeoLite2-City.mmdb";
// 检查是否已经存在了该文件
if (Files.exists(Path.of(unzipPath))) {
log.info("detected file GeoLite2-City.mmdb at {}", unzipPath);
return;
}
CompletableFuture.supplyAsync(() -> downloadFile(filePath))
.thenApply(path -> unzipFile(filePath, unzipPath))
.whenComplete((unzippedPath, ex) -> {
if (ex != null) {
log.error(ex.getMessage());
} else {
try {
log.info("deleting gz file GeoLite2-City.mmdb.gz");
Files.delete(Path.of(filePath));
} catch (IOException e) {
log.error("Error when deleting file: {}", e.getMessage());
}
}
});
}
在downloadAndUnzipFileAsync
方法中,使用了CompletableFuture.supplyAsync().thenApply().whenComplete()
这个链路来编排和处理异步任务,整个结构是非常清晰而直观的。
此外,在Spring应用中,我们通常会将这个类通过@Configuration
修饰并让FutureDownloader
类通过@Bean
修饰的方法将类注入到IoC容器中,确保全局只有一份FutureDownloader
,同时使用@PostConstruct
实现自动调用downloadAndUnzipFileAsync
方法。
解析地址并请求城市ID
在得到了GeoLite2.mmdb
数据库后便可以通过其伴生的依赖来分析IP地址所在的经纬度信息,在项目中添加geoip2
依赖:
implementation 'com.maxmind.geoip2:geoip2:4.0.1'
我们创建GeoLiteReader
类来实现IP地址的解析以及城市API获取的异步方法,其大致的流程可以被拆分为如下三步:
graph LR
A[获取IP] --> B[调用GeoLite2查询经纬度]
B --> C[向和风GeoAPI获取城市ID]
请求城市ID
先封装最基本的“请求和风GeoAPI”的方法getCityIdFromLatLon
,这个方法中传入经度、纬度以及和风天气应用的私钥:
/**
* 依据传入的IP经纬度请求和风天气API获取城市ID
* @param latitude 纬度
* @param longitude 经度
* @return String类型的响应式体
*/
private Mono<CityInfo> getCityIdFromLatLon(String latitude, String longitude, final String key) {
HttpClient client = HttpClient.newHttpClient();
// URI构建请求体
URI uri = URI.create(String.format("https://geoapi.qweather.com/v2/city/lookup?location=%s&key=%s",
URLEncoder.encode(longitude + "," + latitude, StandardCharsets.UTF_8),
URLEncoder.encode(key, StandardCharsets.UTF_8)));
HttpRequest request = HttpRequest
.newBuilder()
.uri(uri)
.GET()
.build();
// GZIP解压缩
return GzipResponse.handle(client, request)
.flatMap(this::parseCityId);
}
/**
* Gson处理异步过来的HttpResponse
* @param responseBody 经过GZIP解压缩的JSON格式的HttpResponse字符串
* @return 返回一个从HttpResponse中获取城市ID的响应式体
*/
private Mono<CityInfo> parseCityId(String responseBody) {
return Mono.fromCallable(() -> {
Gson gson = new Gson();
WeatherResponse response = gson.fromJson(responseBody, WeatherResponse.class);
WeatherResponse.Location location = response.getLocation().get(0);
return new CityInfo(location.getId(), location.getName());
});
}
在和风API文档中着重描述了“返回数据是JSON格式并进行了Gzip压缩,数据类型均为字符串”,所以我们需要通过GzipResponse
类来实现解压缩,并将解压缩后的字符串通过Gson
工具与WeatherResponse
类形成映射最后以CityInfo
的record类的Mono
形式进行返回。
@Getter
@Setter
public class WeatherResponse {
@SerializedName("location")
private List<Location> location;
@Getter
@Setter
public static class Location {
@SerializedName("id")
private String id;
@SerializedName("name")
private String name;
}
}
public record CityInfo(String cityId, String cityName) {}
解析地址
至此我们已经封装了能通过和风GeoAPI得到的Mono<CityInfo>
响应式对象了,我们还需要一个异步的从GeoLite2.mmmdb
数据库中获取经纬度的方法来间接调用getCityIdFromLatLon
:
@Slf4j
@Component
public class GeoLiteReader {
@Resource
private ToolBenchPlugin PLUGIN;
public Mono<CityInfo> getCityCode(String ip, final String key) {
File database = new File(PLUGIN.getConfigContext().getCONFIG_HOME() + File.separator + "GeoLite2-City.mmdb");
try {
// build方法应该配合try-with-resource但是会被自动关闭链接导致整个Mono无法再从数据库获取数据
DatabaseReader dbReader = new DatabaseReader.Builder(database).build();
// getByName()请求DNS会在该异步模型中发生阻塞
InetAddress ipAddress = InetAddress.getByName(ip);
return Mono.fromCallable(() -> dbReader.city(ipAddress))
.flatMap(response -> getCityIdFromLatLon(
response.getLocation().getLatitude().toString(),
response.getLocation().getLongitude().toString(),
key
));
} catch (IOException e) {
return Mono.error(e);
}
}
}
在这个方法中会先到GeoLite2.mmdb
文件中开启一个流,然后根据传递过来的IP获取到所在的经纬网信息,最后通过一个响应式流Mono.fromCallable().flatMap()
处理成一个Mono<CityInfo>
结果。
在Mono.fromCallable()
中定义了一个从dbReader
获取城市ID的任务,它返回了一个Mono<CityResponse>
响应体对象。在随后的拼接的异步任务flatMap
中将响应体数据展开,分发给getCityIdFromLatLon
方法处理,进而从和风GeoAPI中得到我们最终想要的结果。
在这个方法中留了两个比较常见的异步编程问题,让我们来看一下:
-
DatabaseReader dbReader = new DatabaseReader.Builder(database).build();
应该配合try-with-resource
实现,但是try-with-resource
会自动关闭该连接导致在异步模型中连接不再可用。在这种情况下,我们需要自行管理DatabaseReader
的生命周期,我们需要在flatMap
后追加关闭连接的处理:public Mono<CityInfo> getCityCode(String ip, final String key) { // ... return Mono.fromCallable(/* ... */) .flatMap(/* ... */) // 处理并关闭连接资源 .doOnTerminate(() -> safeClose(dbReader)); // ... } private void safeClose(Closeable closeable) { try { closeable.close(); } catch (IOException e) { throw new RuntimeException(e); } }
-
InetAddress ipAddress = InetAddress.getByName(ip);
是一个阻塞的方法,在响应式异步编程中是不应该出现同步阻塞方法块的。为了解决这一问题我们需要先知道CompletableFuture
和Mono
的区别:- 在上一期中我们已经讲过了,
CompletableFuture
会在一个单独的线程中执行该操作,其主线程可以继续执行其他操作。 Mono
本身并不能处理阻塞操作,在Reactor中,Mono
是用于处理异步操作的,但是它不会自己启动新的线程来处理阻塞操作。所以fromCallable
也是在主线程中执行任务发生阻塞。
要解决这个问题我们就有两种思路,一种是将该阻塞任务委派给
CompletableFuture
创建一个新的线程来执行,另一种是通过Mono
的上下文中使用调度器让任务在Reactor提供的弹性线程池上执行从而不阻塞事件。- 通过
CompletableFuture
转换为非阻塞任务来解决问题:
public Mono<CityInfo> getCityCode(String ip, final String key) { File database = new File(PLUGIN.getConfigContext().getCONFIG_HOME() + File.separator + "GeoLite2-City.mmdb"); try { DatabaseReader dbReader = new DatabaseReader.Builder(database).build(); return getIpAddress(ip) .flatMap(ipAddress -> getCityResponse(dbReader, ipAddress)) .flatMap(response -> getCityIdFromLatLon( response.getLocation().getLatitude().toString(), response.getLocation().getLongitude().toString(), key )) .doOnTerminate(() -> safeClose(dbReader)); } catch (IOException e) { return Mono.error(e); } } private Mono<InetAddress> getIpAddress(String ip) { return Mono.fromFuture(CompletableFuture.supplyAsync(() -> { try { return InetAddress.getByName(ip); } catch (UnknownHostException e) { throw new CompletionException(e); } })); } private Mono<CityResponse> getCityResponse(DatabaseReader dbReader, InetAddress ipAddress) { return Mono.fromCallable(() -> dbReader.city(ipAddress)); }
- 通过
Mono
的调度器来解决阻塞问题:
public Mono<CityInfo> getCityCode(String ip, final String key) { File database = new File(PLUGIN.getConfigContext().getCONFIG_HOME() + File.separator + "GeoLite2-City.mmdb"); try { DatabaseReader dbReader = new DatabaseReader.Builder(database).build(); return Mono.fromCallable(() -> InetAddress.getByName(ip)) .subscribeOn(Schedulers.boundedElastic()) // 启用Reactor的调度器 .flatMap(ipAddress -> Mono.fromCallable(() -> dbReader.city(ipAddress))) .flatMap(response -> getCityIdFromLatLon( response.getLocation().getLatitude().toString(), response.getLocation().getLongitude().toString(), key )) .doOnTerminate(() -> safeClose(dbReader)); } catch (IOException e) { return Mono.error(e); } }
- 在上一期中我们已经讲过了,
请求天气数据
现在我们已经拥有了请求城市天气所需要的城市ID了,我们继续编排一个新的请求城市天气的任务:
private Mono<JsonNode> getWeatherData(String cityCode, String key) {
HttpClient client = HttpClient.newHttpClient();
URI uri = URI.create(String.format("https://devapi.qweather.com/v7/weather/now?location=%s&key=%s",
cityCode,
URLEncoder.encode(key, StandardCharsets.UTF_8)));
HttpRequest request = HttpRequest
.newBuilder()
.uri(uri)
.GET()
.build();
// 处理成json对象返回给前端
return GzipResponse.handle(client, request)
.flatMap(responseBody -> {
try {
ObjectMapper mapper = new ObjectMapper();
return Mono.just(mapper.readTree(responseBody));
} catch (JsonProcessingException e) {
return Mono.error(e);
}
});
}
熟悉的面孔,熟悉的配方,依旧是之前的套路:构建HttpClient
请求体,将得到的结果进行GZIP解压缩,最后将Json文本内容以Mono
的形式返回。
需要注意的是,在这个方法中使用了Jackson的ObjectMapper
而不再是Gson
对象了。这是外部框架(非Spring)问题所致的,但是我们仍然从以下几个角度可以思考为什么使用Gson
会在外部框架中出现问题:
- 功能:Jackson提供了更多的低级别控制,这可能在处理特定的JSON结构时更有用。如,Jackson支持更复杂的对象映射、支持更多的注解等。
- 兼容性:如果项目中其他部分已经使用了Jackson,那么在这里使用
ObjectMapper
可能是为了保持一致性,避免在项目中引入不必要的复杂性。
创建接口实现调用
在构建最后的接口前,再来梳理一下这整个接口请求任务的编排顺序:
graph TD
A[用户访问接口请求QWeatherRouter.getCurrentCityWeather]
A --> B[接口将IP传递给GeoLiteReader.getCityCode方法]
B --> C[GeoLiteReader.getCityCode方法从GeoLite2.mmdb数据库读取经纬网信息]
C --> D[将经纬度传递给GeoLiteReader.getCityIdFromLatLon方法请求和风GeoAPI获取城市ID]
D --> E[将城市ID传递给QWeatherRouter.getWeatherData方法获取天气]
E --> F[QWeatherRouter.getCurrentCityWeather接口通过Jackson封装信息返回给前端]
假设我们的程序最终是部署在Docker容器中的,那么我们在获取访问者的真实IP时需要借助自定义的请求头来实现,在这个方法中我们使用Optional
加响应式流来编排最后的总任务:
@Slf4j
@RequestMapping("/weather")
@RestController
@AllArgsConstructor
public class QWeatherRouter {
@Resource
private GeoLiteReader reader; // 自动注入CityID读取的Bean对象
private final ReactiveSettingFetcher reactiveSettingFetcher;
@GetMapping("/get")
public Mono<JsonNode> getCurrentCityWeather(ServerWebExchange exchange) {
HttpHeaders headers = exchange.getRequest().getHeaders();
String clientIp = Optional.ofNullable(headers.getFirst("X-Forwarded-For"))
.orElseGet(() -> Optional.ofNullable(headers.getFirst("X-Real-IP"))
.orElseGet(() -> {
InetSocketAddress remoteAddress = exchange.getRequest().getRemoteAddress();
return (remoteAddress != null) ? remoteAddress.getAddress().getHostAddress() : null;
}));
return Optional.ofNullable(clientIp)
.map(ip -> this.reactiveSettingFetcher.get("basic")
.map(setting -> setting.get("qWeatherPrivateKey").asText(""))
.flatMap(key -> { // 从配置中读取私钥
if (key.isEmpty()) return Mono.empty();
// 从reader这个Bean中调取异步任务
return reader.getCityCode(ip, key)
// 将结果传递给getWeatherData任务
.flatMap(cityInfo -> getWeatherData(cityInfo.cityId(), key)
.map(weatherData -> {
ObjectMapper mapper = new ObjectMapper();
ObjectNode result = mapper.createObjectNode();
result.put("cityName", cityInfo.cityName());
result.set("weatherData", weatherData);
return (JsonNode)result;
}));
})
)
.orElse(Mono.empty());
}
}
整个异步任务被缜密编排、层层递进,最终获取到用户所在地的天气信息。在方法中取用了X-Forwarded-For
和X-Real-IP
请求头,在Nginx的反向代理中应该配置它们的来源:
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
问题思考
获取用户当地的和风天气信息的整个编排过程看似非常复杂,或许也有很多比较难以理解的地方,但是一旦理解了异步编排的精髓这些问题都能迎刃而解。除此之外还有很多值得读者思考的地方:
- 和风天气API免费的是每天提供1000次查询、付费的则是按量计费,上文所设计的异步任务每次都会扣取2次token,无论是哪个对于网页的管理者来说都需要对前端做限流策略,避免恶意刷取流量导致扣费。通常限流有以下一些策略:
- 对于同源IP,将IP对应的
CityID
、天气信息一并存入Redis缓存中过期时间设置为1小时,每次访问都从缓存中查询,如果没有命中再从和风天气中查询。 - 对于不同源IP,考虑到从GeoLite2.mmdb中查询是非常快的那么可以将这个城市的天气信息连带数据库的城市名称一并存入Redis中并设置过期时间为1小时,如果其他IP从数据库中查出来的都是这个地区那么就直接从缓存中返回结果,从而减少token的消耗。
- 对于同源IP,将IP对应的
- 在前文解析地址中提到了“
CompletableFuture
会在一个单独的线程中执行该操作,而主线程可以继续执行其他操作。”这里的“主线程”一词能否换成“父线程”一词来理解?- 很显然是不行的。“主线程”意为执行原始任务的线程,当我们说“主线程可以继续执行其他操作”,通常是指这个线程不会因为等待
InetAddress.getByName(ip)
的完成而被阻塞,而是可以继续执行后续的代码。而“父线程”和“子线程”的概念一般都是在描述线程层级关系中使用的,如:通过Thread
类创建新的线程时那么这个新的线程和创建它的线程属于“父子线程”关系。 CompletableFuture
创建的线程并不一定和主线程有“父子线程”关系,我们在上一期文章中已经解释过了:不传递线程池时CompletableFuture
会通过ForkJoinPool.commonPool()
来创建线程池执行任务,这个线程池中的线程和主线程是平等的。
- 很显然是不行的。“主线程”意为执行原始任务的线程,当我们说“主线程可以继续执行其他操作”,通常是指这个线程不会因为等待
参考资料
[1] 实时天气 for API | 和风天气开发服务 (qweather.com)
[2] 城市搜索 for API | 和风天气开发服务 (qweather.com)
[3] Reactive Core :: Spring Framework
[4] HTTP Caching :: Spring Framework