Python作为一种编程语言,在易读、易维护方面有其独特的优点,越来越多的人使用Python进行数据分析和处理。 Pandas是为了解决数据分析任务而设计的,它包含大量可以轻松处理数据的函数和方法,是使Python成为一个强大高效的数据分析环境的重要因素之一。
但是,Pandas是内存的类库,处理小数据(放入内存)没有问题,对于大数据(放入内存)来说不是很方便。 在平时的工作中,经常会遇到这样大的文件(从数据库或网站下载的数据),但Pandas无能为力,我们只能自己想办法。 我们将讨论这个问题。
本文所指的大数据,并不是需要TB、PB级分布式处理的大数据,而是指普通的PC机无法装入内存,但硬盘内可以存在GB级文件数据。
由于这些文件不能一次读取到内存中,因此在处理数据时,通常需要采用逐行或逐块读取的方法进行处理。 虽然Python和pandas在读取文件时支持这种方法,但由于没有游标系统,因此必须拆分使用某些函数和方法,或者函数和方法本身必须自己编写代码。 这里介绍了最常见的几个问题,并编写了代码示例供读者参考。
一.聚合
在单纯集约中,只要遍历一次数据,根据集约目标计算一次集约列即可。 例如,求和(sum )包括在数据遍历时读取的数据; 在计数(count )、遍历数据情况下,只要记录遍历数即可; 平均(mean ),扫描时同时记录累计和扫描数,最后除以即可。 这里以问题为例进行介绍。
这些文件包括: 数据段如下所示。
现在需要计算销售额总额。 (amount列) )。
(一)逐行读取
总数=0
withopen(Orders.txt,) r ) ) as f:
line=f.readline () )
while True:
line=f.readline () )
if not line:
布莱克
total=float(line.split ) (t ) ) )
是打印(总)
打开文件
标题行
逐行读取
内容还没看懂就结束
累积加法
(二) pandas区块引线
使用pandas可以以块为单位进行读取。 工作逻辑的结构如下图所示。
import pandas as pd
chunk _ data=PD.read _ CSV (orders.txt )、sep='t '、chunksize=100000 ) )
总数=0
for chunk in chunk_data:
total=chunk['amount'].sum (
是打印(总)
逐步读取文件,每段10万行
累计各段的销售额
pandas擅长用更长的段进行计算,理论上chunksize越大计算速度越快,但请注意内存限制。 不建议使用pandas逐行读取文件,因为将“chunksize”设置为1会导致逐行读取,速度非常慢。
二.过滤
过滤流程图:
过滤与聚合相同,将大文件分成n段,对各段进行过滤,最后合并各段的结果即可。
以上数据为例,继续过滤纽约州的销售信息
(一)小结果集
import pandas as pd
chunk _ data=PD.read _ CSV (orders.txt )、sep='t '、chunksize=100000 ) )
chunk_list=[]
for chunk in chunk_data:
chunk _ list.append (chunk [ chunk.state==' new York ' )
RES=PD.concat(chunk_list ) )。
是打印(RES )
定义空列表保存结果
分段过滤
合并结果
(二)大结果集
import pandas as pd
chunk _ data=PD.read _ CSV (orders.txt )、sep='t '、chunksize=100000 ) )
n=0
for chunk in chunk_data:
need _ data=chunk [ chunk.state==' new York ' ]
if n==0:
need _ data.to _ CSV (orders _ filter.txt ) (index=None ) ) ) ) ) ) ) ) ) ) ) )。
n=1
else:
n
eed_data.to_csv("orders_filter.txt",index=None,mode='a',header=None)第一段,写入文件,保留表头,不保留索引
其他段,追加写入不保留表头和索引
大文件聚合和过滤运算的逻辑相对简单,但因为 Python 没有直接提供游标数据类型,代码也要写很多行。
三、 排序
排序流程图:
排序要麻烦得多,如上图所示:
1. 分段读取数据;
2. 对每一段进行排序;
3. 将每一段的排序结果写出至临时文件;
4. 维护一个 k 个元素的列表(k 等于分段数),每个临时文件将一行数据放入该列表;
5. 将列表中的记录的按排序的字段的排序 (与第二步的排序方式相同,升序都升序,降序都降序);
6. 将列表的最小或最大记录写出至结果文件 (升序时最小,降序时最大);
7. 从写出记录的临时文件中再读取一行放入列表;
8. 重复 6.7 步,直至所有记录写出至结果文件。
继续以上面数据为例,用 Python 写一段完整的外存排序算法,将文件中的数据按订单金额升序排序
import pandas as pd
import os
import time
import shutil
import uuid
import traceback
def parse_type(s):
if s.isdigit():
return int(s)
try:
res = float(s)
return res
except:
return s
def pos_by(by,head,sep):
by_num = 0
for col in head.split(sep):
if col.strip()==by:
break
else:
by_num+=1
return by_num
def merge_sort(directory,ofile,by,ascending=True,sep=","):
with open(ofile,'w') as outfile:
file_list = os.listdir(directory)
file_chunk = [open(directory+"/"+file,'r') for file in file_list]
k_row = [file_chunk[i].readline()for i in range(len(file_chunk))]
by = pos_by(by,k_row[0],sep)
outfile.write(k_row[0])
k_row = [file_chunk[i].readline()for i in range(len(file_chunk))]
k_by = [parse_type(k_row[i].split(sep)[by].strip())for i in range(len(file_chunk))]
with open(ofile,'a') as outfile:
while True:
for i in range(len(k_by)):
if i >= len(k_by):
break
sorted_k_by = sorted(k_by) if ascending else sorted(k_by,reverse=True)
if k_by[i] == sorted_k_by[0]:
outfile.write(k_row[i])
k_row[i] = file_chunk[i].readline()
if not k_row[i]:
file_chunk[i].close()
del(file_chunk[i])
del(k_row[i])
del(k_by[i])
else:
k_by[i] = parse_type(k_row[i].split(sep)[by].strip())
if len(k_by)==0:
break
def external_sort(file_path,by,ofile,tmp_dir,ascending=True,chunksize=50000,sep=',',usecols=None,index_col=None):
os.makedirs(tmp_dir,exist_ok=True)
try:
data_chunk = pd.read_csv(file_path,sep=sep,usecols=usecols,index_col=index_col,chunksize=chunksize)
for chunk in data_chunk:
chunk = chunk.sort_values(by,ascending=ascending)
chunk.to_csv(tmp_dir+"/"+"chunk"+str(int(time.time()*10**7))+str(uuid.uuid4())+".csv",index=None,sep=sep)
merge_sort(tmp_dir,ofile=ofile,by=by,ascending=ascending,sep=sep)
except Exception:
print(traceback.format_exc())
finally:
shutil.rmtree(tmp_dir, ignore_errors=True)
if __name__ == "__main__":
infile = "D:/python_question_data/orders.txt"
ofile = "D:/python_question_data/extra_sort_res_py.txt"
tmp = "D:/python_question_data/tmp"
external_sort(infile,'amount',ofile,tmp,ascending=True,chunksize=1000000,sep='t')
函数
解析字符串的数据类型
函数
计算要排序的列名在表头中的位置
函数
外存归并排序
列出临时文件
打开临时文件
读取表头
计算要排序的列在表头的位置
写出表头
读取正文第一行
维护一个 k 个元素的列表,存放 k 个排序列值
排序,维护的列表升序正向,降序反向
写出最小值对应的行
读完一个文件处理一个
如果文件没读完
更新维护的列表循环计算
所有文件读完结束
函数
外存排序
创建临时文件目录
分段读取需排序的文件
分段排序
写出排好序的文件
外存归并排序
删除临时目录
主程序
调用外存排序函数
这里是用逐行归并写出的方式完成外存排序的,由于 pandas 逐行读取的方式效率非常低,所以没有借助 pandas 完成逐行归并排序。读者感兴趣的话可以尝试使用 pandas 按块归并,比较下两者的效率。
相比于聚合和过滤,这个代码相当复杂了,对于很多非专业程序员来讲已经是不太可能实现的任务了,而且它的运算效率也不高。
以上代码也仅处理了规范的结构化文件和单列排序。如果文件结构不规范比如不带表头、各行的分隔符数量不同、排序列是不规范的日期格式或者按照多列排序等等情况,代码还会进一步复杂化。
四、 分组
大文件的分组汇总也很麻烦,一个容易想到的办法是先将文件按分组列排序,然后再遍历有序文件,如果分组列值和前一行相同则汇总在同一组内,和前一行不同则新建一组继续汇总。如果结果集过大,还要看情况把计算好的分组结果及时写出。
这个算法相对简单,但性能很差,需要经过大排序的过程。一般数据库会使用 Hash 分组的方案,能够有效地提高速度,但代码复杂度要高出几倍。普通非专业人员基本上没有可能写出来了。这里也就不再列出代码了。
通过以上介绍,我们知道,Python 处理大文件还是非常费劲的,这主要是因为它没有提供为大数据服务的游标类型及相关运算,只能自己写代码,不仅繁琐而且运算效率低。
Python不方便,那么还有什么工具适合非专业程序员来处理大文件呢?
esProc SPL在这方面要要比 Python 方便得多,SPL 是专业的结构化数据处理语言,提供了比 pandas 更丰富的运算,内置有游标数据类型,解决大文件的运算就非常简单。比如上面这些例子都可以很容易完成。
一、 聚合
二、 过滤
三、 排序
四、 分组
特别指出,SPL 的分组汇总就是采用前面说过的数据库中常用的 HASH 算法,效率很高。
SPL中还内置了并行计算,现在多核 CPU 很常见,使用并行计算可以大幅度提高性能,比如分组汇总,只多加一个 @m 就可以变成并行计算。
而 Python 写并行计算的程序就太困难了,网上说啥的都有,就是找不到一个简单的办法。