# scripts/lib/csv.py

import csv
import os
import numpy as np
import pandas as pd
from lib.util import safe_get


class PPCsv:
    def __init__(self, file=None, header=None, encoding='utf-8-sig'):
        self.file = file
        self.header = header
        self.encoding = encoding
        if self.file is None:
            self.df = pd.DataFrame(columns=header)
        else:
            # Read with the same encoding we write with (utf-8-sig handles a BOM).
            self.df = pd.read_csv(self.file, encoding=self.encoding)
        self.df, _ = PPCsv.reduce_mem_usage(self.df)

    # Append one column (inserted as the last column).
    def add_one_column(self, align, field, data_map):
        """
        align: name of the existing column used to align the new data
        field: header of the new column
        data_map: mapping from align-column values to new-column values
        """
        if not align:
            return
        insert_column = list()
        align_column = self.df[align]
        # Align the new data against the existing column.
        for val in align_column:
            insert_column.append(data_map.get(val, ''))
        self.df.insert(len(self.df.columns), field, insert_column, allow_duplicates=True)

    # Append one row.
    def add_one_row(self, data):
        # DataFrame.append was removed in pandas 2.0, so build a one-row frame and concat.
        row = pd.DataFrame([pd.Series(data, index=self.df.columns)])
        self.df = pd.concat([self.df, row], ignore_index=True)

    # Append multiple columns.
    def add_columns(self, insert_items):
        for item in insert_items:
            align = safe_get(item, 'align', str, '')
            field = safe_get(item, 'index_field', str, '')
            data = safe_get(item, 'data', dict, dict())
            self.add_one_column(align, field, data)

    # Append multiple rows.
    def add_rows(self, data_list):
        for data in data_list:
            self.add_one_row(data)

    # Read the whole file back as a list of rows.
    def read_all(self):
        return pd.read_csv(self.file).values.tolist()

    # Write the DataFrame out to the CSV file.
    def gen_csv(self):
        self.df.to_csv(path_or_buf=self.file, encoding=self.encoding, header=True, index=False)

    # Expand this file with the columns of another CSV file.
    def expand_file(self, file_name):
        ex_df = pd.read_csv(file_name)
        ex_columns = list(ex_df.columns)
        if len(ex_columns) < 2:
            return
        # The first column is the alignment key.
        ex_align = ex_columns[0]
        ex_align_column = ex_df[ex_align]
        for field in ex_columns[1:]:
            ex_column = list(ex_df[field].fillna(0))
            data_map = dict()
            for i in range(len(ex_align_column)):
                data_map[ex_align_column[i]] = ex_column[i]
            self.add_one_column(ex_align, field, data_map)

    # Downcast numeric columns to reduce the DataFrame's memory footprint.
    @staticmethod
    def reduce_mem_usage(props):
        start_mem_usg = props.memory_usage().sum() / 1024 ** 2
        # print("Memory usage of properties dataframe is :", start_mem_usg, " MB")
        NAlist = []  # Keeps track of columns that have missing values filled in.
        for col in props.columns:
            if props[col].dtype != object:  # Exclude strings
                # print("******************************")
                # print("Column: ", col)
                # print("dtype before: ", props[col].dtype)
                IsInt = False
                mx = props[col].max()
                mn = props[col].min()
                # Integer dtypes do not support NA, so missing values are filled first.
                if not np.isfinite(props[col]).all():
                    NAlist.append(col)
                    props[col] = props[col].fillna(mn - 1)
                # Test whether the column can be converted to an integer.
                asint = props[col].fillna(0).astype(np.int64)
                result = (props[col] - asint).sum()
                if -0.01 < result < 0.01:
                    IsInt = True
                # Pick an integer / unsigned integer dtype.
                if IsInt:
                    if mn >= 0:
                        if mx < 255:
                            props[col] = props[col].astype(np.uint8)
                        elif mx < 65535:
                            props[col] = props[col].astype(np.uint16)
                        elif mx < 4294967295:
                            props[col] = props[col].astype(np.uint32)
                        else:
                            props[col] = props[col].astype(np.uint64)
                    else:
                        if mn > np.iinfo(np.int8).min and mx < np.iinfo(np.int8).max:
                            props[col] = props[col].astype(np.int8)
                        elif mn > np.iinfo(np.int16).min and mx < np.iinfo(np.int16).max:
                            props[col] = props[col].astype(np.int16)
                        elif mn > np.iinfo(np.int32).min and mx < np.iinfo(np.int32).max:
                            props[col] = props[col].astype(np.int32)
                        elif mn > np.iinfo(np.int64).min and mx < np.iinfo(np.int64).max:
                            props[col] = props[col].astype(np.int64)
                # Otherwise keep it as a 32-bit float.
                else:
                    props[col] = props[col].astype(np.float32)
                # print("dtype after: ", props[col].dtype)
                # print("******************************")
        # print("___MEMORY USAGE AFTER COMPLETION:___")
        # mem_usg = props.memory_usage().sum() / 1024 ** 2
        # print("Memory usage is: ", mem_usg, " MB")
        # print("This is ", 100 * mem_usg / start_mem_usg, "% of the initial size")
        return props, NAlist
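

# A minimal usage sketch for PPCsv (an assumption about intended use; the file
# name, header, rows and values below are illustrative, not part of the original
# module). It builds a small frame, aligns an extra column on 'id', and writes it.
def _ppcsv_usage_example(out_file='_ppcsv_demo.csv'):
    pp = PPCsv(header=['id', 'name'])
    pp.add_rows([[1, 'a'], [2, 'b']])
    # Add a 'score' column aligned against the existing 'id' column.
    pp.add_one_column('id', 'score', {1: 90, 2: 80})
    pp.file = out_file
    pp.gen_csv()
    return pp.df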


class Csv:
    def __init__(self, file='_default.csv', header=None, encoding='utf-8-sig'):
        """Initialize the CSV file wrapper.
        :param file: file name
        """
        self.file = file
        self.header = header
        self.encoding = encoding
        # Write the header row once if the file does not exist yet.
        if not os.path.exists(file) and header is not None:
            with open(file, 'a', encoding=self.encoding, newline='') as f:
                f_csv = csv.writer(f)
                f_csv.writerow(self.header)

    # data must be a 2D list (a list of rows).
    def append(self, data):
        with open(self.file, 'a', encoding=self.encoding, newline='') as f:
            f_csv = csv.writer(f)
            f_csv.writerows(data)

    # Read `limit` rows starting at `offset`; `total` is the total row count used to clamp `limit`.
    def read(self, total, offset, limit, with_header=False):
        with open(self.file, 'r', encoding=self.encoding, newline='') as f:
            rows = list(csv.reader(f))
            if total - offset < limit:
                limit = total - offset
            if with_header:
                data = rows[offset:offset + limit]
            else:
                # Skip the header row.
                data = rows[offset + 1:offset + 1 + limit]
        return data

    # Read every row; skip the header unless with_header is True.
    def read_all(self, with_header=False):
        with open(self.file, 'r', encoding=self.encoding, newline='') as f:
            rows = list(csv.reader(f))
        if with_header:
            return rows
        return rows[1:]

    # data_list only needs to be a 1D list; each element becomes a one-cell row.
    def extend(self, data_list):
        if not isinstance(data_list, list):
            return
        data_write = [[data] for data in data_list]
        with open(self.file, 'a', encoding=self.encoding, newline='') as f:
            f_csv = csv.writer(f)
            f_csv.writerows(data_write)

    # Return the header row, or an empty list if the file is empty.
    def get_headers(self):
        headers = list()
        with open(self.file, 'r', encoding=self.encoding, newline='') as f:
            rows = list(csv.reader(f))[:1]
            if len(rows) > 0:
                headers = rows[0]
        return headers
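

# Minimal, self-contained demo of the Csv wrapper (a hypothetical usage sketch;
# '_demo.csv' and the sample rows are illustrative, not part of the original code).
if __name__ == '__main__':
    demo = Csv(file='_demo.csv', header=['id', 'name'])
    demo.append([[1, 'alice'], [2, 'bob']])  # rows are passed as a 2D list
    demo.extend(['standalone'])              # each element becomes a one-cell row
    print(demo.get_headers())                # header row of the file
    print(demo.read_all())                   # data rows, header skipped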