当前位置: 欣欣网 > 资讯

使用 RisingWave、Upstash 和 Metabase 进行实时航班跟踪

2024-05-29资讯

在物流和航空领域,实时数据处理、数据可视化和综合看板至关重要。它们能够提升供应链性能、优化运输并提高效率。特别是对商务航空来说,实时分析可以提高效率、可持续性和客户关注度,从而改善决策并节约成本。它还帮助确保安全运行、优化空域使用并能协助空中交通管制员。总体而言,实时数据处理可以帮助航空业提升客户体验并改善业务运营。

在本文中,我们使用 RisingWave、Upstash 和 Metabase 建立了一个实时航班跟踪系统。我们利用 Aviationstack API 实时获取航班数据,然后将这些数据传输到 Upstash 的一个 Kafka 主题中。随后,我们将这些数据流摄取到 RisingWave 中,继而创建物化视图 (MV) 以进行深入的航班数据分析。物化视图随时提供最新数据,并可以即时查询。我们还使用 Metabase 创建图表、表格和综合的数据看板,用于实时航班跟踪。

使用 RisingWave、Upstash 和 Metabase 实现的实时航班跟踪

1 在 Upstash 中设置 Kafka

Upstash 是一个无服务器平台,提供 Redis、Kafka 和 Qstash 服务,具有可扩展性、高级安全选项和专门支持的优势。Upstash Kafka 使用 Apache Kafka 进行部署,并提供一个无服务器的 Kafka 平台,配备连接器、模式注册表和监控,为有高级需求的客户提供各种计划。

注册 Upstash 账户

注册免费的 Upstash 云账户,以访问 Kafka 服务。要创建账户,请访问 Upstash Cloud Account [1]

Upstash:账户注册和登录流程

创建 Kafka 集群

