scripts/cronjob/vas/user_income_calc.py

141 lines
4.4 KiB
Python

from lib.all import *
from pymongo import UpdateOne
class S:
def __init__(self):
self.mysql_db_vas = Mysql(
"rm-bp11t1616a1kjvmx5.mysql.rds.aliyuncs.com", 3306, "vas", "root", "Wishpal2024"
)
self.col_user_income = MongoDB(
host="mongodb://root:Wishpal2024@dds-bp1da1ddd62bede41.mongodb.rds.aliyuncs.com:3717,dds-bp1da1ddd62bede42.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-77304659",
port=3717,
db="vas",
collection="user_income"
)
def __del__(self):
self.mysql_db_vas.close()
self.col_user_income.close()
# 获取每日收益
def get_week_dashboard(self):
et = get_today_zero_time()
st = et - 7 * 86400
sql = '''
select t1.mid,t1.ymd,sum(t1.`change`) income
from
(
select mid,`change`,DATE_FORMAT(FROM_UNIXTIME(ct), '%Y%m%d') as ymd from vas_ch_income where ct>={} and ct<{} and mid>0
) t1
group by t1.mid,t1.ymd
'''.format(st, et)
docs = self.mysql_db_vas.query(sql)
print("week, len(docs): {}".format(len(docs)))
mid_docs_map = dict()
for doc in docs:
mid = safe_get_int(doc, "mid")
if mid not in mid_docs_map.keys():
mid_docs_map[mid] = list()
mid_docs_map[mid].append(doc)
print("week, len(mid_docs_map): {}".format(len(mid_docs_map)))
ymds = list()
while True:
if st >= et:
break
ymds.append(get_time_str_by_ts_v2(st, "%Y%m%d"))
st += 86400
reqs = list()
for mid, docs in mid_docs_map.items():
q = {"_id": mid}
ymd_doc_map = dict()
for d in docs:
ymd = safe_get_str(d, "ymd")
ymd_doc_map[ymd] = d
lis = list()
for ymd in ymds:
d = safe_get_dict(ymd_doc_map, ymd)
lis.append({
"date": safe_get_str(d, "ymd"),
"income": safe_get_int(d, "income")
})
up = {
"$set": {
"ut": int(time.time()),
"week_dashboard": lis
}
}
reqs.append(UpdateOne(q, up, upsert=True))
ret = self.col_user_income.bulk_write(reqs)
print("week, bulk_ret, match: {}, modify: {}, insert: {}, upsert: {}".format(ret.matched_count, ret.modified_count, ret.inserted_count, ret.upserted_count))
# 获取收益来源
def get_income_from_dashboard(self):
et = get_today_zero_time()
st = et - 7 * 86400
sql = '''
select t1.mid,
t1.type_id,
sum(t1.`change`) income
from
(
select mid,
case
when type_id = 'h5_zone_admission' then '空间解锁'
when type_id = 'h5_zone_moment' then '付费帖'
when type_id = 'contact_wechat' then '个人微信'
when type_id = 'h5_zone_superfanship' then '超粉解锁'
ELSE '其他'
END AS type_id,
`change`
from vas_ch_income where ct>={} and ct<{} and mid>0
) t1
group by t1.mid,t1.type_id
'''.format(st, et)
docs = self.mysql_db_vas.query(sql)
print("from, len(docs): {}".format(len(docs)))
mid_docs_map = dict()
for doc in docs:
mid = safe_get_int(doc, "mid")
if mid not in mid_docs_map.keys():
mid_docs_map[mid] = list()
mid_docs_map[mid].append(doc)
print("from, len(mid_docs_map): {}".format(len(mid_docs_map)))
reqs = list()
for mid, docs in mid_docs_map.items():
q = {"_id": mid}
lis = list()
for d in docs:
lis.append({
"desc": safe_get_str(d, "type_id"),
"income": safe_get_int(d, "income")
})
up = {
"$set": {
"ut": int(time.time()),
"week_from_dashboard": lis
}
}
reqs.append(UpdateOne(q, up, upsert=True))
ret = self.col_user_income.bulk_write(reqs)
print("from, bulk_ret, match: {}, modify: {}, insert: {}, upsert: {}".format(ret.matched_count, ret.modified_count, ret.inserted_count, ret.upserted_count))
def proc(self):
self.get_week_dashboard()
self.get_income_from_dashboard()
# s = S()
# s.proc()
et = get_today_zero_time()
st = et - 7 * 86400
print(ymds)