需求

最近有个分析电商商品的销量变化的产品需求,定时抓取某个时间点销量。
再按时间店铺找到销量比较高的商品, 也就是时序分析。
时序数据库好多锤子可以用 Druid, InfluxDB

Rollup

在OLAP体系里,维度度下钻(Drill) 意味着维度粒度能达到的最小程度。
以时间的粒度为例:

  • 分钟 小时,天,周

假设只需要到天这个粒度。
那么到小时到数据是完全没有必要存的(牺牲细节)。
通过对维度的Rollup,可以大大减少原始数据的行数从而达到性能上的提升,并且节省了很多空间。

原始数据

1
2
3
4
5
6
7
8
9
10
timestamp                 srcIP         dstIP          packets     bytes
2018-01-01T01:01:35Z 1.1.1.1 2.2.2.2 100 1000
2018-01-01T01:01:51Z 1.1.1.1 2.2.2.2 200 2000
2018-01-01T01:01:59Z 1.1.1.1 2.2.2.2 300 3000
2018-01-01T01:02:14Z 1.1.1.1 2.2.2.2 400 4000
2018-01-01T01:02:29Z 1.1.1.1 2.2.2.2 500 5000
2018-01-01T01:03:29Z 1.1.1.1 2.2.2.2 600 6000
2018-01-02T21:33:14Z 7.7.7.7 8.8.8.8 100 1000
2018-01-02T21:33:45Z 7.7.7.7 8.8.8.8 200 2000
2018-01-02T21:35:45Z 7.7.7.7 8.8.8.8 300 3000

timestamp 经过 rollup 到分钟的粒度

1
2
3
4
5
6
timestamp                 srcIP         dstIP          packets     bytes
2018-01-01T01:01:00Z 1.1.1.1 2.2.2.2 600 6000
2018-01-01T01:02:00Z 1.1.1.1 2.2.2.2 900 9000
2018-01-01T01:03:00Z 1.1.1.1 2.2.2.2 600 6000
2018-01-02T21:33:00Z 7.7.7.7 8.8.8.8 300 3000
2018-01-02T21:35:00Z 7.7.7.7 8.8.8.8 300 3000

Druid

Druid 最早是美国广告技术公司 MetaMarkets 在 2011 年开发的的项目。
我们大概在两年前开始使用、很稳、 自从上线后除了 HDFS 空间不足之外基本没有挂过。
OLAP分析技术栈首选。在 Druid 里时间字段是必须存在的,rollup 是它的原生特性(只要你定义好时间的最小粒度)。

适用于数据量较大的业务,对于中小型项目这个方案还是有点重。而且只适合用来存分析型数据,当数据库来用有点不太合适

InfluxDB

InfluxDB 我没用过,本来想用用看的。可惜只有单机版、数据量大了显然没办法拓展。

Elasticsearch

接触时间最长,各种项目的万能膏药。

  • 时间聚合特性可以用来做时序分析,对原始数据没太大要求。
  • 各种类型的维度聚合,指标聚合很够用。
  • 读写性能很好,配置部署方便,支持分布式。
  • Kibana数据可视化,很好用、最近发现 6.6 又多了rollup 特性。

相对于Druid,这次数据量级不会特别大,毫无疑问当然选它。

数据写入

原始数据

1
2
3
4
5
6
7
{
"timestamp": 1516729294000,
"sell": 71001,
"price": 99,
"product": "高清水晶双光老花镜!低至1折!买一得七!",
"shop": "xxx"
}

Elasticsearch通过bulk API批量写入能达到很高的写入性能

创建Rollup Job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
PUT _xpack/rollup/job/shop
{
"index_pattern": "shop-*",
"rollup_index": "shop_rollup",
"cron": "*/30 * * * * ?",
"page_size" :1000,
"groups" : {
"date_histogram": {
"field": "timestamp",
"interval": "1h",
"delay": "7d"
},
"terms": {
"fields": ["product", "shop"]
}
},
"metrics": [
{
"field": "sell",
"metrics": ["min", "max"]
},
{
"field": "price",
"metrics": ["min", "max"]
}
]
}

以上job定义了

  • index_pattern定义了数据源的匹配规则
  • rollup_index定义了输出目标index
  • cron定义了执行间隔每30分钟
  • 时间维度到小时这个粒度、处理7天之前的数据。
  • 两个terms类型的组合维度product,shop
  • metrics 指标:sell,price 聚合求最大/小值
1
POST _xpack/rollup/job/shop/_start

启动该rollup job

聚合查询rollup后的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
GET /shop/_rollup_search
{
"size": 0,
"aggregations": {
"timeline": {
"date_histogram": {
"field": "timestamp",
"interval": "7d"
},
"aggs": {
"products": {
"terms": {
"field": "product"
},
"aggs": {
"max_sell": {
"max": {
"field": "sell"
}
},
"max_price": {
"max": {
"field": "price"
}
}
}
}
}
}
}
}

以上查询约等于select max(sell) as max_sell, max(price) as max_price group by week, product

对实时数据,Rollup后的冷数据合并查询

如果原始数据在rollup之后会删掉,那么就会存在两个index 。实时数据index+rollup后的index。
那么查询rollup后的index时,怎么包含近期的数据呢?
只需额外转递raw data的index名称即可,_rollup_search接口会自行根据时序来合并结果一起返回。

1
2
3
4
5
6
7
8
9
10
11
GET shop-1,shop_rollup/_rollup_search 
{
"size": 0,
"aggregations": {
"max_sell": {
"max": {
"field": "sell"
}
}
}
}

Rollup聚合的局限性

  • 维度只支持Date Histogram ,Histogram ,Terms 类的聚合
  • 指标只支持Min、Max 、Sum 、Average 、Value Count的聚合

rollup目前还只是实验性的功能,未来可能会怎么样暂时还不知道~

参考来源: