👉 歡迎 ,你將獲得: 專屬的計畫實戰 / Java 學習路線 / 一對一提問 / 學習打卡 / 贈書福利
全棧前後端分離部落格計畫 2.0 版本完結啦, 演示連結 : http://116.62.199.48/ , 新計畫正在醞釀中 。全程手摸手,後端 + 前端全棧開發,從 0 到 1 講解每個功能點開發步驟,1v1 答疑,直到計畫上線。 目前已更新了239小節,累計38w+字,講解圖:1645張,還在持續爆肝中.. 後續還會上新更多計畫,目標是將Java領域典型的計畫都整一波,如秒殺系統, 線上商城, IM即時通訊,Spring Cloud Alibaba 等等,
介紹
下載功能應該是比較常見的功能了,雖然一個計畫裏面可能出現的不多,但是基本上每個計畫都會有,而且有些下載功能其實還是比較繁雜的,倒不是難,而是麻煩。
所以結合之前的下載需求,我寫了一個庫來簡化下載功能的實作
❝
傳送門:https://github.com/Linyuzai/concept/wiki/Concept-Download
❞
如果我說現在只需要一個註解就能幫你下載任意的物件,是不是覺得非常的方便
@Download(source = " classpath:/download/README.txt")
@GetMapping("/ classpath")
public void classpath() {
}
@Download
@GetMapping("/file")
public File file() {
return new File("/Users/Shared/README.txt");
}
@Download
@GetMapping("/http")
public String http() {
return"http://127.0.0.1:8080/concept-download/image.jpg";
}
感覺差別不大?那就聽聽我遇到的一個下載需求
我們有一個平台是管理裝置的,然後每個裝置都會有一個二維碼圖片,用一個欄位儲存的 http 地址
現在需要匯出所有裝置二維碼圖片的壓縮包,圖片名稱需要用裝置名稱加 .png 字尾,需求上來說並不難,但是著實有點麻煩
首先我需要將裝置列表查出來
然後使用二維碼地址下載圖片並寫到本地緩存檔
在下載之前需要先判斷是否已經存在緩存
下載時需要並行下載提升效能
等所有圖片下載結束後
再生成一個壓縮檔
然後再操作輸入輸出流寫到響應中
看著我實作了將近 200 行的程式碼,真是又臭又長,一個下載功能咋能那麽麻煩呢,於是我就想有沒有更簡單的方式
我當時的需求很簡單,我想著我只要提供需要下載的數據,比如一個檔路徑,一個檔物件,一段字串文本,一個http地址,或者混搭了前面所有型別的一個集合,甚至是我們自訂的某個類的例項,後面的事情我就不用管了
檔路徑是一個檔還是一個目錄?字串文本需要先寫入一個文字檔案中?http資源如何下載到本地?多個檔怎麽壓縮?最後怎麽寫到響應中?我才不想花時間管這些
比如就像我現在這個需求,我只要返回裝置列表就行了,其他的事情我都不用管
@Download(filename = "二維碼.zip")
@GetMapping("/download")
public List<Device> download() {
return deviceService.all();
}
public class Device {
//裝置名稱
private String name;
//裝置二維碼
//註解表示該http地址是需要下載的數據
@SourceObject
private String qrCodeUrl;
//註解表示檔名稱
@SourceName
public String getQrCodeName() {
return name + ".png";
}
//省略其他內容方法
}
透過在 Device 的欄位上標註某些註解(或是實作某個介面)來指定檔名稱和檔地址
如果能這樣實作,省時省心省力,又多了寫 199 行程式碼的摸魚時間難道不香麽
思路
下面來講講這個庫的主要設計思路,以及中間遇到的坑,大家有興趣可以繼續往下看
其實基於一開始的設想,我覺得功能並沒有多復雜,於是就決定開肝
只是萬萬沒想到實作起來比我想象的更復雜(這是後話了)
基礎
首先整個庫基於響應式編程,但卻並不是完全意義上的響應式,只能說是
Mono<InputStream
>這樣的。。。奇怪組合?
為什麽會這樣呢,很大的一個原因是由於需要相容webmvc和webflux,導致我僅僅是將之前實作的InputStream方式重構成了響應式,所以就出現了這樣的組合
這也是我遇到的最大的一個坑,我先前已經基本調通了基於Servlet的整個下載流程,然後就想著支持一下webflux
大家都知道webmvc中,我們可以透過RequestContextHolder來獲得請求和響應物件,但是在webflux中就不行了,當然我們可以在方法參數中註入
@Download(source = " classpath:/download/README.txt")
@GetMapping("/ classpath")
public void classpath(ServerHttpResponse response) {
}
結合Spring內建的註入功能,我們就可以透過AOP拿到響應的入參了,但是總覺得這樣寫有點多余,強迫癥表示不能忍
有什麽辦法既能把用不到的入參幹掉,又能拿到響應物件呢,在網上找到了一種實作方式
/**
* 用於設定當前的請求和響應。
*
* @see ReactiveDownloadHolder
*/
public class ReactiveDownloadFilter implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
ServerHttpResponse response = exchange.getResponse();
return chain.filter(exchange)
//低版本使用subscriberContext
.contextWrite(ctx -> ctx.put(ServerHttpRequest. class, request))
.contextWrite(ctx -> ctx.put(ServerHttpResponse. class, response));
}
}
/**
* 用於獲得當前的請求和響應。
*
* @see ReactiveDownloadFilter
*/
public class ReactiveDownloadHolder {
public static Mono<ServerHttpRequest> getRequest() {
//低版本使用subscriberContext
return Mono.deferContextual(contextView -> Mono.just(contextView.get(ServerHttpRequest. class)));
}
public static Mono<ServerHttpResponse> getResponse() {
//低版本使用subscriberContext
return Mono.deferContextual(contextView -> Mono.just(contextView.get(ServerHttpResponse. class)));
}
}
透過添加WebFilter就可以獲得響應物件了,但是返回值是
Mono<ServerHttpResponse>
那麽可不可以透過Mono.block()阻塞得到對應的物件呢,答案是不行,由於webflux基於Netty的非阻塞執行緒,如果呼叫該方法會直接丟擲異常
所以就沒有任何辦法了,只能將之前程式碼基於響應式重構
架構
接下來說說整體架構
對於一個下載請求,我們可以分成幾個步驟,以下載多個檔的壓縮包為例
首先我們一般是得到多個檔的路徑或對應的File物件
然後將這些檔壓縮生成一個壓縮檔
最後將壓縮檔寫入到響應中
但是對於我上面描述的需求,一開始就不是檔路徑或物件了,而是一個http地址,然後在壓縮之前還需要多一個步驟,需要先將圖片下載下來
那麽對於各種各樣的需求我們可能需要在當前步驟中的任意位置添加額外的步驟,所以我參考了Spring Cloud Gateway 攔截鏈的實作方式
/**
* 下載處理器。
*/
public interface DownloadHandler extends OrderProvider {
/**
* 執行處理。
*
* @param context {@link DownloadContext}
* @param chain {@link DownloadHandlerChain}
*/
Mono<Void> handle(DownloadContext context, DownloadHandlerChain chain);
}
/**
* 下載處理鏈。
*/
public interface DownloadHandlerChain {
/**
* 排程下一個下載處理器。
*
* @param context {@link DownloadContext}
*/
Mono<Void> next(DownloadContext context);
}
這樣每個步驟就可以單獨實作一個DownloadHandler,步驟與步驟之間可以任意的組合添加
下載上下文
在此基礎上使用一個貫穿整個流程的上下文DownloadContext,方便共享和傳遞步驟之間的中間結果
對於上下文DownloadContext也提供了DownloadContextFactory可以用於自訂上下文
同時提供了DownloadContextInitializer和DownloadContextDestroyer用於在上下文初始化和銷毀時擴充套件自己的邏輯
下載型別支持
我們需要下載的數據的型別是不固定的,比如有檔,有http地址,也會有之前我希望的自訂的類的例項
所以我將所有的下載物件抽象成了Source,表示一個下載源,這樣檔可以實作為FileSource,http地址可以實作為HttpSource,然後透過對應的SourceFactory來匹配建立
比如FileSourceFactory可以匹配File並且建立FileSource,HttpSourceFactory可以匹配http://字首並且建立HttpSource
/**
* {@link Source} 工廠。
*/
public interface SourceFactory extends OrderProvider {
/**
* 是否支持需要下載的原始數據物件。
*
* @param source 需要下載的原始數據物件
* @param context {@link DownloadContext}
* @return 如果支持則返回 true
*/
boolean support(Object source, DownloadContext context);
/**
* 建立。
*
* @param source 需要下載的原始數據物件
* @param context {@link DownloadContext}
* @return 建立的 {@link Source}
*/
Source create(Object source, DownloadContext context);
}
那麽對於我們自訂的類要怎麽支持呢,之前提到可以在類上標註註解或是實作特定的介面,那麽就用我實作的註解的方式來大概講一講吧
其實邏輯很簡單,只要能熟練的運用反射就完全沒問題,我們再來看一看用法
@Download(filename = "二維碼.zip")
@GetMapping("/download")
public List<Device> download() {
return deviceService.all();
}
public class Device {
//裝置名稱
private String name;
//裝置二維碼
//註解表示該http地址是需要下載的數據
@SourceObject
private String qrCodeUrl;
//註解表示檔名稱
@SourceName
public String getQrCodeName() {
return name + ".png";
}
//省略其他內容方法
}
首先我定義了一個註解@SourceModel標註在類上表示需要被解析,然後定義了一個@SourceObject註解標註在需要下載的欄位(或方法)上,這樣我們就可以透過反射拿到這個欄位(或方法)的值
基於當前支持的SourceFactory就能建立出對應的Source,接下來使用@SourceName指定名稱,也同樣可以透過反射獲得這個方法(或欄位)的值並依舊透過反射設定到建立出來的Source上
這樣就能非常靈活的支持任意的物件型別了
並行載入
對於像http這種網路資源,我們需要先並行載入(多個檔時)到原生的記憶體中或是緩存檔中來提升我們的處理效率
當然我可以直接定死一個執行緒池來執行,但是每個機器每個計畫甚至每個需求對於並行的要求和資源的分配都不一樣
所以我提供了SourceLoader來支持自訂的載入邏輯,你甚至可以一部份用執行緒池,一部份用協程,剩下一部份不載入
/**
* {@link Source} 載入器。
*
* @see DefaultSourceLoader
* @see SchedulerSourceLoader
*/
public interface SourceLoader {
/**
* 執行載入。
*
* @param source {@link Source}
* @param context {@link DownloadContext}
* @return 載入後的 {@link Source}
*/
Mono<Source> load(Source source, DownloadContext context);
}
壓縮
當我們載入完之後就可以執行壓縮了,同樣的我定義了一個類Compression作為壓縮物件的抽象
一般來說,我們會先在本地建立一個緩存檔,然後將壓縮後的數據寫入到緩存檔中
不過我每次都很討厭在配置檔中配置各種各樣的路徑,所以在壓縮時支持記憶體壓縮,當然如果檔比較大還是老老實實生成一個緩存檔
對於壓縮格式也提供了可以完全自訂的SourceCompressor介面,你想自己實作一個壓縮協定都沒有問題
/**
* {@link Source} 壓縮器。
*
* @see ZipSourceCompressor
*/
public interface SourceCompressor extends OrderProvider {
/**
* 獲得壓縮格式。
*
* @return 壓縮格式
*/
String getFormat();
/**
* 判斷是否支持對應的壓縮格式。
*
* @param format 壓縮格式
* @param context {@link DownloadContext}
* @return 如果支持則返回 true
*/
default boolean support(String format, DownloadContext context) {
return format.equalsIgnoreCase(getFormat());
}
/**
* 如果支持對應的格式就會呼叫該方法執行壓縮。
*
* @param source {@link Source}
* @param writer {@link DownloadWriter}
* @param context {@link DownloadContext}
* @return {@link Compression}
*/
Compression compress(Source source, DownloadWriter writer, DownloadContext context);
}
響應寫入
我將響應抽象成了DownloadResponse,主要用於相容HttpServletResponse和ServerHttpResponse
但是問題又出現了,下面是webmvc和webflux寫入響應的方式
//HttpServletResponse
response.getOutputStream().write(byte b[], int off, int len);
//ServerHttpResponse
response.writeWith(Publisher<? extends DataBuffer> body);
這相容的我腦殼疼,不過最後還是搞定了
/**
* 持有 {@link ServerHttpResponse} 的 {@link DownloadResponse},用於 webflux。
*/
@Getter
public class ReactiveDownloadResponse implements DownloadResponse {
private final ServerHttpResponse response;
private OutputStream os;
private Mono<Void> mono;
public ReactiveDownloadResponse(ServerHttpResponse response) {
this.response = response;
}
@Override
public Mono<Void> write(Consumer<OutputStream> consumer) {
if (os == null) {
mono = response.writeWith(Flux.create(fluxSink -> {
try {
os = new FluxSinkOutputStream(fluxSink, response);
consumer.accept(os);
} catch (Throwable e) {
fluxSink.error(e);
}
}));
} else {
consumer.accept(os);
}
return mono;
}
@SneakyThrows
@Override
public void flush() {
if (os != null) {
os.flush();
}
}
@AllArgsConstructor
public static class FluxSinkOutputStream extends OutputStream {
private FluxSink<DataBuffer> fluxSink;
private ServerHttpResponse response;
@Override
public void write(byte[] b) throws IOException {
writeSink(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
byte[] bytes = new byte[len];
System.arraycopy(b, off, bytes, 0, len);
writeSink(bytes);
}
@Override
public void write(int b) throws IOException {
writeSink((byte) b);
}
@Override
public void flush() {
fluxSink.complete();
}
public void writeSink(byte... bytes) {
DataBuffer buffer = response.bufferFactory().wrap(bytes);
fluxSink.next(buffer);
//在這裏可能有問題,但是目前沒有沒有需要釋放的數據
DataBufferUtils.release(buffer);
}
}
}
只要最後都是寫byte[]就可以相互轉化,只不過可能麻煩一點,需要用介面回呼
將FluxSink偽裝成一個OutputStream,寫入時把
byte[]
轉成DataBuffer 並呼叫next方法,最後在flush的時候呼叫complete方法就行了,完美
響應寫入其實就是對輸入輸出流的處理了,正常情況下,我們會定義一個
byte[]
用來緩存讀到的數據,所以我也不會固定這個緩存的大小而是提供了DownloadWriter可以自訂處理輸入輸出流,包括存在指定編碼或是Range頭的情況
/**
* 具體操作 {@link InputStream} 和 {@link OutputStream} 的寫入器。
*/
public interface DownloadWriter extends OrderProvider {
/**
* 該寫入器是否支持寫入。
*
* @param resource {@link Resource}
* @param range {@link Range}
* @param context {@link DownloadContext}
* @return 如果支持則返回 true
*/
boolean support(Resource resource, Range range, DownloadContext context);
/**
* 執行寫入。
*
* @param is {@link InputStream}
* @param os {@link OutputStream}
* @param range {@link Range}
* @param charset {@link Charset}
* @param length 總大小,可能為 null
*/
default void write(InputStream is, OutputStream os, Range range, Charset charset, Long length) {
write(is, os, range, charset, length, null);
}
/**
* 執行寫入。
*
* @param is {@link InputStream}
* @param os {@link OutputStream}
* @param range {@link Range}
* @param charset {@link Charset}
* @param length 總大小,可能為 null
* @param callback 回呼當前進度和增長的大小
*/
void write(InputStream is, OutputStream os, Range range, Charset charset, Long length, Callback callback);
/**
* 進度回呼。
*/
interface Callback {
/**
* 回呼進度。
*
* @param current 當前值
* @param increase 增長值
*/
void onWrite(long current, long increase);
}
}
事件
當我把整個下載流程實作之後發現其實整個邏輯還是有點復雜的,所有得想個辦法能監控整個下載流程
最開始我定義了幾個監聽器用來回呼,但是並不好用,首先我們整個架構設計的是十分靈活可延伸的,而定義的監聽器型別少而且不好擴充套件
當我們後續添加了其他的流程和步驟後,不得不新加幾類監聽器或是在原來的監聽器類上添加方法,十分麻煩
所以我想到使用事件的方式能更加靈活的擴充套件,並定義了DownloadEventPublisher用於釋出事件和DownloadEventListener用於監聽事件,而且支持了Spring的事件監聽方式
日誌
基於上述的事件方式,我在此基礎上實作了幾種下載日誌
每個流程對應的日誌
載入進度更新,壓縮排度更新,響應寫入進度更新的日誌
時間花費的日誌
這些日誌由於比較詳細的打印了整個下載流程的資訊,還幫我發現了好多Bug
其他坑
最開始上下文的初始化和銷毀各自對應了一個步驟分別位於最開始和最末尾,但是當我在webflux中寫完響應後,發現上下文的銷毀不會執行
於是我跟了下Spring的源碼發現寫入方法返回的是Mono.empty(),也就是說,當響應寫入後就不會往下呼叫next方法了,所以在響應寫入之後的步驟永遠都不會被呼叫
最後就把上下文初始化和銷毀單獨出來了,並且在doAfterTerminate時呼叫銷毀方法
結束
基本上的內容就是這樣了,不過對於響應式這塊的內容還是莫得不是很透,以及有部份操作符也不是很會用,但還是有了解到很多高級的用法
👉 歡迎 ,你將獲得: 專屬的計畫實戰 / Java 學習路線 / 一對一提問 / 學習打卡 / 贈書福利
全棧前後端分離部落格計畫 2.0 版本完結啦, 演示連結 : http://116.62.199.48/ , 新計畫正在醞釀中 。全程手摸手,後端 + 前端全棧開發,從 0 到 1 講解每個功能點開發步驟,1v1 答疑,直到計畫上線。 目前已更新了239小節,累計38w+字,講解圖:1645張,還在持續爆肝中.. 後續還會上新更多計畫,目標是將Java領域典型的計畫都整一波,如秒殺系統, 線上商城, IM即時通訊,Spring Cloud Alibaba 等等,
1.
2.
3.
4.
最近面試BAT,整理一份面試資料【Java面試BATJ通關手冊】,覆蓋了Java核心技術、JVM、Java並行、SSM、微服務、資料庫、數據結構等等。
獲取方式:點「在看」,關註公眾號並回復 Java 領取,更多內容陸續奉上。
PS:因公眾號平台更改了推播規則,如果不想錯過內容,記得讀完點一下「在看」,加個「星標」,這樣每次新文章推播才會第一時間出現在你的訂閱列表裏。
點「在看」支持小哈呀,謝謝啦