136 lines
4.3 KiB
Python
136 lines
4.3 KiB
Python
from lib.all import *
|
|
from pymongo import UpdateOne
|
|
|
|
|
|
class S:
|
|
def __init__(self):
|
|
self.mysql_db_vas = Mysql(
|
|
"rm-bp11t1616a1kjvmx5.mysql.rds.aliyuncs.com", 3306, "vas", "root", "Wishpal2024"
|
|
)
|
|
self.col_user_income = MongoDB(
|
|
host="mongodb://root:Wishpal2024@dds-bp1da1ddd62bede41.mongodb.rds.aliyuncs.com:3717,dds-bp1da1ddd62bede42.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-77304659",
|
|
port=3717,
|
|
db="vas",
|
|
collection="user_income"
|
|
)
|
|
|
|
def __del__(self):
|
|
self.mysql_db_vas.close()
|
|
self.col_user_income.close()
|
|
|
|
# 获取每日收益
|
|
def get_week_dashboard(self):
|
|
et = get_today_zero_time()
|
|
st = et - 7 * 86400
|
|
sql = '''
|
|
select t1.mid,t1.ymd,sum(t1.`change`) income
|
|
from
|
|
(
|
|
select mid,`change`,DATE_FORMAT(FROM_UNIXTIME(ct), '%Y%m%d') as ymd from vas_ch_income where ct>={} and ct<{} and mid>0
|
|
) t1
|
|
group by t1.mid,t1.ymd
|
|
'''.format(st, et)
|
|
docs = self.mysql_db_vas.query(sql)
|
|
print("week, len(docs): {}".format(len(docs)))
|
|
|
|
mid_docs_map = dict()
|
|
for doc in docs:
|
|
mid = safe_get_int(doc, "mid")
|
|
if mid not in mid_docs_map.keys():
|
|
mid_docs_map[mid] = list()
|
|
mid_docs_map[mid].append(doc)
|
|
print("week, len(mid_docs_map): {}".format(len(mid_docs_map)))
|
|
|
|
ymds = list()
|
|
while True:
|
|
if st >= et:
|
|
break
|
|
ymds.append(get_time_str_by_ts_v2(st, "%Y%m%d"))
|
|
st += 86400
|
|
|
|
reqs = list()
|
|
for mid, docs in mid_docs_map.items():
|
|
q = {"_id": mid}
|
|
ymd_doc_map = dict()
|
|
for d in docs:
|
|
ymd = safe_get_str(d, "ymd")
|
|
ymd_doc_map[ymd] = d
|
|
|
|
lis = list()
|
|
for ymd in ymds:
|
|
d = safe_get_dict(ymd_doc_map, ymd)
|
|
lis.append({
|
|
"date": ymd,
|
|
"income": safe_get_int(d, "income")
|
|
})
|
|
up = {
|
|
"$set": {
|
|
"ut": int(time.time()),
|
|
"week_dashboard": lis
|
|
}
|
|
}
|
|
reqs.append(UpdateOne(q, up, upsert=True))
|
|
ret = self.col_user_income.bulk_write(reqs)
|
|
print("week, bulk_ret, match: {}, modify: {}, insert: {}, upsert: {}".format(ret.matched_count, ret.modified_count, ret.inserted_count, ret.upserted_count))
|
|
|
|
# 获取收益来源
|
|
def get_income_from_dashboard(self):
|
|
et = get_today_zero_time()
|
|
st = et - 7 * 86400
|
|
sql = '''
|
|
select t1.mid,
|
|
t1.type_id,
|
|
sum(t1.`change`) income
|
|
from
|
|
(
|
|
select mid,
|
|
case
|
|
when type_id = 'h5_zone_admission' then '空间解锁'
|
|
when type_id = 'h5_zone_moment' then '付费帖'
|
|
when type_id = 'contact_wechat' then '个人微信'
|
|
when type_id = 'h5_zone_superfanship' then '超粉解锁'
|
|
ELSE '其他'
|
|
END AS type_id,
|
|
`change`
|
|
from vas_ch_income where ct>={} and ct<{} and mid>0
|
|
) t1
|
|
group by t1.mid,t1.type_id
|
|
'''.format(st, et)
|
|
docs = self.mysql_db_vas.query(sql)
|
|
print("from, len(docs): {}".format(len(docs)))
|
|
|
|
mid_docs_map = dict()
|
|
for doc in docs:
|
|
mid = safe_get_int(doc, "mid")
|
|
if mid not in mid_docs_map.keys():
|
|
mid_docs_map[mid] = list()
|
|
mid_docs_map[mid].append(doc)
|
|
print("from, len(mid_docs_map): {}".format(len(mid_docs_map)))
|
|
|
|
reqs = list()
|
|
for mid, docs in mid_docs_map.items():
|
|
q = {"_id": mid}
|
|
lis = list()
|
|
for d in docs:
|
|
lis.append({
|
|
"desc": safe_get_str(d, "type_id"),
|
|
"income": safe_get_int(d, "income")
|
|
})
|
|
up = {
|
|
"$set": {
|
|
"ut": int(time.time()),
|
|
"week_from_dashboard": lis
|
|
}
|
|
}
|
|
reqs.append(UpdateOne(q, up, upsert=True))
|
|
ret = self.col_user_income.bulk_write(reqs)
|
|
print("from, bulk_ret, match: {}, modify: {}, insert: {}, upsert: {}".format(ret.matched_count, ret.modified_count, ret.inserted_count, ret.upserted_count))
|
|
|
|
def proc(self):
|
|
self.get_week_dashboard()
|
|
self.get_income_from_dashboard()
|
|
|
|
|
|
s = S()
|
|
s.proc()
|