2024-12-24 18:08:07 +08:00
|
|
|
import json
|
|
|
|
from lib.util import *
|
|
|
|
import redis
|
|
|
|
from lib.odps import *
|
|
|
|
import time
|
|
|
|
from lib.log import *
|
|
|
|
|
|
|
|
# Name handed to the logger for this worker.
service_name = "raw_action"

# Directory argument handed to the logger ("." is the current working directory).
log_dir = "."

# Module-wide logger used by everything below.
# NOTE(review): print_terminal=False presumably keeps records off the
# console -- confirm against lib.log.
logger = Logger(
    service_name,
    log_dir=log_dir,
    print_terminal=False,
)
|
2024-12-24 18:08:07 +08:00
|
|
|
|
|
|
|
|
|
|
|
class S:
    """Worker that moves action records from a Redis list into ODPS.

    Messages are popped from the Redis list ``QUEUE_KEY``, decoded, and
    buffered in ``self.bulk``. The buffer is written to the ``actionlog``
    table, one ``ym=...,day=...`` partition per flush, whenever it holds
    ``FLUSH_SIZE`` records or ``FLUSH_INTERVAL`` seconds have passed since
    the previous flush.
    """

    # Redis list the worker pops messages from.
    QUEUE_KEY = "action"
    # Flush as soon as this many records are buffered ...
    FLUSH_SIZE = 10000
    # ... or when this many seconds have passed since the last flush.
    FLUSH_INTERVAL = 60
    # Seconds to sleep when the Redis list is empty.
    IDLE_SLEEP = 0.5
    # Seconds to sleep after a Redis error, so an outage does not turn the
    # loop into a busy spin of error logs.
    ERROR_BACKOFF = 1

    def __init__(self):
        """Open the ODPS handle and the Redis connection, start the flush timer."""
        self.odps_db = get_odps_db()
        # NOTE(review): host and password are hard-coded in source; consider
        # moving them to configuration or the environment.
        self.redis_cli = redis.Redis(
            host="172.31.37.66",
            port=6379,
            password="redis_HtfNQJ",
        )
        # Decoded action records waiting to be written.
        self.bulk = []
        # Unix time (seconds) of the last flush; drives the interval trigger.
        self.ut = int(time.time())

    def __del__(self):
        """Close the Redis connection if one was ever opened."""
        # __init__ may have failed before redis_cli was assigned; __del__
        # still runs in that case and must not raise.
        redis_cli = getattr(self, "redis_cli", None)
        if redis_cli is not None:
            redis_cli.close()

    @staticmethod
    def _to_row(action, ct):
        """Build the ``actionlog`` row (a list of strings) for one action."""
        return [
            str(ct),
            safe_get_str(action, "app"),
            str(safe_get_int(action, "mid")),
            safe_get_str(action, "oid"),
            safe_get_str(action, "type"),
            safe_get_str(action, "stype"),
            safe_get_str(action, "from"),
            safe_get_str(action, "ip"),
            safe_get_str(action, "opt"),
        ]

    def flush(self):
        """Write the newest run of same-day records to ODPS.

        Records are taken from the tail of ``self.bulk``. The partition is
        fixed by the first record kept; the run ends at the first record that
        belongs to a different year-month/day. That record and everything
        before it stay buffered for a later flush. Records whose ``t`` is at
        or after tomorrow's midnight are discarded.

        Any exception from ``write_table`` propagates to the caller.
        """
        if not self.bulk:
            return

        tomorrow_zero_time = get_today_zero_time() + 86400
        inserts = []
        ym = ""
        day = ""
        while self.bulk:
            action = self.bulk.pop()
            ct = safe_get_int(action, "t")
            if ct >= tomorrow_zero_time:
                # Future-dated record: drop it.
                continue

            dt = datetime.datetime.fromtimestamp(ct)
            new_ym = "%d%02d" % (dt.year, dt.month)
            new_day = "%02d" % dt.day
            if ym == "" and day == "":
                ym = new_ym
                day = new_day
            if new_ym != ym or new_day != day:
                # This record belongs to another partition. Put it back so a
                # later flush writes it instead of silently losing it.
                self.bulk.append(action)
                break

            inserts.append(self._to_row(action, ct))

        # If every buffered record was discarded there is nothing to write,
        # and ym/day are still empty, so skip the write entirely.
        if inserts:
            partition = "ym=%s,day=%s" % (ym, day)
            self.odps_db.write_table(
                "actionlog", inserts,
                partition=partition,
                create_partition=True
            )
        self.ut = int(time.time())
        logger.Info("insert: {}, left: {}".format(len(inserts), len(self.bulk)))

    def loop(self):
        """Run forever: flush when due, then pop and buffer one Redis message.

        ``flush`` is called outside the ``try`` block, so a failed write is
        not swallowed here and terminates the loop.
        """
        key = self.QUEUE_KEY
        while True:
            time_now = int(time.time())
            if (len(self.bulk) >= self.FLUSH_SIZE
                    or time_now - self.ut >= self.FLUSH_INTERVAL):
                self.flush()

            try:
                # Pop one element from the left end of the list.
                item = self.redis_cli.lpop(key)
                if not item:
                    logger.Info("no data, sleep...")
                    time.sleep(self.IDLE_SLEEP)
                    continue

                # The queue element is a JSON envelope whose "message" field
                # is itself a JSON-encoded action.
                msg_str = item.decode("utf-8")
                msg = json.loads(msg_str)
                action_str = safe_get_str(msg, "message")
                action = json.loads(action_str)
                self.bulk.append(action)
                if len(self.bulk) % 100 == 0:
                    logger.Info("len(bulk): {}".format(len(self.bulk)))
            except redis.RedisError as e:
                # Redis-side failure: wait before retrying instead of
                # hammering the server and the log.
                logger.Error("panic: {}".format(str(e)))
                time.sleep(self.ERROR_BACKOFF)
            except Exception as e:
                # Malformed message: log it and move on to the next one.
                logger.Error("panic: {}".format(str(e)))
|
|
|
|
|
|
|
|
|
|
|
|
# Script entry: build the worker, then block in its polling loop.
s = S()
s.loop()
|
|
|
|
# Scratch notes (never executed): list.pop() takes from the tail.
# lis = [1, 2, 3]
# while len(lis) > 0:
#     print(lis.pop())
# print(lis)
#
# Scratch notes (never executed): building the partition strings.
# dt = datetime.datetime.fromtimestamp(int(time.time()))
#
# new_ym = "%d%02d" % (dt.year, dt.month)
# new_day = "%02d" % dt.day
#
# print(new_ym, new_day)
|