您好,欢迎来到抵帆知识网。
搜索
您的当前位置:首页基于Spark Streaming的车辆实时监控方法及系统[发明专利]

基于Spark Streaming的车辆实时监控方法及系统[发明专利]

来源:抵帆知识网
(19)中华人民共和国国家知识产权局

(12)发明专利申请

(10)申请公布号 CN 108171971 A(43)申请公布日 2018.06.15

(21)申请号 201711361389.2(22)申请日 2017.12.18

(71)申请人 武汉烽火众智数字技术有限责任公

地址 430074 湖北省武汉市洪山区邮科院

路88号(72)发明人 严其松 贺珊 

(74)专利代理机构 北京汇泽知识产权代理有限

公司 11228

代理人 张涛(51)Int.Cl.

G08G 1/01(2006.01)G06F 17/30(2006.01)G06F 9/54(2006.01)

权利要求书2页 说明书6页 附图2页

CN 108171971 A(54)发明名称

基于Spark Streaming的车辆实时监控方法及系统(57)摘要

本发明提供了一种基于Spark Streaming的车辆实时监控方法和系统,属于大数据技术领

利用Kafka消息队列传输及存域。所述方法包括;

储所述实时交通数据;通过Spark Streaming从所述Kafka消息队列中拉取所述实时交通数据,实时计算单位时间内的车流量和车辆归属地信息,并将所述单位时间内的车流量和所述车辆归属地信息进行批量存储。将Kafka消息队列和Spark Streaming关联起来,利用消息队列的方式进行数据采集和传输,并利用分布式的流处理Spark Streaming进行数据处理,提供车联网中实时车辆的数据查询。

CN 108171971 A

权 利 要 求 书

1/2页

1.一种基于Spark Streaming的车辆实时监控方法,其特征在于,所述方法包括:采集实时交通数据;

利用Kafka消息队列传输及存储所述实时交通数据;通过Spark Streaming从所述Kafka消息队列中拉取所述实时交通数据,实时计算单位时间内的车流量和车辆归属地信息,并将所述单位时间内的车流量和所述车辆归属地信息进行批量存储。

2.如权利要求1所述的方法,其特征在于,所述实时计算单位时间内的车流量和车辆归属地信息,具体包括:

记录车辆通行的时间信息和车牌信息;

根据所述时间信息判断单位时间内的车流量,根据所述车牌信息和车辆归属地数据库,判断所述车辆的归属地信息。

3.如权利要求1或2所述的方法,其特征在于,所述方法还包括:根据用户的查询请求,查询单位时间的车流量信息和/或车辆的归属地信息,所述查询请求包括时间信息和/或车牌信息,所述时间信息包括当前时间信息或历史时间信息。

4.如权利要求3所述的方法,其特征在于,所述查询请求还包括区域信息,查询所述区域内出现车辆的归属地信息。

5.如权利要求1-4任一所述的方法,其特征在于,所述车辆归属地信息包括卡口信息编号属性、全国县市属性和车辆总数据量属性。

6.一种基于Spark Streaming的车辆实时监控系统,其特征在于,所述系统包括数据采集服务器、大数据集群和缓存服务器;

所述数据采集服务器利用TDU模块采集实时交通数据;所述大数据集群连接所述缓存服务器,将所述实时交通数据传输到所述缓存服务器进行缓存;

所述大数据集群通过Spark Streaming从所述缓存服务器的Kafka消息队列中拉取实时交通数据,实时计算单位时间内的车流量和车辆归属地信息,并将所述单位时间内的车流量和所述车辆归属地信息发送给所述缓存服务器进行批量存储。

7.如权利要求6所述的系统,其特征在于,所述大数据集群包括Kafka服务器和Spark Streaming服务器;

所述Kafka服务器对所述实时交通数据进行传输,并将所述实时交通数据缓存到所述缓存服务器;

所述Spark Streaming服务器包括拉起模块、计算模块和发送模块;所述拉起模块,通过Spark Streaming从所述缓存服务器的Kafka消息队列中拉取实时交通数据;

所述计算模块用于记录车辆通行的时间信息和车牌信息,根据所述时间信息判断单位时间内的车流量,根据所述车牌信息和车辆归属地数据库,判断所述车辆的归属地信息;

