import csv
import os
from itertools import islice

import numpy as np
import pandas as pd

from lib.util import safe_get


class PPCsv:
    """CSV table held in memory as a pandas DataFrame.

    The table is either loaded from ``file`` or started empty with the given
    ``header``; rows and columns can then be added and the result written
    back with :meth:`gen_csv`.
    """

    def __init__(self, file=None, header=None, encoding='utf-8-sig'):
        """Load ``file`` if given, otherwise start an empty table.

        :param file: path of the CSV file to read (and later write), or None
        :param header: column names used when ``file`` is None
        :param encoding: text encoding used for reading and writing ``file``
        """
        self.file = file
        self.header = header
        self.encoding = encoding
        if self.file is None:
            frame = pd.DataFrame(columns=header)
        else:
            # Read with the same encoding gen_csv() writes with.
            frame = pd.read_csv(self.file, encoding=self.encoding)
        self.df, _ = PPCsv.reduce_mem_usage(frame)

    def add_one_column(self, align, field, data_map):
        """Append one column at the end of the table.

        :param align: name of the existing column whose values are the keys
            looked up in ``data_map``; nothing is added when it is empty
        :param field: header of the new column
        :param data_map: mapping from align-column value to new cell value;
            rows without an entry get an empty string
        """
        # Guard first: looking up an empty column name would raise KeyError.
        if not align:
            return
        aligned = [data_map.get(key, '') for key in self.df[align]]
        self.df.insert(len(self.df.columns), field, aligned, allow_duplicates=True)

    def add_one_row(self, data):
        """Append one row; ``data`` is positional values or a column->value dict."""
        self._append_frame(self._rows_to_frame([data]))

    def add_columns(self, insert_items):
        """Append several columns.

        Each item is read through ``safe_get`` for the keys ``'align'``,
        ``'index_field'`` (header of the new column) and ``'data'``.
        """
        for item in insert_items:
            align = safe_get(item, 'align', str, '')
            field = safe_get(item, 'index_field', str, '')
            data = safe_get(item, 'data', dict, dict())
            self.add_one_column(align, field, data)

    def add_rows(self, data_list):
        """Append several rows with a single concatenation."""
        rows = list(data_list)
        if not rows:
            return
        self._append_frame(self._rows_to_frame(rows))

    def read_all(self):
        """Return every data row of the file as a list of lists.

        When no file is attached the in-memory table is returned instead.
        """
        if self.file is None:
            return self.df.values.tolist()
        return pd.read_csv(self.file, encoding=self.encoding).values.tolist()

    def gen_csv(self):
        """Write the table to ``self.file`` with a header and no index column.

        :return: None when a file is attached; the CSV text when ``file`` is
            None (``DataFrame.to_csv`` returns a string in that case)
        """
        return self.df.to_csv(path_or_buf=self.file, encoding=self.encoding,
                              header=True, index=False)

    def expand_file(self, file_name):
        """Append every non-first column of another CSV file to this table.

        The first column of ``file_name`` is the alignment column: this table
        must have a column with the same name, and its values select which
        cell of the other file lands in which row. Missing cells in the
        other file become 0. Files with fewer than two columns are ignored.
        """
        ex_df = pd.read_csv(file_name)
        ex_columns = list(ex_df.columns)
        if len(ex_columns) < 2:
            return
        ex_align = ex_columns[0]
        keys = ex_df[ex_align]
        for field in ex_columns[1:]:
            data_map = dict(zip(keys, ex_df[field].fillna(0)))
            self.add_one_column(ex_align, field, data_map)

    def _rows_to_frame(self, rows):
        """Build a DataFrame with this table's columns from ``rows``."""
        columns = self.df.columns
        # Going through Series keeps the original contract: sequences are
        # positional (length must match), dicts are matched by column name.
        records = [pd.Series(row, index=columns).tolist() for row in rows]
        return pd.DataFrame(records, columns=columns)

    def _append_frame(self, new_rows):
        """Append ``new_rows`` to the table (DataFrame.append was removed in pandas 2.0)."""
        if len(self.df) == 0:
            # Concatenating with an empty frame has version-dependent dtype
            # rules; an empty table contributes no rows anyway.
            self.df = new_rows
        else:
            self.df = pd.concat([self.df, new_rows], ignore_index=True)

    @staticmethod
    def _pick_dtype(series, low, high):
        """Return the dtype ``series`` should be cast to, or None to keep it.

        ``low``/``high`` are the column's min/max as Python scalars. Bounds
        are strict, so a value equal to a type's limit moves to the next
        wider type.
        """
        if series.dtype.kind == 'f':
            values = series.to_numpy()
            # Element-wise test: summing signed differences lets 0.5 and -0.5
            # cancel out and would truncate real fractions.
            whole = np.isfinite(values).all() and (values == np.trunc(values)).all()
            if not whole:
                return np.float32
        if low >= 0:
            for candidate in (np.uint8, np.uint16, np.uint32, np.uint64):
                if high < np.iinfo(candidate).max:
                    return candidate
            return None
        for candidate in (np.int8, np.int16, np.int32, np.int64):
            info = np.iinfo(candidate)
            if low > info.min and high < info.max:
                return candidate
        return None

    @staticmethod
    def reduce_mem_usage(props):
        """Downcast numeric columns of ``props`` in place to save memory.

        Whole-number columns get the narrowest integer type that holds their
        range; other numeric columns become float32. Missing values are
        replaced by ``column minimum - 1`` before the type is chosen.
        Object and other non-numeric columns are left untouched.

        :param props: DataFrame to shrink (modified and returned)
        :return: ``(props, na_list)`` where ``na_list`` names the columns
            that contained NaN or infinite values
        """
        na_list = []
        for col in props.columns:
            series = props[col]
            dtype = series.dtype
            if not (isinstance(dtype, np.dtype) and dtype.kind in 'biuf'):
                continue

            filled = False
            if not np.isfinite(series).all():
                na_list.append(col)
                # Integer types cannot hold NaN, so fill before sizing.
                series = series.fillna(series.min() - 1)
                filled = True

            # Measured after the fill so the fill value is part of the range.
            low, high = series.min(), series.max()
            if pd.isna(low):
                # Empty or all-NaN column: there is no range to size against.
                continue
            if isinstance(low, np.generic):
                low = low.item()
            if isinstance(high, np.generic):
                high = high.item()

            target = PPCsv._pick_dtype(series, low, high)
            if target is not None:
                props[col] = series.astype(target)
            elif filled:
                props[col] = series
        return props, na_list


