记录一次读取 9.8G的一个csv文件,并且写入mysql。

    xiaoxiao2025-06-03  82

    记录一次读取1600万 行 X 41列的csv文件。

    目标 /环境

    文件数据为测试模拟数据,数据大小 9.8G,目标为读取数据,并且写入mysql。 目标文件

    运行环境:

    python3.6 (64位),pandas,pymysql。

    思路

    采用pandas内置read_csv方法,分块读取文件,开4个进程写入mysql。

    遇到的坑有,memoryerror,这里需要强调!!! 必须把写入过的数据del掉,然后调用gc.collect() 释放内存,否则,一直会出现memoryerror,因为python的垃圾回收机制必须试函数执行完才释放,按照这个思路,不等数据读取完,内存就炸了。

    下面上代码。

    代码

    import pandas as pd from sqlalchemy import create_engine from multiprocessing import Pool import gc def gets(): """分块读取文件,其实1,000,000行据说速度最佳。""" df1 = pd.read_csv(r'C:\\Users\\zq\\Desktop\\Project\\demo_python\\测试2.csv', low_memory=False, chunksize=500000) return df1 def to_sqls(df, table): """写入mysql""" engine = create_engine('mysql+pymysql://root:root123@127.0.0.1:3306/demo', max_overflow=0, pool_size=5, pool_timeout=30, pool_recycle=-1) con = engine.connect() df.to_sql(table, con=con, if_exists='append') del df#删除掉变量,释放内存。 gc.collect() def get_times(): p = Pool(4) df = gets() for df1 in df: print(df1.shape[0]) for i in range(0, df1.shape[0] + 1, int((df1.shape[0] + 1) / 4)): if (i + int((df1.shape[0] + 1) / 4)) < (df1.shape[0] + 1): p.apply_async(to_sqls, args=(df1.iloc[i:i + int((df1.shape[0] + 1) / 4), :], 'ce')) else:#超出文件范围 p.apply_async(to_sqls, args=(df1.iloc[i:df1.shape[0] + 1, :], 'ce')) p.close() p.join() if __name__ == '__main__': import time t1 = time.time() get_times() t2 = time.time() - t1 print('%s s' % t2)

    结果。

    实际测试时间为:1161.214187860489 s完成整个程序。(包含读取 和写入) 数据库结果如下。 本次记录到此。如果后期发现更快的方法,将继续分享。

    最新回复(0)