所述发送模块,将所述计算模块计算得到的所述单位时间内的车流量和所述车辆归属地信息发送给所述缓存服务器。

8.如权利要求6或7所述的系统,其特征在于,所述大数据集群还包括查询服务器,连接所述缓存服务器;

2

CN 108171971 A

权 利 要 求 书

2/2页

根据用户的查询请求,在所述缓存服务器中查询单位时间的车流量信息和/或车辆的归属地信息,所述查询请求包括时间信息和/或车牌信息,所述时间信息包括当前时间信息或历史时间信息。

9.如权利要求8所述的系统,其特征在于,所述查询请求还包括区域信息,查询所述区域内出现车辆的归属地信息。

10.如权利要求6-9任一所述的系统,其特征在于,所述缓存服务器部署有HBase数据库或者Oracle数据库。

3

CN 108171971 A

说 明 书

基于Spark Streaming的车辆实时监控方法及系统

1/6页

技术领域

[0001]本发明涉及大数据技术领域,尤其涉及基于Spark Streaming的车辆实时监控方法和系统。

背景技术

[0002]Spark Streaming是基于Apache Spark的流式计算引擎,建立在Spark上的实时计算框架,通过它提供的API和基于内存的高速执行引擎,用户可以结合流式、批处理和交换式进行查询和实时计算。具有吞吐量大、受Yarn调度,接受Resource Manager管理、Spark Streaming on Yarn稳定性更优、且支持Streaming SQL的优点。[0003]目前的流式海量数据已经应用到多个平台中,如大众点评实时平台、1号店实时平台、爱奇艺实时采集计算平台和百度分布式交互查询系统PINGO等,对于车联网中还没有相应的海量数据实时监控系统。

发明内容

[0004]本发明实施例提出了一种基于Spark Streaming的车辆实时监控方法及系统,在车联网下部署不同的服务器,完成对车联网中车辆的实时监控。[0005]本发明实施例提供一种基于Spark Streaming的车辆实时监控方法,所述方法包括:

[0006]采集实时交通数据;[0007]利用Kafka消息队列传输及存储所述实时交通数据;;[0008]通过Spark Streaming从所述Kafka消息队列中拉取所述实时交通数据,实时计算单位时间内的车流量和车辆归属地信息,并将所述单位时间内的车流量和所述车辆归属地信息进行批量存储。[0009]其中,所述实时计算单位时间内的车流量和车辆归属地信息,具体包括:[0010]记录车辆通行的时间信息和车牌信息;

[0011]根据所述时间信息判断单位时间内的车流量,根据所述车牌信息和车辆归属地数据库,判断所述车辆的归属地信息。[0012]进一步的,所述方法还包括:根据用户的查询请求,查询单位时间的车流量信息和/或车辆的归属地信息,所述查询请求包括时间信息和/或车牌信息,所述时间信息包括当前时间信息或历史时间信息。[0013]进一步的,所述查询请求还包括区域信息,查询所述区域内出现车辆的归属地信息。

[0014]其中,所述车辆归属地信息包括卡口信息编号属性、全国县市属性和车辆总数据量属性。

[0015]本发明实施例还提供了一种基于Spark Streaming的车辆实时监控系统,所述系统包括数据采集服务器、大数据集群和缓存服务器;

4

CN 108171971 A[0016]

说 明 书

2/6页

所述数据采集服务器利用TDU模块采集实时交通数据;

[0017]所述大数据集群连接所述缓存服务器,将所述实时交通数据进行传输到所述缓存服务器进行缓存;

[0018]所述大数据集群通过Spark Streaming从所述缓存服务器的Kafka消息队列中拉取实时交通数据,实时计算单位时间内的车流量和车辆归属地信息,并将所述单位时间内的车流量和所述车辆归属地信息发送给所述缓存服务器进行批量存储。[0019]具体的,所述大数据集群包括Kafka服务器和Spark Streaming服务器;[0020]所述Kafka服务器对所述实时交通数据进行传输,并将所述实时交通数据缓存到所述缓存服务器;[0021]所述Spark Streaming服务器包括拉起模块、计算模块和发送模块;[0022]所述拉起模块,通过Spark Streaming从所述缓存服务器的Kafka消息队列中拉取实时交通数据;

