當前位置: 妍妍網 > 資訊

使用 RisingWave、Upstash 和 Metabase 進行即時航班跟蹤

2024-05-29資訊

在物流和航空領域,即時數據處理、數據視覺化和綜合看板至關重要。它們能夠提升供應鏈效能、最佳化運輸並提高效率。特別是對商務航空來說,實分時析可以提高效率、永續性和客戶關註度,從而改善決策並節約成本。它還幫助確保安全執行、最佳化空域使用並能協助空中交通管制員。總體而言,即時數據處理可以幫助航空業提升客戶體驗並改善業務營運。

在本文中,我們使用 RisingWave、Upstash 和 Metabase 建立了一個即時航班跟蹤系統。我們利用 Aviationstack API 即時獲取航班數據,然後將這些數據傳輸到 Upstash 的一個 Kafka 主題中。隨後,我們將這些數據流攝取到 RisingWave 中,繼而建立物化檢視 (MV) 以進行深入的航班數據分析。物化檢視隨時提供最新數據,並可以即時查詢。我們還使用 Metabase 建立圖表、表格和綜合的數據看板,用於即時航班跟蹤。

使用 RisingWave、Upstash 和 Metabase 實作的即時航班跟蹤

1 在 Upstash 中設定 Kafka

Upstash 是一個無伺服器平台,提供 Redis、Kafka 和 Qstash 服務,具有可延伸性、高級安全選項和專門支持的優勢。Upstash Kafka 使用 Apache Kafka 進行部署,並提供一個無伺服器的 Kafka 平台,配備連結器、模式登錄檔和監控,為有高級需求的客戶提供各種計劃。

註冊 Upstash 帳戶

註冊免費的 Upstash 雲帳戶,以存取 Kafka 服務。要建立帳戶,請存取 Upstash Cloud Account [1]

Upstash:帳戶註冊和登入流程

建立 Kafka 集群

