在物流和航空領域,即時數據處理、數據視覺化和綜合看板至關重要。它們能夠提升供應鏈效能、最佳化運輸並提高效率。特別是對商務航空來說,實分時析可以提高效率、永續性和客戶關註度,從而改善決策並節約成本。它還幫助確保安全執行、最佳化空域使用並能協助空中交通管制員。總體而言,即時數據處理可以幫助航空業提升客戶體驗並改善業務營運。
在本文中,我們使用 RisingWave、Upstash 和 Metabase 建立了一個即時航班跟蹤系統。我們利用 Aviationstack API 即時獲取航班數據,然後將這些數據傳輸到 Upstash 的一個 Kafka 主題中。隨後,我們將這些數據流攝取到 RisingWave 中,繼而建立物化檢視 (MV) 以進行深入的航班數據分析。物化檢視隨時提供最新數據,並可以即時查詢。我們還使用 Metabase 建立圖表、表格和綜合的數據看板,用於即時航班跟蹤。
使用 RisingWave、Upstash 和 Metabase 實作的即時航班跟蹤
1 在 Upstash 中設定 Kafka
Upstash 是一個無伺服器平台,提供 Redis、Kafka 和 Qstash 服務,具有可延伸性、高級安全選項和專門支持的優勢。Upstash Kafka 使用 Apache Kafka 進行部署,並提供一個無伺服器的 Kafka 平台,配備連結器、模式登錄檔和監控,為有高級需求的客戶提供各種計劃。
註冊 Upstash 帳戶
註冊免費的 Upstash 雲帳戶,以存取 Kafka 服務。要建立帳戶,請存取 Upstash Cloud Account [1] 。
建立 Kafka 集群
登入後,使用以下資訊建立 Kafka 集群:
Name : 給您的 Kafka 集群一個唯一名稱以便辨識。
Region : 選擇您的 Kafka 集群托管的區域。
Type : 選擇適合您需求的集群型別。
設定 Kafka 主題
建立 Kafka 集群後,設定 Kafka 主題。Upstash Kafka 提供一些預設配置,包括分區數量和保留策略,簡化了設定過程。
建立 Kafka 集群和 Kafka 主題後,我們就可以利用 Upstash Kafka 和 RisingWave 的功能來構建流處理應用程式和管道。如果您還想了解更多有關以上流程的資訊,請參閱 Upstash Kafka 文件 [2] 。
我們攝取到 Upstash Kafka 主題的範例封包含來自航空 API 的即時數據,包括機場名稱、航班狀態、航班位置等資訊。
{
"flight_date": "2024-05-16",
"flight_status": "scheduled",
"departure_airport": "Auckland International",
"departure_timezone": "Pacific/Auckland",
"departure_iata": "AKL",
"departure_icao": "NZAA",
"departure_terminal": "D",
"departure_gate": "28",
"departure_delay": null,
"departure_scheduled": "2024-05-16T06:30:00+00:00",
"departure_estimated": "2024-05-16T06:30:00+00:00",
"departure_actual": null,
"departure_estimated_runway": null,
"departure_actual_runway": null,
"arrival_airport": "Wellington International",
"arrival_timezone": "Pacific/Auckland",
"arrival_iata": "WLG",
"arrival_icao": "NZWN",
"arrival_terminal": null,
"arrival_gate": "15",
"arrival_baggage": null,
"arrival_delay": null,
"arrival_scheduled": "2024-05-16T07:40:00+00:00",
"arrival_estimated": "2024-05-16T07:40:00+00:00",
"arrival_actual": null,
"arrival_estimated_runway": null,
"arrival_actual_runway": null,
"airline_name": "Singapore Airlines",
"airline_iata": "SQ",
"airline_icao": "SIA",
"flight_number": "SQ4438",
"flight_iata": "SQ4438",
"flight_icao": "SIA4438",
"codeshared_airline_name": "air new zealand",
"codeshared_airline_iata": "nz",
"codeshared_airline_icao": "anz",
"codeshared_flight_number": "401",
"codeshared_flight_iata": "nz401",
"flight_info": "Singapore Airlines flight SQ4438 is currently in the air, flying from Auckland International (AKL) to Wellington International (WLG)"
}
2 將數據從 Upstash Kafka 攝取到 RisingWave
對於攝取和處理流數據,我們有兩種可用方法:開源的 RisingWave 和托管服務 RisingWave Cloud。在本文中,我們將重點使用 RisingWave Cloud,它的使用者體驗更加便捷友好,簡化了管理和利用 RisingWave 進行航班跟蹤解決方案的操作。
建立 RisingWave 集群
要在 RisingWave Cloud [3] 中建立 RisingWave 集群並探索各種功能,您可以註冊 Free Plan 來免費測試 RisingWave 的功能。有關如何建立 RisingWave 集群並開始使用的詳細說明,請參閱 官方 RisingWave 文件 [4] 。它將為您提供設定和探索 RisingWave 功能的分步指導。如果您需要額外的幫助,請加入我們的 Slack 社群 [5] 。
將數據流攝取到 RisingWave
現在我們已經在 Upstash 中(以 JSON 格式)設定了 Kafka 數據流,我們可以使用以下 SQL 語句連線到這些數據流。有關更多資訊,請參閱 在 RisingWave 中從 Upstash Kafka 攝取數據 [6] 。
CREATESOURCE flight_tracking_source(
flight_date VARCHAR,
flight_status VARCHAR,
departure_airport VARCHAR,
departure_timezone VARCHAR,
departure_iata VARCHAR,
departure_icao VARCHAR,
departure_terminal VARCHAR,
departure_gate VARCHAR,
departure_delay INTERVAL,
departure_scheduled TIMESTAMPWITHTIME ZONE,
departure_estimated TIMESTAMPWITHTIME ZONE,
departure_actual TIMESTAMPWITHTIME ZONE,
departure_estimated_runway TIMESTAMPWITHTIME ZONE,
departure_actual_runway TIMESTAMPWITHTIME ZONE,
arrival_airport VARCHAR,
arrival_timezone VARCHAR,
arrival_iata VARCHAR,
arrival_icao VARCHAR,
arrival_terminal VARCHAR,
arrival_gate VARCHAR,
arrival_baggage VARCHAR,
arrival_delay INTERVAL,
arrival_scheduled TIMESTAMPWITHTIME ZONE,
arrival_estimated TIMESTAMPWITHTIME ZONE,
arrival_actual TIMESTAMPWITHTIME ZONE,
arrival_estimated_runway TIMESTAMPWITHTIME ZONE,
arrival_actual_runway TIMESTAMPWITHTIME ZONE,
airline_name VARCHAR,
airline_iata VARCHAR,
airline_icao VARCHAR,
flight_number VARCHAR,
flight_iata VARCHAR,
flight_icao VARCHAR,
codeshared_airline_name VARCHAR,
codeshared_airline_i
ata VARCHAR,
codeshared_airline_icao VARCHAR,
codeshared_flight_number VARCHAR,
codeshared_flight_iata VARCHAR,
flight_info VARCHAR
)
WITH(
connector='kafka',
topic ='flights_tracking',
properties.bootstrap.server ='delicate-herring-9260-us1-kafka.upstash.io:9092',
properties.sasl.mechanism = 'SCRAM-SHA-256',
properties.security.protocol = 'SASL_SSL',
properties.sasl.username = 'xxxxxx',
properties.sasl.password = 'xxxxxx',
scan.startup.mode ='earliest'
)FORMAT PLAIN ENCODEJSON;
透過
CREATE SOURCE
語句,RisingWave 已連線到數據流,但尚未開始消費數據。為了增量處理和儲存數據,我們需要建立物化檢視。建立物化檢視後,RisingWave 將從指定的偏移量開始消費數據。
設定物化檢視以分析航班數據
我們將建立不同的物化檢視,這些檢視跟蹤並提取與
flight_tracking_source
相關的各種航班資訊。這些資訊包括航班日期、狀態、出發和到達詳細資訊(機場、時區、IATA 碼、ICAO 碼、計劃和估計時間)、航空公司資訊(名稱、IATA 碼、ICAO 碼)、航班號和識別元(IATA 和 ICAO 碼)以及一般航班資訊。
使用物化檢視是因為它們始終提供最新數據。例如,以下查詢建立了一個名為
Airline_Flight_Counts
的物化檢視,該檢視按小時間隔計算每個航空公司的航班數量。它使用了上文建立的
flight_tracking_source
,並按航空公司名稱和一小時的時間視窗對數據進行分組。
CREATEMATERIALIZEDVIEW Airline_Flight_Counts
SELECT airline_name,
COUNT(airline_name) AS total_flights,
window_start, window_end
FROM TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
GROUPBY airline_name,window_start, window_end
ORDERBY total_flights desc;
以下查詢建立了一個名為
Airport_Summary
的物化檢視,該檢視按小時間隔計算每個機場的到達和出發航班總數。計算結果按機場和一小時的時間視窗進行分組,並按航班總數降序排列。
CREATEMATERIALIZEDVIEW Airport_Summary
WITH ArrivalCounts AS (
SELECT
arrival_airport,
COUNT(arrival_airport) AS total_flights_arrival,
window_start,
window_end
FROM
TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
GROUPBY
arrival_airport,
window_start,
window_end
),
DepartureCounts AS (
SELECT
departure_airport,
COUNT(departure_airport) AS total_flights_departure,
window_start,
window_end
FROM
TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
GROUPBY
departure_airport,
window_start,
window_end
)
SELECT
ArrivalCounts.arrival_airport,
ArrivalCounts.total_flights_arrival,
DepartureCounts.departure_airport,
DepartureCounts.total_flights_departure,
ArrivalCounts.window_start,
ArrivalCounts.window_end
FROM
ArrivalCounts
INNERJOIN
DepartureCounts ON ArrivalCounts.window_start = DepartureCounts.window_start
AND ArrivalCounts.window_end = DepartureCounts.window_end
AND ArrivalCounts.arrival_airport = DepartureCounts.departure_airport
ORDERBY
ArrivalCounts.total_flights_arrival DESC,
DepartureCounts.total_flights_departure DESC;
以下查詢建立了一個名為
Timezone_Summary
的物化檢視,該檢視按小時間隔計算每個時區的到達和出發航班總數。它使用資料來源
flight_tracking_source
,按時區和一小時的時間視窗對數據進行分組,然後按航班總數降序排列結果。
CREATEMATERIALIZEDVIEW Timezone_Summary
WITH ArrivalCounts AS (
SELECT
arrival_timezone,
COUNT(arrival_timezone) AS total_flights_arrival,
window_start,
window_end
FROM
TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
GROUPBY
arrival_timezone,
window_start,
window_end
),
DepartureCounts AS (
SELECT
departure_timezone,
COUNT(departure_timezone) AS total_flights_departure,
window_start,
window_end
FROM
TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
GROUPBY
departure_timezone,
window_start,
window_end
)
SELECT
ArrivalCounts.arrival_timezone,
ArrivalCounts.total_flights_arrival,
DepartureCounts.departure_timezone,
DepartureCounts.total_flights_departure,
ArrivalCounts.window_start,
ArrivalCounts.window_end
FROM
ArrivalCounts
INNERJOIN
DepartureCounts ON ArrivalCounts.window_start = DepartureCounts.window_start
AND ArrivalCounts.window_end = DepartureCounts.window_end
AND ArrivalCounts.arrival_timezone = DepartureCounts.departure_timezone
ORDERBY
ArrivalCounts.total_flights_arrival DESC,
DepartureCounts.total_flights_departure DESC;
3 使用 Metabase 視覺化
Metabase 是一個開源的商業智慧工具,可以將數據視覺化並共享數據見解。它讓您能用簡單的方法,基於資料庫數據,建立各種圖表、看板和指標。
將 RisingWave 連線到 Metabase
由於 RisingWave 相容 PostgreSQL,您可以將 Metabase 連線到 RisingWave 作為資料來源,並在流數據上構建分析。
您可以在 Metabase 中使用 RisingWave 作為資料來源,使用 RisingWave 中的表和物化檢視建立視覺化圖表和綜合看板。要了解具體步驟,請參閱 配置 Metabase 以讀取 RisingWave 數據 [7] 。
成功將 RisingWave 連線到 Metabase 後,我們將 RisingWave 中的物化檢視作為資料來源添加,以建立表格、各種圖表和綜合看板。
使用 Metabase 視覺化數據
我們使用在 RisingWave 中的物化檢視和源(如
flight_tracking_source
、
Airline_Flight_Counts
、
Airport_Summary
和
Timezone_Summary
)建立這些表格、圖表和綜合看板。
以下是一個綜合看板,展示了一系列用於即時航班跟蹤的圖表。它提供了航班操作的整體檢視,提供按航空公司、機場和時區分類的航班總數的洞察。此外,它還提供當前航班的詳細資訊,讓使用者得以全面監控並提出見解。
4 總結
在本文中,我們使用 Upstash、RisingWave 和 Metabase 開發了一個即時航班跟蹤系統。因為 RisingWave 提供了廣泛的源和目標連結器,配置和連線變得非常簡單。我們將即時航班數據攝取到 Upstash 的 Kafka 主題中,然後將其發送到 RisingWave,並建立物化檢視以進行深入分析。最後,我們使用 Metabase 建立了視覺化圖表和即時看板,使使用者能夠監控航班營運並做出明智的決策。
參考資料
[1]
Upstash Cloud Account:
https://console.upstash.com/kafka
Upstash Kafka 文件:
https://upstash.com/docs/kafka
RisingWave Cloud:
https://cloud.risingwave.com/
官方 RisingWave 文件:
https://docs.risingwave.com/docs/current/intro/
Slack 社群:
https://www.risingwave.com/slack
在 RisingWave 中從 Upstash Kafka 攝取數據:
https://docs.risingwave.com/docs/current/ingest-from-upstash-kafka/
配置 Metabase 以讀取 RisingWave 數據:
https://docs.risingwave.com/docs/current/metabase-integration/
關於 RisingW
ave
RisingWave 是一款基於 Apache 2.0 協定開源的分布式流資料庫,致力於為使用者提供極致簡單、高效的流數據處理與管理能力。RisingWave 采用存算分離架構,實作了高效的復雜查詢、瞬時動態擴縮容以及快速故障恢復,並助力使用者極大地簡化流計算架構,輕松搭建穩定且高效的流計算套用。
RisingWave 始終聆聽來自社群的聲音,並積極回應使用者的反饋。目前,RisingWave 已匯聚了 150+ 名開源貢獻者和 3000+ 名社群成員。全球範圍內,已有上百個 RisingWave 集群在生產環境中部署。
往期推薦
技術內幕
使用者案例