首页 > 编程知识 正文

上传速度慢怎么解决,数据库慢查询优化

时间:2023-05-04 17:36:36 阅读:13476 作者:3699

本文以新闻服务功能为例,介绍如何使用Lambda体系结构跟踪数据实时更新的项目实现。

目前股市的交易者可以了解到丰富的股票交易信息。 从金融新闻到传统报纸、杂志、博客和社交媒体,汇集了海量的数据,远远大于股票交易者想关注的股票信息,因此需要高效的过滤,为股票交易者提供信息。 这里为股票证券投资交易者开发新闻服务功能,为股票交易者提供个性化新闻。

该新闻服务被称为“自动获取金融新闻”,输入各数据源的金融新闻,同时输入用户的实时股票交易信息。 任何时候,在股票交易者拥有的资产证券中占很大比例的公司,新闻一到,都会显示在股票交易者的仪表板上。 大量股票交易者进行交易时,会发送相应的交易信息,因此希望有一个将所有交易者的历史交易信息作为实际数据源进行存储的大数据系统,但处理海量数据非常慢,无法进行实时数据更新。 为了保持实时数据跟踪和数据结果的最新性,可以通过采用Lambda体系结构来实现。

Lambda体系结构的优点

在传统的SQL系统中,更新表只会更改现有字段的值。 这在少数服务器上的数据库中运行良好,并且可以从库或备份库水平扩展。 但是,如果数据库扩展到了大量的数据服务器,则很难将数据恢复到失败点,例如在硬件崩溃时,这需要很长时间。 另外,由于数据库中没有历史记录,所以只存在日志,如果数据崩溃,则会发生看不见的数据错误,即脏数据。

相反,在分布式多拷贝消息队列大数据系统中,即使硬件或网络出现故障,数据进入系统后也不会丢失。 保存所有更新的历史记录可以重建实际数据源,并为每个批处理操作提供正确的结果。 但是,要在实时数据更新后获取最新的完整数据集,必须重新处理整个历史数据集,这需要太长时间。 为了解决这个问题,可以向Lambda体系结构中添加实时组件。 此组件仅存储数据更新的当前值,并提供与传统SQL系统相同的快速实时结果。 实时处理层的脏数据被后续批处理复盖,该系统可用性高,最终保持一致,结果准确。 当前值错误、实时处理层报告、硬件或网络错误、数据崩溃或软件错误等在下一个批处理过程中会自动修复。

自动获取金融新闻项目的数据管道

整个数据管道的流程如图1所示。

图1

输入数据格式为JSON,主要来自综合交易信息和推特新闻。 Jon格式的消息将推送到Kafka,并在“批处理层”(batch layer )和“实时处理层”(real-time layer )中使用。 之所以使用Kafka作为数据管道的输入起点,是因为Kafka可以确保即使硬件或网络出现故障,消息也会传输到整个系统。

在批层,Camus(Linkin开源项目,现在改名为Gobblin )消耗所有Kafka的消息并将其存储在HDFS中,然后Spark处理所有交易历史记录并计算每个股票交易者持有的准确数量

在流式传输层,Spark Streaming实时消耗Kafka消息,但不像Storm那样完全实时,Spark Streaming能够实现500ms的微总线数据流处理Spark Streaming可以重用批处理层中的Spark代码,从而为micro-batch数据流处理提供足够的延迟。

批处理层和实时处理层的结果写入Cassandra数据库,并通过Flask提供web接口服务。 随着大量的事务性数据写入系统,Cassandra数据库的快速写入功能几乎可以满足要求。

如何调度实时处理层和批处理层的结果

当最新消息进入大数据系统时,web界面提供的结果服务始终保持最新,并集成批处理层和实时层的处理结果。 以下是一个简单的使用批处理结果和实时处理结果的方法。

从下面的图2可以看到,有三个数据库表。 一个保存批处理结果(图2中的Batch表)。 保存从上次批处理完成到当前时间的实时事务数据,即增量数据。 (图2的实时2表); 另一个存储最新数据,即状态表(图2中高亮显示的实时1表)。

如果批处理结果由于软件、硬件或网络问题而出现异常,则在单个数据库表中记录数据增量,并在批处理成功后更新为相应的批处理结果数,以确保最终数据的一致性。

在本例中,假设第一次批处理的开始时间为t0,一个交易者进行交易后,获得了3M公司的5000股股票。

图2

t0时,批处理开始,处理完成后的最新结果保存在Real Time 1表中,当前值为5000股。

图3

在批处理过程中,交易者卖出了3M公司的1000股股票,Real Time 1表的更新数据值为4000股,Real Time 2表如图4所示存储着从t0到现在的增量-1000股。

图4

批处理完成后,三个表中的值分别为-1000、-1000和-1000。 此时,将active数据库表与Real Time 2表进行交换,将批处理结果与实时结果合并,得到最新的结果值。 然后复位Re

al Time 1表为0,后续用来存储从t1时间点开始的增量数据。接下来新的一轮以存储最新数据的Real Time 2表为起点,循环前面的过程。

图5

图6

图7

以上每步处理过程完全成功并写入数据库,可以保证展示给交易者的数据准确性。数据集 处理时间取决于数据集大小,处理任务的计划按序处理而不是按自然天时间。在一个系统中需要工作流支持复杂处理、多任务依赖和资源共享。这里采用 Airbnb的项目Airflow,可以调度程序和监控工作流。Airflow把task和上游各种依赖构建成一个有向无环图(DAG),基于 Python实现,可以把多个任务写成Bash脚本,Bash命令能直接调用任何模块,并且Bash脚本可以被Airflow使用,这样使得 Airflow易操作。Airflow编程接口比基于XML配置的调度系统Oozie简单;Airflow的Bash脚本编码量比hpdxh要少很多,hpdxh的每个job都是一个python工程。每步合并实时和批量数据的job运行都是前一步成功完成退出后。

最后简单总结一下,Lambda架构涉及批量处理层和实时处理层处理历史数据以及实时更新的数据。 为了Lambda架构的实现切实可行,数据处理要设计成批处理层和实时处理层结合。本项目中,有一个“备用”数据库表专门用来存储输入的总数,而不从批处 理层读取数据,并允许对批处理层和实时处理层的结果进行简单的聚合。以上就是用Lambda架构实现的一个高可用、高数据最终一致性的系统。

【编辑推荐】

【责任编辑:Ophira TEL:(010)68476606】

点赞 0

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。