记录一次读取1600万 行 X 41列的csv文件。
目标 /环境
文件数据为测试模拟数据,数据大小 9.8G,目标为读取数据,并且写入mysql。 目标文件
运行环境:
python3.6 (64位),pandas,pymysql。
思路
采用pandas内置read_csv方法,分块读取文件,开4个进程写入mysql。
遇到的坑有,memoryerror,这里需要强调!!! 必须把写入过的数据del掉,然后调用gc.collect() 释放内存,否则,一直会出现memoryerror,因为python的垃圾回收机制必须试函数执行完才释放,按照这个思路,不等数据读取完,内存就炸了。
下面上代码。
代码
import pandas
as pd
from sqlalchemy
import create_engine
from multiprocessing
import Pool
import gc
def gets():
"""分块读取文件,其实1,000,000行据说速度最佳。"""
df1
= pd
.read_csv
(r
'C:\\Users\\zq\\Desktop\\Project\\demo_python\\测试2.csv', low_memory
=False, chunksize
=500000)
return df1
def to_sqls(df
, table
):
"""写入mysql"""
engine
= create_engine
('mysql+pymysql://root:root123@127.0.0.1:3306/demo',
max_overflow
=0, pool_size
=5,
pool_timeout
=30, pool_recycle
=-1)
con
= engine
.connect
()
df
.to_sql
(table
, con
=con
, if_exists
='append')
del df
gc
.collect
()
def get_times():
p
= Pool
(4)
df
= gets
()
for df1
in df
:
print(df1
.shape
[0])
for i
in range(0, df1
.shape
[0] + 1, int((df1
.shape
[0] + 1) / 4)):
if (i
+ int((df1
.shape
[0] + 1) / 4)) < (df1
.shape
[0] + 1):
p
.apply_async
(to_sqls
, args
=(df1
.iloc
[i
:i
+ int((df1
.shape
[0] + 1) / 4), :], 'ce'))
else:
p
.apply_async
(to_sqls
, args
=(df1
.iloc
[i
:df1
.shape
[0] + 1, :], 'ce'))
p
.close
()
p
.join
()
if __name__
== '__main__':
import time
t1
= time
.time
()
get_times
()
t2
= time
.time
() - t1
print('%s s' % t2
)
结果。
实际测试时间为:1161.214187860489 s完成整个程序。(包含读取 和写入) 数据库结果如下。 本次记录到此。如果后期发现更快的方法,将继续分享。