[0023]所述计算模块用于记录车辆通行的时间信息和车牌信息,根据所述时间信息判断单位时间内的车流量,根据所述车牌信息和车辆归属地数据库,判断所述车辆的归属地信息;

[0024]所述发送模块,将所述计算模块计算得到的所述单位时间内的车流量和所述车辆归属地信息发送给所述缓存服务器。[0025]进一步的,所述大数据集群还包括查询服务器,连接所述缓存服务器;[0026]根据用户的查询请求,在所述缓存服务器中查询单位时间的车流量信息和/或车辆的归属地信息,所述查询请求包括时间信息和/或车牌信息,所述时间信息包括当前时间信息或历史时间信息。[0027]进一步的,所述查询请求还包括区域信息,查询所述区域内出现车辆的归属地信息。

[0028]具体的,所述缓存服务器部署有HBase数据库或者Oracle数据库。[0029]有益效果如下:

[0030]本方案将Kafka消息队列和Spark Streaming关联起来,利用消息队列的方式进行数据采集和传输,并利用分布式的流处理Spark Streaming进行数据处理,提供车联网中实时车辆的数据查询,可以将计算的结果数据和中间交换数据根据业务需求存放在HBase数据库或者Oracle数据库中,方便中间件进行调用。附图说明

[0031]下面将参照附图描述本发明的具体实施例,其中:[0032]图1示出了本发明实施例一中基于Spark Streaming的车辆实时监控方法流程图;[0033]图2示出了本发明实施例二中基于Spark Streaming的车辆实时监控系统的结构示意图;

[0034]图3示出了本发明实施例二中基于Spark Streaming的车辆实时监控系统的另一结构示意图。具体实施方式

5

CN 108171971 A[0035]

说 明 书

3/6页

为了使本发明的技术方案及优点更加清楚明白,以下结合附图对本发明的示例性

实施例进行进一步详细的说明,显然,所描述的实施例仅是本发明的一部分实施例,而不是所有实施例的穷举。并且在不冲突的情况下,本说明中的实施例及实施例中的特征可以互相结合。

[0036]Spark Streaming的基本原理是将Stream数据分成小的时间片段(几秒钟到几分钟),以类似batch批处理的方式来处理这些小部分数据。Spark针对持续性数据流的抽象称为DStream(DiscretizedStream),一个Dstream是一个微批处理(microbatching)的RDD(弹性分布式数据集);而RDD则是一种分布式数据集,能够以两种方式并行运作,分别是任意函数和滑动窗口数据的转换。

[0037]Kafka是设计独特的消息队列,具有非常高的吞吐能力,以及强大的扩展性,Kafka是设计目的是分布式、高吞吐、高扩展。Kafka是一个消息发布订阅系统,分为Producer、Broker、Consumer几个模块。Producer向某个Topic发布消息,发往Kafka数据由Broker进行存储和转发,Consumer订阅某个Topic的消息,进而一旦有新的关于某个Topic的消息,Broker会传递给订阅它的所有Consumer。在Kafka中,消息是按Topic组织的,每个Topic又会分为多个Partition,这样便于管理数据和进行负载均衡,负载均衡通过Zookeeper来做的。

[0038]实施例一

[0039]图1示出了本发明实施例提供的基于Spark Streaming的车辆实时监控方法,具体包括:

[0040]步骤101:采集实时交通数据;[0041]具体的,利用前端相机抓拍图片,通过TDU模块上传平台,采集实时交通数据。[0042]步骤102:利用Kafka消息队列存储实时交通数据;[0043]步骤103:通过Spark Streaming从Kafka消息队列中拉取实时交通数据,实时计算单位时间内的车流量和车辆归属地信息;[0044]其中,实时计算单位时间内的车流量和车辆归属地信息,具体包括:[0045]记录车辆通行的时间信息和车牌信息;[0046]根据时间信息判断单位时间内的车流量,根据车牌信息和车辆归属地数据库,判断车辆的归属地信息。