登录后,使用以下信息创建 Kafka 集群:

  • Name : 给您的 Kafka 集群一个唯一名称以便识别。

  • Region : 选择您的 Kafka 集群托管的区域。

  • Type : 选择适合您需求的集群类型。

  • Upstash:创建 Kafka 集群

    设置 Kafka 主题

    创建 Kafka 集群后,设置 Kafka 主题。Upstash Kafka 提供一些默认配置,包括分区数量和保留策略,简化了设置过程。

    Upstash:创建 Kafka 主题

    创建 Kafka 集群和 Kafka 主题后,我们就可以利用 Upstash Kafka 和 RisingWave 的功能来构建流处理应用程序和管道。如果您还想了解更多有关以上流程的信息,请参阅 Upstash Kafka 文档 [2]

    我们摄取到 Upstash Kafka 主题的示例数据包含来自航空 API 的实时数据,包括机场名称、航班状态、航班位置等信息。

    {
    "flight_date""2024-05-16",
    "flight_status""scheduled",
    "departure_airport""Auckland International",
    "departure_timezone""Pacific/Auckland",
    "departure_iata""AKL",
    "departure_icao""NZAA",
    "departure_terminal""D",
    "departure_gate""28",
    "departure_delay"null,
    "departure_scheduled""2024-05-16T06:30:00+00:00",
    "departure_estimated""2024-05-16T06:30:00+00:00",
    "departure_actual"null,
    "departure_estimated_runway"null,
    "departure_actual_runway"null,
    "arrival_airport""Wellington International",
    "arrival_timezone""Pacific/Auckland",
    "arrival_iata""WLG",
    "arrival_icao""NZWN",
    "arrival_terminal"null,
    "arrival_gate""15",
    "arrival_baggage"null,
    "arrival_delay"null,
    "arrival_scheduled""2024-05-16T07:40:00+00:00",
    "arrival_estimated""2024-05-16T07:40:00+00:00",
    "arrival_actual"null,
    "arrival_estimated_runway"null,
    "arrival_actual_runway"null,
    "airline_name""Singapore Airlines",
    "airline_iata""SQ",
    "airline_icao""SIA",
    "flight_number""SQ4438",
    "flight_iata""SQ4438",
    "flight_icao""SIA4438",
    "codeshared_airline_name""air new zealand",
    "codeshared_airline_iata""nz",
    "codeshared_airline_icao""anz",
    "codeshared_flight_number""401",
    "codeshared_flight_iata""nz401",
    "flight_info""Singapore Airlines flight SQ4438 is currently in the air, flying from Auckland International (AKL) to Wellington International (WLG)"
    }

    2 将数据从 Upstash Kafka 摄取到 RisingWave

    对于摄取和处理流数据,我们有两种可用方法:开源的 RisingWave 和托管服务 RisingWave Cloud。在本文中,我们将重点使用 RisingWave Cloud,它的用户体验更加便捷友好,简化了管理和利用 RisingWave 进行航班跟踪解决方案的操作。

    创建 RisingWave 集群

    要在 RisingWave Cloud [3] 中创建 RisingWave 集群并探索各种功能,您可以注册 Free Plan 来免费测试 RisingWave 的功能。有关如何创建 RisingWave 集群并开始使用的详细说明,请参阅 官方 RisingWave 文档 [4] 。它将为您提供设置和探索 RisingWave 功能的分步指导。如果您需要额外的帮助,请加入我们的 Slack 社区 [5]

    RisingWave Cloud:账户注册和登录流程

    将数据流摄取到 RisingWave

    现在我们已经在 Upstash 中(以 JSON 格式)设置了 Kafka 数据流,我们可以使用以下 SQL 语句连接到这些数据流。有关更多信息,请参阅 在 RisingWave 中从 Upstash Kafka 摄取数据 [6]

    CREATESOURCE flight_tracking_source(
    flight_date VARCHAR,
    flight_status VARCHAR,
    departure_airport VARCHAR,
    departure_timezone VARCHAR,
    departure_iata VARCHAR,
    departure_icao VARCHAR,
    departure_terminal VARCHAR,
    departure_gate VARCHAR,
    departure_delay INTERVAL,
    departure_scheduled TIMESTAMPWITHTIME ZONE,
    departure_estimated TIMESTAMPWITHTIME ZONE,
    departure_actual TIMESTAMPWITHTIME ZONE,
    departure_estimated_runway TIMESTAMPWITHTIME ZONE,
    departure_actual_runway TIMESTAMPWITHTIME ZONE,
    arrival_airport VARCHAR,
    arrival_timezone VARCHAR,
    arrival_iata VARCHAR,
    arrival_icao VARCHAR,
    arrival_terminal VARCHAR,
    arrival_gate VARCHAR,
    arrival_baggage VARCHAR,
    arrival_delay INTERVAL,
    arrival_scheduled TIMESTAMPWITHTIME ZONE,
    arrival_estimated TIMESTAMPWITHTIME ZONE,
    arrival_actual TIMESTAMPWITHTIME ZONE,
    arrival_estimated_runway TIMESTAMPWITHTIME ZONE,
    arrival_actual_runway TIMESTAMPWITHTIME ZONE,
    airline_name VARCHAR,
    airline_iata VARCHAR,
    airline_icao VARCHAR,
    flight_number VARCHAR,
    flight_iata VARCHAR,
    flight_icao VARCHAR,
    codeshared_airline_name VARCHAR,
    codeshared_airline_i
    ata VARCHAR,
    codeshared_airline_icao VARCHAR,
    codeshared_flight_number VARCHAR,
    codeshared_flight_iata VARCHAR,
    flight_info VARCHAR
    )
    WITH(
    connector='kafka',
    topic ='flights_tracking',
    properties.bootstrap.server ='delicate-herring-9260-us1-kafka.upstash.io:9092',
    properties.sasl.mechanism = 'SCRAM-SHA-256',
    properties.security.protocol = 'SASL_SSL',
    properties.sasl.username = 'xxxxxx',
    properties.sasl.password = 'xxxxxx',
    scan.startup.mode ='earliest'
    )FORMAT PLAIN ENCODEJSON;




    通过 CREATE SOURCE 语句,RisingWave 已连接到数据流,但尚未开始消费数据。为了增量处理和存储数据,我们需要创建物化视图。创建物化视图后,RisingWave 将从指定的偏移量开始消费数据。

    设置物化视图以分析航班数据

    我们将创建不同的物化视图,这些视图跟踪并提取与 flight_tracking_source 相关的各种航班信息。这些信息包括航班日期、状态、出发和到达详细信息(机场、时区、IATA 码、ICAO 码、计划和估计时间)、航空公司信息(名称、IATA 码、ICAO 码)、航班号和标识符(IATA 和 ICAO 码)以及一般航班信息。

    使用物化视图是因为它们始终提供最新数据。例如,以下查询创建了一个名为 Airline_Flight_Counts 的物化视图,该视图按小时间隔计算每个航空公司的航班数量。它使用了上文创建的 flight_tracking_source ,并按航空公司名称和一小时的时间窗口对数据进行分组。

    CREATEMATERIALIZEDVIEW Airline_Flight_Counts
    SELECT airline_name,
    COUNT(airline_name) AS total_flights,
    window_start, window_end
    FROM TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
    GROUPBY airline_name,window_start, window_end
    ORDERBY total_flights desc;

    以下查询创建了一个名为 Airport_Summary 的物化视图,该视图按小时间隔计算每个机场的到达和出发航班总数。计算结果按机场和一小时的时间窗口进行分组,并按航班总数降序排列。

    CREATEMATERIALIZEDVIEW Airport_Summary
    WITH ArrivalCounts AS (
    SELECT
    arrival_airport,
    COUNT(arrival_airport) AS total_flights_arrival,
    window_start,
    window_end
    FROM
    TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
    GROUPBY
    arrival_airport,
    window_start,
    window_end
    ),
    DepartureCounts AS (
    SELECT
    departure_airport,
    COUNT(departure_airport) AS total_flights_departure,
    window_start,
    window_end
    FROM
    TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
    GROUPBY
    departure_airport,
    window_start,
    window_end
    )
    SELECT
    ArrivalCounts.arrival_airport,
    ArrivalCounts.total_flights_arrival,
    DepartureCounts.departure_airport,
    DepartureCounts.total_flights_departure,
    ArrivalCounts.window_start,
    ArrivalCounts.window_end
    FROM
    ArrivalCounts
    INNERJOIN
    DepartureCounts ON ArrivalCounts.window_start = DepartureCounts.window_start
    AND ArrivalCounts.window_end = DepartureCounts.window_end
    AND ArrivalCounts.arrival_airport = DepartureCounts.departure_airport
    ORDERBY
    ArrivalCounts.total_flights_arrival DESC,
    DepartureCounts.total_flights_departure DESC;

    以下查询创建了一个名为 Timezone_Summary 的物化视图,该视图按小时间隔计算每个时区的到达和出发航班总数。它使用数据源 flight_tracking_source ,按时区和一小时的时间窗口对数据进行分组,然后按航班总数降序排列结果。

    CREATEMATERIALIZEDVIEW Timezone_Summary
    WITH ArrivalCounts AS (
    SELECT
    arrival_timezone,
    COUNT(arrival_timezone) AS total_flights_arrival,
    window_start,
    window_end
    FROM
    TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
    GROUPBY
    arrival_timezone,
    window_start,
    window_end
    ),
    DepartureCounts AS (
    SELECT
    departure_timezone,
    COUNT(departure_timezone) AS total_flights_departure,
    window_start,
    window_end
    FROM
    TUMBLE (flight_tracking_source, arrival_scheduled, INTERVAL'1 hour')
    GROUPBY
    departure_timezone,
    window_start,
    window_end
    )
    SELECT
    ArrivalCounts.arrival_timezone,
    ArrivalCounts.total_flights_arrival,
    DepartureCounts.departure_timezone,
    DepartureCounts.total_flights_departure,
    ArrivalCounts.window_start,
    ArrivalCounts.window_end
    FROM
    ArrivalCounts
    INNERJOIN
    DepartureCounts ON ArrivalCounts.window_start = DepartureCounts.window_start
    AND ArrivalCounts.window_end = DepartureCounts.window_end
    AND ArrivalCounts.arrival_timezone = DepartureCounts.departure_timezone
    ORDERBY
    ArrivalCounts.total_flights_arrival DESC,
    DepartureCounts.total_flights_departure DESC;

    3 使用 Metabase 可视化

    Metabase 是一个开源的商业智能工具,可以将数据可视化并共享数据见解。它让您能用简单的方法,基于数据库数据,创建各种图表、看板和指标。

    将 RisingWave 连接到 Metabase

    由于 RisingWave 兼容 PostgreSQL,您可以将 Metabase 连接到 RisingWave 作为数据源,并在流数据上构建分析。

    您可以在 Metabase 中使用 RisingWave 作为数据源,使用 RisingWave 中的表和物化视图创建可视化图表和综合看板。要了解具体步骤,请参阅 配置 Metabase 以读取 RisingWave 数据 [7]

    成功将 RisingWave 连接到 Metabase 后,我们将 RisingWave 中的物化视图作为数据源添加,以创建表格、各种图表和综合看板。

    使用 Metabase 可视化数据

    我们使用在 RisingWave 中的物化视图和源(如 flight_tracking_source Airline_Flight_Counts Airport_Summary Timezone_Summary )创建这些表格、图表和综合看板。

    航班数据概览表
    按航空公司跟踪航班
    按机场跟踪航班
    按时区跟踪航班

    以下是一个综合看板,展示了一系列用于实时航班跟踪的图表。它提供了航班操作的整体视图,提供按航空公司、机场和时区分类的航班总数的洞察。此外,它还提供当前航班的详细信息,让用户得以全面监控并提出见解。

    实时航班跟踪的综合看板

    4 总结

    在本文中,我们使用 Upstash、RisingWave 和 Metabase 开发了一个实时航班跟踪系统。因为 RisingWave 提供了广泛的源和目标连接器,配置和连接变得非常简单。我们将实时航班数据摄取到 Upstash 的 Kafka 主题中,然后将其发送到 RisingWave,并创建物化视图以进行深入分析。最后,我们使用 Metabase 创建了可视化图表和实时看板,使用户能够监控航班运营并做出明智的决策。

    参考资料

    [1]

    Upstash Cloud Account: https://console.upstash.com/kafka

    [2]

    Upstash Kafka 文档: https://upstash.com/docs/kafka

    [3]

    RisingWave Cloud: https://cloud.risingwave.com/

    [4]

    官方 RisingWave 文档: https://docs.risingwave.com/docs/current/intro/

    [5]

    Slack 社区: https://www.risingwave.com/slack

    [6]

    在 RisingWave 中从 Upstash Kafka 摄取数据: https://docs.risingwave.com/docs/current/ingest-from-upstash-kafka/

    [7]

    配置 Metabase 以读取 RisingWave 数据: https://docs.risingwave.com/docs/current/metabase-integration/

    关于 RisingW ave

    RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于为用户提供极致简单、高效的流数据处理与管理能力。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,并助力用户极大地简化流计算架构,轻松搭建稳定且高效的流计算应用。

    RisingWave 始终聆听来自社区的声音,并积极回应用户的反馈。目前,RisingWave 已汇聚了 150+ 名开源贡献者和 3000+ 名社区成员。全球范围内,已有上百个 RisingWave 集群在生产环境中部署。

    往期推荐

    技术内幕

    用户案例