"""Rebuild the table holding each streamer's income for the current day."""
from lib.all import *
# Keep this after the star import: that order guarantees the name `datetime`
# is bound to the standard-library module even if `lib.all` exports a
# `datetime` name of its own.
import datetime


def get_first_day_of_current_and_next_month_timestamps(date=None):
    """Return the start-of-month timestamps for ``date``'s month and the next.

    Args:
        date: A ``datetime.datetime`` (only ``year`` and ``month`` are read).
            Defaults to ``datetime.datetime.now()`` when ``None``.

    Returns:
        A ``(current, next)`` tuple of integer Unix timestamps for 00:00 on
        the first day of the current month and of the following month. The
        datetimes are naive, so ``timestamp()`` interprets them in the
        machine's local time zone.
    """
    if date is None:
        date = datetime.datetime.now()

    first_day_of_current_month = datetime.datetime(date.year, date.month, 1)

    # December rolls over into January of the following year.
    if date.month == 12:
        next_year, next_month = date.year + 1, 1
    else:
        next_year, next_month = date.year, date.month + 1
    first_day_of_next_month = datetime.datetime(next_year, next_month, 1)

    return (
        int(first_day_of_current_month.timestamp()),
        int(first_day_of_next_month.timestamp()),
    )


class S:
    """Copy today's per-``mid`` income totals from the `vas` DB to the `bi` DB."""

    def __init__(self):
        """Open the source (`vas`) and destination (`bi`) MySQL handles."""
        # NOTE(review): connection credentials are hard-coded in source;
        # consider loading them from configuration or the environment.
        self.mysql_db_vas = Mysql(
            "rm-bp11t1616a1kjvmx5.mysql.rds.aliyuncs.com",
            3306,
            "vas",
            "root",
            "Wishpal2024",
        )
        self.mysql_db_bi = Mysql(
            "172.31.37.71",
            3306,
            "bi",
            "root",
            "Wishpal@2023",
        )

    def __del__(self):
        """Close whichever database handles were successfully opened."""
        # __del__ also runs on an instance whose __init__ raised part-way, so
        # either attribute may be missing; look them up defensively.
        vas = getattr(self, "mysql_db_vas", None)
        bi = getattr(self, "mysql_db_bi", None)
        try:
            if vas is not None:
                vas.close()
        finally:
            # Still release the second handle if closing the first one failed.
            if bi is not None:
                bi.close()

    def update_streamer_day_income(self):
        """Rebuild `vas_cur_streamer_income` from today's `vas_ch_income` rows.

        Sums ``change`` per ``mid`` over the window ``[st, st + 86400)``,
        truncates the destination table, then inserts one row per ``mid``.
        """
        # presumably the Unix timestamp (seconds) of today's local midnight;
        # confirm against get_today_zero_time in lib.all.
        st = get_today_zero_time()
        et = st + 86400
        sql = "SELECT mid, SUM(`change`) AS income FROM `vas_ch_income` WHERE mid>0 AND ct>={} AND ct<{} GROUP BY mid".format(st, et)
        rows = self.mysql_db_vas.query(sql)
        total = len(rows)
        print("len(rows): {}".format(total))
        # NOTE(review): the return value is discarded, so this only has an
        # effect if sort_dict_list sorts `rows` in place; confirm in lib.all.
        sort_dict_list(rows, "income", desc=True)

        # Empty the destination table before refilling it.
        sql = "truncate table vas_cur_streamer_income"
        ret = self.mysql_db_bi.exec(sql, ())
        print("truncate finish, ret: {}".format(ret))

        dt = datetime.datetime.fromtimestamp(st)
        formatted_dt = dt.strftime('%Y-%m-%d 00:00:00')

        # The table was just truncated, so every row is a plain INSERT; no
        # per-row existence check or UPDATE is needed.
        sql_i = "insert into vas_cur_streamer_income (mid, pdate, income) values (%s,%s,%s)"
        for idx, row in enumerate(rows, 1):
            print("{}/{}, {}".format(idx, total, row))
            mid = safe_get_int(row, "mid")
            income = safe_get_int(row, "income")
            self.mysql_db_bi.exec(sql_i, (mid, formatted_dt, income))

    def proc(self):
        """Run the job: refresh today's income table."""
        self.update_streamer_day_income()


# Script entry point (executes at module load, as in the original).
s = S()
s.proc()