from lib.all import * from pymongo import UpdateOne class S: def __init__(self): self.mysql_db_vas = Mysql( "rm-bp11t1616a1kjvmx5.mysql.rds.aliyuncs.com", 3306, "vas", "root", "Wishpal2024" ) self.col_user_income = MongoDB( host="mongodb://root:Wishpal2024@dds-bp1da1ddd62bede41.mongodb.rds.aliyuncs.com:3717,dds-bp1da1ddd62bede42.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-77304659", port=3717, db="vas", collection="user_income" ) def __del__(self): self.mysql_db_vas.close() self.col_user_income.close() # 获取每日收益 def get_week_dashboard(self): et = get_today_zero_time() st = et - 7 * 86400 sql = ''' select t1.mid,t1.ymd,sum(t1.`change`) income from ( select mid,`change`,DATE_FORMAT(FROM_UNIXTIME(ct), '%Y%m%d') as ymd from vas_ch_income where ct>={} and ct<{} and mid>0 ) t1 group by t1.mid,t1.ymd '''.format(st, et) docs = self.mysql_db_vas.query(sql) print("week, len(docs): {}".format(len(docs))) mid_docs_map = dict() for doc in docs: mid = safe_get_int(doc, "mid") if mid not in mid_docs_map.keys(): mid_docs_map[mid] = list() mid_docs_map[mid].append(doc) print("week, len(mid_docs_map): {}".format(len(mid_docs_map))) ymds = list() while True: if st >= et: break ymds.append(get_time_str_by_ts_v2(st, "%Y%m%d")) st += 86400 reqs = list() for mid, docs in mid_docs_map.items(): q = {"_id": mid} ymd_doc_map = dict() for d in docs: ymd = safe_get_str(d, "ymd") ymd_doc_map[ymd] = d lis = list() for ymd in ymds: d = safe_get_dict(ymd_doc_map, ymd) lis.append({ "date": ymd, "income": safe_get_int(d, "income") }) up = { "$set": { "ut": int(time.time()), "week_dashboard": lis } } reqs.append(UpdateOne(q, up, upsert=True)) ret = self.col_user_income.bulk_write(reqs) print("week, bulk_ret, match: {}, modify: {}, insert: {}, upsert: {}".format(ret.matched_count, ret.modified_count, ret.inserted_count, ret.upserted_count)) # 获取收益来源 def get_income_from_dashboard(self): et = get_today_zero_time() st = et - 7 * 86400 sql = ''' select t1.mid, t1.type_id, sum(t1.`change`) income from ( select mid, case when type_id = 'h5_zone_admission' then '空间解锁' when type_id = 'h5_zone_moment' then '付费帖' when type_id = 'contact_wechat' then '个人微信' when type_id = 'h5_zone_superfanship' then '超粉解锁' ELSE '其他' END AS type_id, `change` from vas_ch_income where ct>={} and ct<{} and mid>0 ) t1 group by t1.mid,t1.type_id '''.format(st, et) docs = self.mysql_db_vas.query(sql) print("from, len(docs): {}".format(len(docs))) mid_docs_map = dict() for doc in docs: mid = safe_get_int(doc, "mid") if mid not in mid_docs_map.keys(): mid_docs_map[mid] = list() mid_docs_map[mid].append(doc) print("from, len(mid_docs_map): {}".format(len(mid_docs_map))) reqs = list() for mid, docs in mid_docs_map.items(): q = {"_id": mid} lis = list() for d in docs: lis.append({ "desc": safe_get_str(d, "type_id"), "income": safe_get_int(d, "income") }) up = { "$set": { "ut": int(time.time()), "week_from_dashboard": lis } } reqs.append(UpdateOne(q, up, upsert=True)) ret = self.col_user_income.bulk_write(reqs) print("from, bulk_ret, match: {}, modify: {}, insert: {}, upsert: {}".format(ret.matched_count, ret.modified_count, ret.inserted_count, ret.upserted_count)) def proc(self): self.get_week_dashboard() self.get_income_from_dashboard() s = S() s.proc()