登入後,使用以下資訊建立 Kafka 集群:

  • Name : 給您的 Kafka 集群一個唯一名稱以便辨識。

  • Region : 選擇您的 Kafka 集群托管的區域。

  • Type : 選擇適合您需求的集群型別。

  • Upstash:建立 Kafka 集群

    設定 Kafka 主題

    建立 Kafka 集群後,設定 Kafka 主題。Upstash Kafka 提供一些預設配置,包括分區數量和保留策略,簡化了設定過程。

    Upstash:建立 Kafka 主題

    建立 Kafka 集群和 Kafka 主題後,我們就可以利用 Upstash Kafka 和 RisingWave 的功能來構建流處理應用程式和管道。如果您還想了解更多有關以上流程的資訊,請參閱 Upstash Kafka 文件 [2]

    我們攝取到 Upstash Kafka 主題的範例封包含來自航空 API 的即時數據,包括機場名稱、航班狀態、航班位置等資訊。

    {
    "flight_date""2024-05-16",
    "flight_status""scheduled",
    "departure_airport""Auckland International",
    "departure_timezone""Pacific/Auckland",
    "departure_iata""AKL",
    "departure_icao""NZAA",
    "departure_terminal""D",
    "departure_gate""28",
    "departure_delay"null,
    "departure_scheduled""2024-05-16T06:30:00+00:00",
    "departure_estimated""2024-05-16T06:30:00+00:00",
    "departure_actual"null,
    "departure_estimated_runway"null,
    "departure_actual_runway"null,
    "arrival_airport""Wellington International",
    "arrival_timezone""Pacific/Auckland",
    "arrival_iata""WLG",
    "arrival_icao""NZWN",
    "arrival_terminal"null,
    "arrival_gate""15",
    "arrival_baggage"null,
    "arrival_delay"null,
    "arrival_scheduled""2024-05-16T07:40:00+00:00",
    "arrival_estimated""2024-05-16T07:40:00+00:00",
    "arrival_actual"null,
    "arrival_estimated_runway"null,
    "arrival_actual_runway"null,
    "airline_name""Singapore Airlines",
    "airline_iata""SQ",
    "airline_icao""SIA",
    "flight_number""SQ4438",
    "flight_iata""SQ4438",
    "flight_icao""SIA4438",
    "codeshared_airline_name""air new zealand",
    "codeshared_airline_iata""nz",
    "codeshared_airline_icao""anz",
    "codeshared_flight_number""401",
    "codeshared_flight_iata""nz401",
    "flight_info""Singapore Airlines flight SQ4438 is currently in the air, flying from Auckland International (AKL) to Wellington International (WLG)"
    }

    2 將數據從 Upstash Kafka 攝取到 RisingWave

    對於攝取和處理流數據,我們有兩種可用方法:開源的 RisingWave 和托管服務 RisingWave Cloud。在本文中,我們將重點使用 RisingWave Cloud,它的使用者體驗更加便捷友好,簡化了管理和利用 RisingWave 進行航班跟蹤解決方案的操作。

    建立 RisingWave 集群

    要在 RisingWave Cloud [3] 中建立 RisingWave 集群並探索各種功能,您可以註冊 Free Plan 來免費測試 RisingWave 的功能。有關如何建立 RisingWave 集群並開始使用的詳細說明,請參閱 官方 RisingWave 文件 [4] 。它將為您提供設定和探索 RisingWave 功能的分步指導。如果您需要額外的幫助,請加入我們的 Slack 社群 [5]

    RisingWave Cloud:帳戶註冊和登入流程

    將數據流攝取到 RisingWave

    現在我們已經在 Upstash 中(以 JSON 格式)設定了 Kafka 數據流,我們可以使用以下 SQL 語句連線到這些數據流。有關更多資訊,請參閱 在 RisingWave 中從 Upstash Kafka 攝取數據 [6]

    CREATESOURCE flight_tracking_source(
    flight_date VARCHAR,
    flight_status VARCHAR,
    departure_airport VARCHAR,
    departure_timezone VARCHAR,
    departure_iata VARCHAR,
    departure_icao VARCHAR,
    departure_terminal VARCHAR,
    departure_gate VARCHAR,
    departure_delay INTERVAL,
    departure_scheduled TIMESTAMPWITHTIME ZONE,
    departure_estimated TIMESTAMPWITHTIME ZONE,
    departure_actual TIMESTAMPWITHTIME ZONE,
    departure_estimated_runway TIMESTAMPWITHTIME ZONE,
    departure_actual_runway TIMESTAMPWITHTIME ZONE,
    arrival_airport VARCHAR,
    arrival_timezone VARCHAR,
    arrival_iata VARCHAR,
    arrival_icao VARCHAR,
    arrival_terminal VARCHAR,
    arrival_gate VARCHAR,
    arrival_baggage VARCHAR,
    arrival_delay INTERVAL,
    arrival_scheduled TIMESTAMPWITHTIME ZONE,
    arrival_estimated TIMESTAMPWITHTIME ZONE,
    arrival_actual TIMESTAMPWITHTIME ZONE,
    arrival_estimated_runway TIMESTAMPWITHTIME ZONE,
    arrival_actual_runway TIMESTAMPWITHTIME ZONE,
    airline_name VARCHAR,
    airline_iata VARCHAR,
    airline_icao VARCHAR,
    flight_number VARCHAR,
    flight_iata VARCHAR,
    flight_icao VARCHAR,
    codeshared_airline_name VARCHAR,
    codeshared_airline_i
    ata VARCHAR,
    codeshared_airline_icao VARCHAR,
    codeshared_flight_number VARCHAR,
    codeshared_flight_iata VARCHAR,
    flight_info VARCHAR
    )
    WITH(
    connector='kafka',
    topic ='flights_tracking',
    properties.bootstrap.server ='delicate-herring-9260-us1-kafka.upstash.io:9092',
    properties.sasl.mechanism = 'SCRAM-SHA-256',
    properties.security.protocol = 'SASL_SSL',
    properties.sasl.username = 'xxxxxx',
    properties.sasl.password = 'xxxxxx',
    scan.startup.mode ='earliest'
    )FORMAT PLAIN ENCODEJSON;




    透過 CREATE SOURCE 語句,RisingWave 已連線到數據流,但尚未開始消費數據。為了增量處理和儲存數據,我們需要建立物化檢視。建立物化檢視後,RisingWave 將從指定的偏移量開始消費數據。

    設定物化檢視以分析航班數據

    我們將建立不同的物化檢視,這些檢視跟蹤並提取與 flight_tracking_source 相關的各種航班資訊。這些資訊包括航班日期、狀態、出發和到達詳細資訊(機場、時區、IATA 碼、ICAO 碼、計劃和估計時間)、航空公司資訊(名稱、IATA 碼、ICAO 碼)、航班號和識別元(IATA 和 ICAO 碼)以及一般航班資訊。

    使用物化檢視是因為它們始終提供最新數據。例如,以下查詢建立了一個名為 Airline_Flight_Counts 的物化檢視,該檢視按小時間隔計算每個航空公司的航班數量。它使用了上文建立的 flight_tracking_source ,並按航空公司名稱和一小時的時間視窗對數據進行分組。

    CREATEMATERIALIZEDVIEW Airline_Flight_Counts
    SELECT airline_name,
    COUNT(airline_name) AS total_flights,
    window_start, window_end
    FROM TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
    GROUPBY airline_name,window_start, window_end
    ORDERBY total_flights desc;

    以下查詢建立了一個名為 Airport_Summary 的物化檢視,該檢視按小時間隔計算每個機場的到達和出發航班總數。計算結果按機場和一小時的時間視窗進行分組,並按航班總數降序排列。

    CREATEMATERIALIZEDVIEW Airport_Summary
    WITH ArrivalCounts AS (
    SELECT
    arrival_airport,
    COUNT(arrival_airport) AS total_flights_arrival,
    window_start,
    window_end
    FROM
    TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
    GROUPBY
    arrival_airport,
    window_start,
    window_end
    ),
    DepartureCounts AS (
    SELECT
    departure_airport,
    COUNT(departure_airport) AS total_flights_departure,
    window_start,
    window_end
    FROM
    TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
    GROUPBY
    departure_airport,
    window_start,
    window_end
    )
    SELECT
    ArrivalCounts.arrival_airport,
    ArrivalCounts.total_flights_arrival,
    DepartureCounts.departure_airport,
    DepartureCounts.total_flights_departure,
    ArrivalCounts.window_start,
    ArrivalCounts.window_end
    FROM
    ArrivalCounts
    INNERJOIN
    DepartureCounts ON ArrivalCounts.window_start = DepartureCounts.window_start
    AND ArrivalCounts.window_end = DepartureCounts.window_end
    AND ArrivalCounts.arrival_airport = DepartureCounts.departure_airport
    ORDERBY
    ArrivalCounts.total_flights_arrival DESC,
    DepartureCounts.total_flights_departure DESC;

    以下查詢建立了一個名為 Timezone_Summary 的物化檢視,該檢視按小時間隔計算每個時區的到達和出發航班總數。它使用資料來源 flight_tracking_source ,按時區和一小時的時間視窗對數據進行分組,然後按航班總數降序排列結果。

    CREATEMATERIALIZEDVIEW Timezone_Summary
    WITH ArrivalCounts AS (
    SELECT
    arrival_timezone,
    COUNT(arrival_timezone) AS total_flights_arrival,
    window_start,
    window_end
    FROM
    TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
    GROUPBY
    arrival_timezone,
    window_start,
    window_end
    ),
    DepartureCounts AS (
    SELECT
    departure_timezone,
    COUNT(departure_timezone) AS total_flights_departure,
    window_start,
    window_end
    FROM
    TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
    GROUPBY
    departure_timezone,
    window_start,
    window_end
    )
    SELECT
    ArrivalCounts.arrival_timezone,
    ArrivalCounts.total_flights_arrival,
    DepartureCounts.departure_timezone,
    DepartureCounts.total_flights_departure,
    ArrivalCounts.window_start,
    ArrivalCounts.window_end
    FROM
    ArrivalCounts
    INNERJOIN
    DepartureCounts ON ArrivalCounts.window_start = DepartureCounts.window_start
    AND ArrivalCounts.window_end = DepartureCounts.window_end
    AND ArrivalCounts.arrival_timezone = DepartureCounts.departure_timezone
    ORDERBY
    ArrivalCounts.total_flights_arrival DESC,
    DepartureCounts.total_flights_departure DESC;

    3 使用 Metabase 視覺化

    Metabase 是一個開源的商業智慧工具,可以將數據視覺化並共享數據見解。它讓您能用簡單的方法,基於資料庫數據,建立各種圖表、看板和指標。

    將 RisingWave 連線到 Metabase

    由於 RisingWave 相容 PostgreSQL,您可以將 Metabase 連線到 RisingWave 作為資料來源,並在流數據上構建分析。

    您可以在 Metabase 中使用 RisingWave 作為資料來源,使用 RisingWave 中的表和物化檢視建立視覺化圖表和綜合看板。要了解具體步驟,請參閱 配置 Metabase 以讀取 RisingWave 數據 [7]

    成功將 RisingWave 連線到 Metabase 後,我們將 RisingWave 中的物化檢視作為資料來源添加,以建立表格、各種圖表和綜合看板。

    使用 Metabase 視覺化數據

    我們使用在 RisingWave 中的物化檢視和源(如 flight_tracking_source Airline_Flight_Counts Airport_Summary Timezone_Summary )建立這些表格、圖表和綜合看板。

    航班數據概覽表
    按航空公司跟蹤航班
    按機場跟蹤航班
    按時區跟蹤航班

    以下是一個綜合看板,展示了一系列用於即時航班跟蹤的圖表。它提供了航班操作的整體檢視,提供按航空公司、機場和時區分類的航班總數的洞察。此外,它還提供當前航班的詳細資訊,讓使用者得以全面監控並提出見解。

    即時航班跟蹤的綜合看板

    4 總結

    在本文中,我們使用 Upstash、RisingWave 和 Metabase 開發了一個即時航班跟蹤系統。因為 RisingWave 提供了廣泛的源和目標連結器,配置和連線變得非常簡單。我們將即時航班數據攝取到 Upstash 的 Kafka 主題中,然後將其發送到 RisingWave,並建立物化檢視以進行深入分析。最後,我們使用 Metabase 建立了視覺化圖表和即時看板,使使用者能夠監控航班營運並做出明智的決策。

    參考資料

    [1]

    Upstash Cloud Account: https://console.upstash.com/kafka

    [2]

    Upstash Kafka 文件: https://upstash.com/docs/kafka

    [3]

    RisingWave Cloud: https://cloud.risingwave.com/

    [4]

    官方 RisingWave 文件: https://docs.risingwave.com/docs/current/intro/

    [5]

    Slack 社群: https://www.risingwave.com/slack

    [6]

    在 RisingWave 中從 Upstash Kafka 攝取數據: https://docs.risingwave.com/docs/current/ingest-from-upstash-kafka/

    [7]

    配置 Metabase 以讀取 RisingWave 數據: https://docs.risingwave.com/docs/current/metabase-integration/

    關於 RisingW ave

    RisingWave 是一款基於 Apache 2.0 協定開源的分布式流資料庫,致力於為使用者提供極致簡單、高效的流數據處理與管理能力。RisingWave 采用存算分離架構,實作了高效的復雜查詢、瞬時動態擴縮容以及快速故障恢復,並助力使用者極大地簡化流計算架構,輕松搭建穩定且高效的流計算套用。

    RisingWave 始終聆聽來自社群的聲音,並積極回應使用者的反饋。目前,RisingWave 已匯聚了 150+ 名開源貢獻者和 3000+ 名社群成員。全球範圍內,已有上百個 RisingWave 集群在生產環境中部署。

    往期推薦

    技術內幕

    使用者案例