import json
from lib.util import *
import redis
from lib.odps import *
import time
from lib.log import *

# flush() uses datetime.datetime. Import the module explicitly instead of
# relying on one of the wildcard imports above to re-export it. The original
# import order is kept as-is because wildcard imports make ordering significant.
import datetime

service_name = "raw_action"
log_dir = '.'
logger = Logger(service_name, log_dir=log_dir, print_terminal=False)


class S:
    """Drain action records from a Redis list and bulk-write them to ODPS.

    Records are popped from the Redis list ``action``, buffered in
    ``self.bulk`` and written to the ``actionlog`` table, one ``ym``/``day``
    partition per ``flush()`` call.
    """

    # Flush once this many records are buffered ...
    BULK_LIMIT = 10000
    # ... or once this many seconds have passed since the last flush.
    FLUSH_INTERVAL = 60
    # Pause (seconds) when the Redis list is empty.
    IDLE_SLEEP = 0.5
    # Pause (seconds) after a failed Redis read, so an unreachable server
    # does not turn the loop into a busy spin that floods the log.
    ERROR_BACKOFF = 1

    def __init__(self):
        self.odps_db = get_odps_db()
        # NOTE(review): host and password are hard-coded in source; consider
        # moving them to configuration / a secret store.
        self.redis_cli = redis.Redis(
            host="172.31.37.66",
            port=6379,
            password="redis_HtfNQJ",
        )
        # Buffered, already-decoded action records awaiting flush().
        self.bulk = []
        # Unix time (seconds) of the last successful flush.
        self.ut = int(time.time())

    def __del__(self):
        # __init__ may have failed before redis_cli was assigned.
        cli = getattr(self, "redis_cli", None)
        if cli is not None:
            cli.close()

    def flush(self):
        """Write buffered records for a single day partition to ODPS.

        Records are taken from the end of ``self.bulk``. The first usable
        record fixes the target partition; consumption stops at the first
        record belonging to a different day, which stays buffered for a later
        call. Records timestamped at or after tomorrow 00:00 are discarded.

        Raises:
            Exception: whatever ``write_table`` raises; the records taken for
                this write are put back into ``self.bulk`` first.
        """
        if not self.bulk:
            return

        tomorrow_zero_time = get_today_zero_time() + 86400
        rows = []
        taken = []  # source records behind `rows`, for rollback on failure
        partition_key = None

        while self.bulk:
            action = self.bulk.pop()
            ct = safe_get_int(action, "t")
            if ct >= tomorrow_zero_time:
                # Future-dated record: dropped on purpose.
                continue

            dt = datetime.datetime.fromtimestamp(ct)
            key = ("%d%02d" % (dt.year, dt.month), "%02d" % dt.day)
            if partition_key is None:
                partition_key = key
            elif key != partition_key:
                # Belongs to another partition: put it back instead of
                # silently losing it, and leave it for the next flush.
                self.bulk.append(action)
                break

            rows.append([
                str(ct),
                safe_get_str(action, "app"),
                str(safe_get_int(action, "mid")),
                safe_get_str(action, "oid"),
                safe_get_str(action, "type"),
                safe_get_str(action, "stype"),
                safe_get_str(action, "from"),
                safe_get_str(action, "ip"),
                safe_get_str(action, "opt"),
            ])
            taken.append(action)

        # Nothing usable (e.g. every record was future-dated): skip the write
        # rather than sending an empty batch to a "ym=,day=" partition.
        if rows:
            partition = "ym=%s,day=%s" % partition_key
            try:
                self.odps_db.write_table(
                    "actionlog", rows, partition=partition, create_partition=True
                )
            except Exception:
                # Restore the records in their original order so a failed
                # write does not lose them, then let the caller see the error.
                self.bulk.extend(reversed(taken))
                raise

        self.ut = int(time.time())
        logger.Info("insert: {}, left: {}".format(len(rows), len(self.bulk)))

    def loop(self):
        """Run forever: pop messages from Redis, buffer them, flush when due."""
        key = "action"
        while True:
            due = int(time.time()) - self.ut >= self.FLUSH_INTERVAL
            if len(self.bulk) >= self.BULK_LIMIT or due:
                self.flush()

            try:
                # Pop one element from the left end of the list.
                item = self.redis_cli.lpop(key)
            except Exception as e:
                logger.Error("panic: {}".format(str(e)))
                # Wait a while before checking again.
                time.sleep(self.ERROR_BACKOFF)
                continue

            if not item:
                logger.Info("no data, sleep...")
                time.sleep(self.IDLE_SLEEP)
                continue

            try:
                # The payload is a JSON envelope whose "message" field holds
                # the JSON-encoded action record.
                msg = json.loads(item.decode("utf-8"))
                action = json.loads(safe_get_str(msg, "message"))
            except Exception as e:
                # Malformed message: log it and move on without delay.
                logger.Error("panic: {}".format(str(e)))
                continue

            self.bulk.append(action)
            if len(self.bulk) % 100 == 0:
                logger.Info("len(bulk): {}".format(len(self.bulk)))


# Script entry point: start consuming immediately (kept at module level so
# the file's run-on-load behavior is unchanged).
s = S()
s.loop()