很常见的分析需求:日志中记录有访客IP(国内),现在要根据IP地址得出访客的地理位置,精确到市县一级,数据量平均是每天15G,需要统计日、周、月的结果。
最后的实现方式是,先找到IP地理位置数据库,包含每个ip段对应的地址:1.1.0.0,1.1.0.255,福建省然后把它转成具体每个IP对应的地址,即:1.1.0.1,福建省。这样在Pig中用JOIN的方式,就可以获得这个IP实际对应的地址了。
IP Database
12年成立的中国广告协会互动网络分会IP地理信息标准委员会(简称IPB)已经发布了IP地理信息标准库,委员会成员和大广告公司都有一份。也可以下载全球城市ip库,取其中CN部分:
有效时间:永久
读取这个文件,把ip转成数字,起止ip的差,就是这个ip段所包含的主机数,用range函数生成,主要的python代码如下:
# coding: utf-8
def ip2long(ip_string):
return ip_string.find('.') != -1 and reduce(lambda a, b: a<<8 | b, map(int, ip_string.split("."))) or ip_string
if __name__ == '__main__':
OUTPUT = open(r'out.csv', 'a')
INPUT = open(r'cn.csv')
for l in INPUT.readlines():
s, e, city = map(ip2long, l.split(','))
if city.strip() == 'area': continue
for i in range(s, e+1):
OUTPUT.write('%s,%s'%(str(i), city))
OUTPUT.close()
INPUT.close()
处理结果会保存在out.csv文件中,中国地区总共有3亿个IP,生成的文件是7.9G,gz默认压缩后是900MB左右,按Map需求进行分割。
Pig Latin
LOAD保存在hdfs中的日志文件(*.log.gz)和IP数据库(ip*.tar.gz),將日志文件中的访客IP转成数字,用JOIN方法组合,再做GROUP和COUNT的操作,就可以得出该地区的访客数了:
ip_map_data = LOAD '/user/ip*.tar.gz' USING PigStorage(',') AS (iplong:long, code:int);
data = LOAD '/home/*.log.gz' USING PigStorage('"') AS (id: chararray, ip:chararray);
split_data = FOREACH data { ip = STRSPLIT(ip, '\\.', 4) ; generate id, (long)ip.$0 * 16777216 + (long)ip.$1 * 65536 + (long)ip.$2 * 256 + (long)ip.$3; };
join_data = JOIN ip_map_data by iplong, split_data by $1;
group_data = GROUP join_data BY $1 PARALLEL 6;
count = FOREACH group_data GENERATE group, COUNT(join_data);
DUMP count;
耗时44分33秒,统计出一周的结果:
(北京市,32174022)
(保定市,4244694)
(邯郸市,1062551)
Job Name: PigLatin:ip.pig
Job-ACLs: All users are allowed
Job Setup: Successful
Status: Succeeded
Finished in: 44mins, 33sec
Job Cleanup: Successful
(保定市,4244694)
(邯郸市,1062551)
Job Name: PigLatin:ip.pig
Job-ACLs: All users are allowed
Job Setup: Successful
Status: Succeeded
Finished in: 44mins, 33sec
Job Cleanup: Successful
Afterword
之前尝试做一个Pig的UDF,用{IP : CITY}形式的字典来做IP到城市的匹配,但JAVA函数大小有限制,而且用UDF会影响性能,所以没有采用。如果是自己写REDUCE来支持类似传统SQL中JOIN BETWEEN的方法,就不需要生成这个3亿多条记录的文件了。以上是在一个很小的测试Hadoop架构中进行的处理,理论上在实际生产环境可以处理得更快。