class Csv:
    """Thin wrapper around the csv module for appending to and reading a file."""

    def __init__(self, file='_default.csv', header=None, encoding='utf-8-sig'):
        """Remember the file and create it with ``header`` if it is missing.

        :param file: path of the CSV file
        :param header: header row written when the file does not exist yet
        :param encoding: text encoding used for every read and write
        """
        self.file = file
        self.header = header
        self.encoding = encoding
        if header is not None and not os.path.exists(file):
            self._write_rows([header])

    def _write_rows(self, rows):
        """Append ``rows`` to the file."""
        # newline='' is required by the csv module so it controls line endings.
        with open(self.file, 'a', encoding=self.encoding, newline='') as f:
            csv.writer(f).writerows(rows)

    def _open_for_read(self):
        """Open the file for reading with the configured encoding."""
        return open(self.file, 'r', encoding=self.encoding, newline='')

    def append(self, data):
        """Append rows; ``data`` is a two-dimensional sequence."""
        self._write_rows(data)

    def read(self, total, offset, limit, with_header=False):
        """Return up to ``limit`` rows starting at row ``offset``.

        :param total: total number of rows available; caps ``limit`` so that
            ``offset + limit`` never exceeds it
        :param offset: index of the first row to return
        :param limit: maximum number of rows to return
        :param with_header: when False the first line of the file is skipped
            before ``offset`` is applied
        """
        count = max(min(limit, total - offset), 0)
        start = max(offset + (0 if with_header else 1), 0)
        with self._open_for_read() as f:
            # Stream to the requested window instead of loading the file.
            return list(islice(csv.reader(f), start, start + count))

    def read_all(self, with_header=False):
        """Return all rows, without the first line unless ``with_header``."""
        with self._open_for_read() as f:
            reader = csv.reader(f)
            if not with_header:
                next(reader, None)
            return list(reader)

    def extend(self, data_list):
        """Append one single-cell row per item of ``data_list``.

        ``data_list`` is a one-dimensional list; any other type is ignored.
        """
        if not isinstance(data_list, list):
            return
        self._write_rows([[item] for item in data_list])

    def get_headers(self):
        """Return the first row of the file, or an empty list if it is empty."""
        with self._open_for_read() as f:
            return next(csv.reader(f), [])