pg2neox小尝试

    xiaoxiao2025-11-26  24

    from py2neo import Graph, Node, Relationship
    import psycopg2
    from commonConfig import POSTGRE_CONFIG, NEO4J_CONFIG


    class Neo4j1(Graph):
        """py2neo ``Graph`` with a helper that copies PostgreSQL rows into nodes."""

        def get_from_pg(self, pg_sql_client, pg_table, pg_schema='public',
                        pg_columns_list=(), neo_properties_list=(),
                        label=None, cover=True, limit=0):
            """Copy rows of a PostgreSQL table into Neo4j, one node per row.

            :param pg_sql_client: open PostgreSQL connection; only ``cursor()``
                is called on it.
            :param pg_table: name of the PostgreSQL table to read.
            :param pg_schema: schema used to look up the table's column names
                when ``pg_columns_list`` is empty.
            :param pg_columns_list: columns to select; when empty, every column
                listed in ``information_schema.columns`` for the table is used.
            :param neo_properties_list: Neo4j property names, positionally
                matched to the selected columns; when empty, the column names
                are used as property names.
            :param label: label of the created nodes; defaults to ``pg_table``.
            :param cover: when true, existing nodes with ``label`` are deleted
                before the new nodes are created.
            :param limit: maximum number of rows to copy; ``0`` means no limit.
            :raises ValueError: if ``neo_properties_list`` and the selected
                columns differ in length, or if no columns are found for the
                table.
            :return: None
            """
            # Without an explicit label, the table name doubles as the label.
            if not label:
                label = pg_table

            # Read everything from PostgreSQL first so that a failure on that
            # side never touches the graph. The cursor is closed on every path.
            with pg_sql_client.cursor() as cur:
                if pg_columns_list:
                    column_names = list(pg_columns_list)
                    mismatch_message = ('The length of neo4j_properties_list '
                                        'and pg_columns_list not equal!')
                else:
                    # Schema and table names are passed as query parameters
                    # rather than being interpolated into the SQL text.
                    get_column_name = (
                        "SELECT column_name FROM information_schema.COLUMNS "
                        "WHERE table_schema = %s AND TABLE_NAME = %s "
                        "ORDER BY ordinal_position"
                    )
                    print(get_column_name, (pg_schema, pg_table))
                    cur.execute(get_column_name, (pg_schema, pg_table))
                    column_names = [row[0] for row in cur.fetchall()]
                    if not column_names:
                        raise ValueError(
                            f'No columns found for table {pg_schema}.{pg_table}')
                    mismatch_message = 'The length of neo4j_properties invalid!'

                if not neo_properties_list:
                    neo_properties = column_names
                elif len(neo_properties_list) == len(column_names):
                    neo_properties = list(neo_properties_list)
                else:
                    raise ValueError(mismatch_message)

                # Identifiers cannot be bound as parameters, so pg_table and the
                # column names are interpolated; they must come from trusted code.
                # NOTE(review): pg_schema is not applied here, so the table is
                # resolved through the connection's search_path - confirm intent.
                pg_columns = ",".join(f'"{name}"' for name in column_names)
                get_data = f'select {pg_columns} from {pg_table}'
                if limit:
                    get_data += f' limit {int(limit)}'
                cur.execute(get_data)
                data = cur.fetchall()

            # Delete and create inside one Neo4j transaction so that a failed
            # import rolls back instead of leaving the label emptied or half
            # written.
            tx = self.begin()
            try:
                if cover:
                    # NOTE(review): plain `delete` (no `detach`) is kept from the
                    # original; presumably it fails for nodes that still have
                    # relationships - confirm against the target graph.
                    delete_cypher = f'match (a:{label}) delete a'
                    print(delete_cypher)
                    tx.run(delete_cypher)
                for line in data:
                    # Every value is stored as a string with backslashes doubled,
                    # as in the original code.
                    # NOTE(review): the doubling is probably unnecessary if
                    # properties are sent as parameters - confirm before changing.
                    line = [str(word).replace("\\", "\\\\") for word in line]
                    node = Node(label, **dict(zip(neo_properties, line)))
                    print(node)
                    tx.create(node)
            except Exception:
                tx.rollback()
                raise
            tx.commit()


    # Note on zip(): with x = [1, 2, 3], y = [4, 5, 6], z = [7, 8, 9],
    # xyz = zip(x, y, z) gives [(1, 4, 7), (2, 5, 8), (3, 6, 9)].
    # zip(*xyz) is equivalent to zip((1, 4, 7), (2, 5, 8), (3, 6, 9)): it takes
    # the rows apart and regroups their elements by column, giving
    # [(1, 2, 3), (4, 5, 6), (7, 8, 9)].

    if __name__ == '__main__':
        pg_conn = psycopg2.connect(**POSTGRE_CONFIG)
        print('pg连接')
        try:
            graph = Neo4j1(**NEO4J_CONFIG)
            print('neo4j1连接')
            graph.get_from_pg(pg_conn, 'student', pg_schema='public',
                              label='student')
        finally:
            # Release the PostgreSQL connection whether or not the import worked.
            pg_conn.close()
    最新回复(0)