[0047]本发明实施例中会统计车辆的通行情况,如通行的时间信息及该车辆的车牌信息,进而可以根据时间信息统计一天内某个时刻通行车辆的数量,也可以统计某个区域内通行车辆的归属地信息。[0048]步骤104:将单位时间内的车流量和车辆归属地信息进行批量存储。[0049]另外,本发明实施例还提供了查询功能;[0050]根据用户的查询请求,查询单位时间的车流量信息和/或车辆的归属地信息,该查询请求包括时间信息和/或车牌信息,时间信息包括当前时间信息或历史时间信息。也就是说,本发明中,可以查询历史交通数据,也可以查询当前实时交通数据。[0051]其中,查询请求还可以包括区域信息,能够查询某区域内出现车辆的归属地信息,便于统计各个区域的车辆通行情况。[0052]需要说明的是,本发明中,车辆归属地信息包括卡口信息编号属性、全国县市属性

6

CN 108171971 A

说 明 书

4/6页

和车辆总数据量属性,对于存储的数据库来说,该车辆归属地信息的量级是较大的,该量级为卡口信息、全国县市及车辆总数量的三者之积,其中,卡口信息编号有86个,全国县市有436个,而车辆总数据量也不小。[0053]实际中,实时计算流程的整个架构是一个比较长的过程,包括数据采集、数据的传输、计算、存储等。首先设置一定策略利用TDU模块实时采集,然后同步到Kafka消息队列中;对于需要实时计算的应用,一般通过Storm和Spark Streaming等流式计算框架从消息队列中拉取消息,完成相关的过滤和计算,最后存到HBase、Oracle等数据库中;对于实时性要求不高的应用,通过Flume系统直接Sink到HDFS中,然后一般通过ETL、Spark SQL和批处理的MR作业等抽取到HBase、Oracle等数据库中。[0054]为便于说明,本发明提供了一个例子来描述基于Spark Streaming的车辆实时监控方法;

[0055]首先,初始化全国车辆归属地相关信息,例如鄂A,所属地是湖北省武汉市等信息;[0056]初始化hibernate来读取本地库保存的小时流量统计和车辆归属地两个表的结果,由于数据性能的问题,初始化中只初始化当天数据,并把初始化数据构造成initRDD,最后把initRDD分布式化;[0057]然后,构造StreamingContext对象,来对接Kafka接受来的数据流,并根据所属创建curRDD;

[0058]把curRDD根据所属的时间条件和所属归属地情况转化为车流量的vehicleFlowRDD和车辆归属地carbelongRDD;[0059]利用Spark Streaming的累加器函数updateStateByKey把initRDD和curRDD根据条件进行累加;

[0060]每隔固定周期时间,把统计结果保存到持久化层,如HBase数据库或者Oracle数据库;

[0061]对外提供车辆小时流量统计和车辆的归属地统计查询接口,出于性能优化的考虑,查询的时候查询当天可以查询持久化层,而查询历史统计结果则查询数据库Postgresql。

[0062]本方案提供的基于Spark Streaming的车辆实时监控方法,将Kafka消息队列和Spark Streaming关联起来,利用消息队列的方式进行数据采集和传输,并利用分布式的流处理Spark Streaming进行数据处理,提供车联网中实时车辆的数据查询,可以将计算的结果数据和中间交换数据根据业务需求存放在HBase数据库或者Oracle数据库中,方便调用。[0063]实施例二

[0064]本方案提供的基于Spark Streaming的车辆实时监控系统,需要部署各种不同类型的服务器,如采集服务器、缓存服务器,在每一种类型的服务器部署完成之后,都可以通过阿里云镜像的功能,创建一个能随时使用的镜像,这样在扩展服务器的时候就不需要重新安装软件,直接通过镜像创建服务器就可以了。[0065]下面结合图2和图3,给出本方案提供的基于Spark Streaming的车辆实时监控系统,包括数据采集服务器201、大数据集群202和缓存服务器203;[0066]数据采集服务器201利用TDU模块采集实时交通数据;[0067]大数据集群202连接缓存服务器203,将实时交通数据传输到缓存服务器203进行

7

CN 108171971 A

说 明 书

