點選關註公眾號,Java幹貨 及時送達 👇
請求合並到底有什麽意義呢?我們來看下圖。
假設我們3個使用者(使用者id分別是1、2、3),現在他們都要查詢自己的基本資訊,請求到伺服器,伺服器端請求資料庫,發出3次請求。我們都知道資料庫連線資源是相當寶貴的,那麽我們怎麽盡可能節省連線資源呢?
這裏把資料庫換成被呼叫的遠端服務,也是同樣的道理。
我們改變下思路,如下圖所示。
我們在伺服器端把請求合並,只發出一條SQL查詢資料庫,資料庫返回後,伺服器端處理返回數據,根據一個唯一請求ID,把數據分組,返回給對應使用者。
技術手段
LinkedBlockQueue
阻塞佇列
ScheduledThreadPoolExecutor
定時任務執行緒池
CompleteableFuture future
阻塞機制(Java 8 的 CompletableFuture 並沒有 timeout 機制,後面最佳化,使用了佇列替代)
程式碼實作
查詢使用者的程式碼
public interface UserService {
Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs);
}
@Service
public class UserServiceImpl implements UserService {
@Resource
private UsersMapper usersMapper;
@Override
public Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs) {
// 全部參數
List<Long> userIds = userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
// 用in語句合並成一條SQL,避免多次請求資料庫的IO
queryWrapper.in("id", userIds);
List<Users> users = usersMapper.selectList(queryWrapper);
Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
HashMap<String, Users> result = new HashMap<>();
userReqs.forEach(val -> {
List<Users> usersList = userGroup.get(val.getUserId());
if (!CollectionUtils.isEmpty(usersList)) {
result.put(val.getRequestId(), usersList.get(0));
} else {
// 表示沒數據
result.put(val.getRequestId(), null);
}
});
return result;
}
}
合並請求的實作
package com.springboot.sample.service.impl;
import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;
/***
* zzq
* 包裝成批次執行的地方
* */
@Service
public class UserWrapBatchService {
@Resource
private UserService userService;
/**
* 最大任務數
**/
public static int MAX_TASK_NUM = 100;
/**
* 請求類,code為查詢的共同特征,例如查詢商品,透過不同id的來區分
* CompletableFuture將處理結果返回
*/
public class Request {
// 請求id 唯一
String requestId;
// 參數
Long userId;
//TODO Java 8 的 CompletableFuture 並沒有 timeout 機制
CompletableFuture<Users> completableFuture;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public CompletableFuture getCompletableFuture() {
return completableFuture;
}
public void setCompletableFuture(CompletableFuture completableFuture) {
this.completableFuture = completableFuture;
}
}
/*
LinkedBlockingQueue是一個阻塞的佇列,內部采用連結串列的結果,透過兩個ReenTrantLock來保證執行緒安全
LinkedBlockingQueue與ArrayBlockingQueue的區別
ArrayBlockingQueue預設指定了長度,而LinkedBlockingQueue的預設長度是Integer.MAX_VALUE,也就是無界佇列,在移除的速度小於添加的速度時,容易造成OOM。
ArrayBlockingQueue的儲存容器是陣列,而LinkedBlockingQueue是儲存容器是連結串列
兩者的實作佇列添加或移除的鎖不一樣,ArrayBlockingQueue實作的佇列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,
而LinkedBlockingQueue實作的佇列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高佇列的吞吐量,
也意味著在高並行的情況下生產者和消費者可以並列地操作佇列中的數據,以此來提高整個佇列的並行效能。
*/
private final Queue<Request> queue = new LinkedBlockingQueue();
@PostConstruct
public void init() {
//定時任務執行緒池,建立一個支持定時、周期性或延時任務的限定執行緒數目(這裏傳入的是1)的執行緒池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {
int size = queue.size();
//如果佇列沒數據,表示這段時間沒有請求,直接返回
if (size == 0) {
return;
}
List<Request> list = new ArrayList<>();
System.out.println("合並了 [" + size + "] 個請求");
//將佇列的請求消費到一個集合保存
for (int i = 0; i < size; i++) {
// 後面的SQL語句是有長度限制的,所以還要做限制每次批次的數量,超過最大任務數,等下次執行
if (i < MAX_TASK_NUM) {
list.add(queue.poll());
}
}
//拿到我們需要去資料庫查詢的特征,保存為集合
List<Request> userReqs = new ArrayList<>();
for (Request request : list) {
userReqs.add(request);
}
//將參數傳入service處理, 這裏是本地服務,也可以把userService 看成RPC之類的遠端呼叫
Map<String, Users> response = userService.queryUserByIdBatch(userReqs);
//將處理結果返回各自的請求
for (Request request : list) {
Users result = response.get(request.requestId);
request.completableFuture.complete(result); //completableFuture.complete方法完成賦值,這一步執行完畢,下面future.get()阻塞的請求可以繼續執行了
}
}, 100, 10, TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性執行 schedule是延遲執行 initialDelay是初始延遲 period是周期間隔 後面是單位
//這裏我寫的是 初始化後100毫秒後執行,周期性執行10毫秒執行一次
}
public Users queryUser(Long userId) {
Request request = new Request();
// 這裏用UUID做請求id
request.requestId = UUID.randomUUID().toString().replace("-", "");
request.userId = userId;
CompletableFuture<Users> future = new CompletableFuture<>();
request.completableFuture = future;
//將物件傳入佇列
queue.offer(request);
//如果這時候沒完成賦值,那麽就會阻塞,直到能夠拿到值
try {
return future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}
}
控制層呼叫
/***
* 請求合並
* */
@RequestMapping("/merge")
public Callable<Users> merge(Long userId) {
return new Callable<Users>() {
@Override
public Users call() throws Exception {
return userBatchService.queryUser(userId);
}
};
}
模擬高並行查詢的程式碼
package com.springboot.sample;
import org.springframework.web.client.RestTemplate;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class TestBatch {
private static int threadCount = 30;
private final static CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(threadCount); //為保證30個執行緒同時並行執行
private static final RestTemplate restTemplate = new RestTemplate();
public static void main(String[] args) {
for (int i = 0; i < threadCount; i++) {//迴圈開30個執行緒
new Thread(new Runnable() {
public void run() {
COUNT_DOWN_LATCH.countDown();//每次減一
try {
COUNT_DOWN_LATCH.await(); //此處等待狀態,為了讓30個執行緒同時進行
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int j = 1; j <= 3; j++) {
int param = new Random().nextInt(4);
if (param <=0){
param++;
}
String responseBody = restTemplate.getForObject("http://localhost:8080/asyncAndMerge/merge?userId=" + param, String. class);
System.out.println(Thread.currentThread().getName() + "參數 " + param + " 返回值 " + responseBody);
}
}
}).start();
}
}
}
測試效果
要註意的問題
Java 8 的 CompletableFuture 並沒有 timeout 機制
後面的SQL語句是有長度限制的,所以還要做限制每次批次的數量,超過最大任務數,等下次執行(本例中加了MAX_TASK_NUM判斷)
使用佇列的超時解決Java 8 的 CompletableFuture 並沒有 timeout 機制
核心程式碼
package com.springboot.sample.service.impl;
import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;
/***
* zzq
* 包裝成批次執行的地方,使用queue解決超時問題
* */
@Service
public class UserWrapBatchQueueService {
@Resource
private UserService userService;
/**
* 最大任務數
**/
public static int MAX_TASK_NUM = 100;
/**
* 請求類,code為查詢的共同特征,例如查詢商品,透過不同id的來區分
* CompletableFuture將處理結果返回
*/
public class Request {
// 請求id
String requestId;
// 參數
Long userId;
// 佇列,這個有超時機制
LinkedBlockingQueue<Users> usersQueue;
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public Long getUserId() {
return userId;
}
public void setUserId(Long userId) {
this.userId = userId;
}
public LinkedBlockingQueue<Users> getUsersQueue() {
return usersQueue;
}
public void setUsersQueue(LinkedBlockingQueue<Users> usersQueue) {
this.usersQueue = usersQueue;
}
}
/*
LinkedBlockingQueue是一個阻塞的佇列,內部采用連結串列的結果,透過兩個ReenTrantLock來保證執行緒安全
LinkedBlockingQueue與ArrayBlockingQueue的區別
ArrayBlockingQueue預設指定了長度,而LinkedBlockingQueue的預設長度是Integer.MAX_VALUE,也就是無界佇列,在移除的速度小於添加的速度時,容易造成OOM。
ArrayBlockingQueue的儲存容器是陣列,而LinkedBlockingQueue是儲存容器是連結串列
兩者的實作佇列添加或移除的鎖不一樣,ArrayBlockingQueue實作的佇列中的鎖是沒有分離的,即添加操作和移除操作采用的同一個ReenterLock鎖,
而LinkedBlockingQueue實作的佇列中的鎖是分離的,其添加采用的是putLock,移除采用的則是takeLock,這樣能大大提高佇列的吞吐量,
也意味著在高並行的情況下生產者和消費者可以並列地操作佇列中的數據,以此來提高整個佇列的並行效能。
*/
private final Queue<Request> queue = new LinkedBlockingQueue();
@PostConstruct
public void init() {
//定時任務執行緒池,建立一個支持定時、周期性或延時任務的限定執行緒數目(這裏傳入的是1)的執行緒池
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {
int size = queue.size();
//如果佇列沒數據,表示這段時間沒有請求,直接返回
if (size == 0) {
return;
}
List<Request> list = new ArrayList<>();
System.out.println("合並了 [" + size + "] 個請求");
//將佇列的請求消費到一個集合保存
for (int i = 0; i < size; i++) {
// 後面的SQL語句是有長度限制的,所以還要做限制每次批次的數量,超過最大任務數,等下次執行
if (i < MAX_TASK_NUM) {
list.add(queue.poll());
}
}
//拿到我們需要去資料庫查詢的特征,保存為集合
List<Request> userReqs = new ArrayList<>();
for (Request request : list) {
userReqs.add(request);
}
//將參數傳入service處理, 這裏是本地服務,也可以把userService 看成RPC之類的遠端呼叫
Map<String, Users> response = userService.queryUserByIdBatchQueue(userReqs);
for (Request userReq : userReqs) {
// 這裏再把結果放到佇列裏
Users users = response.get(userReq.getRequestId());
userReq.usersQueue.offer(users);
}
}, 100, 10, TimeUnit.MILLISECONDS);
//scheduleAtFixedRate是周期性執行 schedule是延遲執行 initialDelay是初始延遲 period是周期間隔 後面是單位
//這裏我寫的是 初始化後100毫秒後執行,周期性執行10毫秒執行一次
}
public Users queryUser(Long userId) {
Request request = new Request();
// 這裏用UUID做請求id
request.requestId = UUID.randomUUID().toString().replace("-", "");
request.userId = userId;
LinkedBlockingQueue<Users> usersQueue = new LinkedBlockingQueue<>();
request.usersQueue = usersQueue;
//將物件傳入佇列
queue.offer(request);
//取出元素時,如果佇列為空,給定阻塞多少毫秒再佇列取值,這裏是3秒
try {
return usersQueue.poll(3000,TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
}
...省略..
@Override
public Map<String, Users> queryUserByIdBatchQueue(List<UserWrapBatchQueueService.Request> userReqs) {
// 全部參數
List<Long> userIds = userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
// 用in語句合並成一條SQL,避免多次請求資料庫的IO
queryWrapper.in("id", userIds);
List<Users> users = usersMapper.selectList(queryWrapper);
Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
HashMap<String, Users> result = new HashMap<>();
// 數據分組
userReqs.forEach(val -> {
List<Users> usersList = userGroup.get(val.getUserId());
if (!CollectionUtils.isEmpty(usersList)) {
result.put(val.getRequestId(), usersList.get(0));
} else {
// 表示沒數據 , 這裏要new,不然加入佇列會空指標
result.put(val.getRequestId(), new Users());
}
});
return result;
}
...省略...
小結
請求合並,批次的辦法能大幅節省被呼叫系統的連線資源,本例是以資料庫為例,其他RPC呼叫也是類似的道理。缺點就是請求的時間在執行實際的邏輯之前增加了等待時間,不適合低並行的場景。
源 碼: https://gitee.com /apple_1030907690/spring-boot-kubernetes/tree/v1.0.5END
看完本文有收獲?請轉發分享給更多人
關註「Java編程鴨」,提升Java技能
關註Java編程鴨微信公眾號,後台回復:碼農大禮包可以獲取最新整理的技術資料一份。涵蓋Java 框架學習、架構師學習等!
文章有幫助的話,在看,轉發吧。
謝謝支持喲 (*^__^*)