5/6页

缓存,通过Spark Streaming从缓存服务器203的Kafka消息队列中拉取实时交通数据,实时计算单位时间内的车流量和车辆归属地信息,并将单位时间内的车流量和车辆归属地信息发送给缓存服务器203进行批量存储。[0068]实际应用中,缓存服务器部署有HBase数据库或者Oracle数据库。[0069]具体的,本发明中,Spark Streaming服务器包括拉起模块、计算模块和发送模块;[0070]拉起模块,通过Spark Streaming从缓存服务器的Kafka消息队列中拉取实时交通数据;

[0071]计算模块用于记录车辆通行的时间信息和车牌信息,根据时间信息判断单位时间内的车流量,根据车牌信息和车辆归属地数据库,判断车辆的归属地信息;[0072]发送模块,将计算模块计算得到的单位时间内的车流量和车辆归属地信息发送给缓存服务器。

[0073]进一步的,本发明基于Spark Streaming的车辆实时监控系统还包括查询服务器,连接缓存服务器;

[0074]根据用户的查询请求,在缓存服务器中查询单位时间的车流量信息和/或车辆的归属地信息,查询请求包括时间信息和/或车牌信息,时间信息包括当前时间信息或历史时间信息。

[0075]其中,查询请求还包括区域信息,查询区域内出现车辆的归属地信息。[0076]本发明提供的基于Spark Streaming的车辆实时监控系统,将Kafka消息队列和Spark Streaming关联起来,利用消息队列的方式进行数据采集和传输,并利用分布式的流处理Spark Streaming进行数据处理,提供车联网中实时车辆的数据查询,可以将计算的结果数据和中间交换数据根据业务需求存放在HBase数据库或者Oracle数据库中,方便调用。[0077]为了描述的方便,以上装置的各部分以功能分为各种模块或单元分别描述。当然,在实施本发明时可以把各模块或单元的功能在同一个或多个软件或硬件中实现。[0078]本领域内的技术人员应明白,本发明的实施例可提供为方法、系统、或计算机程序产品。因此,本发明可采用完全硬件实施例、完全软件实施例、或结合软件和硬件方面的实施例的形式。而且,本发明可采用在一个或多个其中包含有计算机可用程序代码的计算机可用存储介质(包括但不限于磁盘存储器、CD-ROM、光学存储器等)上实施的计算机程序产品的形式。

[0079]本发明是参照根据本发明实施例的方法、设备(系统)、和计算机程序产品的流程图和/或方框图来描述的。应理解可由计算机程序指令实现流程图和/或方框图中的每一流程和/或方框、以及流程图和/或方框图中的流程和/或方框的结合。可提供这些计算机程序指令到通用计算机、专用计算机、嵌入式处理机或其他可编程数据处理设备的处理器以产生一个机器,使得通过计算机或其他可编程数据处理设备的处理器执行的指令产生用于实现在流程图一个流程或多个流程和/或方框图一个方框或多个方框中指定的功能的装置。[0080]这些计算机程序指令也可存储在能引导计算机或其他可编程数据处理设备以特定方式工作的计算机可读存储器中,使得存储在该计算机可读存储器中的指令产生包括指令装置的制造品,该指令装置实现在流程图一个流程或多个流程和/或方框图一个方框或多个方框中指定的功能。

[0081]这些计算机程序指令也可装载到计算机或其他可编程数据处理设备上,使得在计

8

CN 108171971 A

说 明 书

6/6页

算机或其他可编程设备上执行一系列操作步骤以产生计算机实现的处理,从而在计算机或其他可编程设备上执行的指令提供用于实现在流程图一个流程或多个流程和/或方框图一个方框或多个方框中指定的功能的步骤。

[0082]尽管已描述了本发明的优选实施例,但本领域内的技术人员一旦得知了基本创造性概念,则可对这些实施例作出另外的变更和修改。所以,所附权利要求意欲解释为包括优选实施例以及落入本发明范围的所有变更和修改。

9

CN 108171971 A

说 明 书 附 图

1/2页

图1

图2

10

CN 108171971 A

说 明 书 附 图

2/2页

图3

11

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- dfix.cn 版权所有

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务