# scripts/bigdata/raw_action.py
#
# 113 lines
# 3.2 KiB
# Python

import json
from lib.util import *
import redis
from lib.odps import *
import time
from lib.log import *
service_name = "raw_action"
log_dir = '.'
logger = Logger(service_name, log_dir=log_dir, print_terminal=False)
class S:
    """Pump raw action records from a Redis list into the ODPS ``actionlog`` table.

    Messages are popped one at a time from the Redis list ``QUEUE_KEY``,
    decoded, and buffered in ``self.bulk``.  :meth:`loop` calls :meth:`flush`
    once the buffer holds ``MAX_BULK`` records or ``FLUSH_INTERVAL`` seconds
    have passed since the last flush.
    """

    # Redis list the raw messages are popped from.
    QUEUE_KEY = "action"
    # Flush as soon as this many actions are buffered ...
    MAX_BULK = 10000
    # ... or once this many seconds have passed since the last flush.
    FLUSH_INTERVAL = 60
    # Seconds to sleep when the Redis list is empty.
    IDLE_SLEEP = 0.5
    # Seconds to back off after a Redis or flush failure, so an outage does
    # not turn the loop into a busy spin that floods the log.
    ERROR_SLEEP = 1

    def __init__(self):
        """Open the ODPS handle and the Redis client and start with an empty buffer."""
        self.odps_db = get_odps_db()
        # NOTE(review): host, port and password are hard-coded in source
        # control; they should come from configuration or the environment.
        self.redis_cli = redis.Redis(
            host="172.31.37.66",
            port=6379,
            password="redis_HtfNQJ",
        )
        # Decoded actions waiting to be written to ODPS.
        self.bulk = list()
        # Unix time (seconds) of the last flush; drives the time-based flush.
        self.ut = int(time.time())

    def __del__(self):
        """Close the Redis client if it was ever created."""
        # __init__ may have failed before redis_cli was assigned (for example
        # when get_odps_db() raises); __del__ still runs in that case.
        cli = getattr(self, "redis_cli", None)
        if cli is not None:
            cli.close()

    @staticmethod
    def _partition_for(ct):
        """Return the ``ym=YYYYMM,day=DD`` partition spec for Unix time *ct*."""
        # NOTE(review): `datetime` is not imported explicitly in this file;
        # presumably one of the star imports re-exports it -- verify.
        dt = datetime.datetime.fromtimestamp(ct)
        return "ym=%d%02d,day=%02d" % (dt.year, dt.month, dt.day)

    @staticmethod
    def _to_row(action, ct):
        """Build the list of column values written to ``actionlog`` for *action*."""
        return [
            str(ct),
            safe_get_str(action, "app"),
            str(safe_get_int(action, "mid")),
            safe_get_str(action, "oid"),
            safe_get_str(action, "type"),
            safe_get_str(action, "stype"),
            safe_get_str(action, "from"),
            safe_get_str(action, "ip"),
            safe_get_str(action, "opt"),
        ]

    def flush(self):
        """Write one partition's worth of buffered actions to ODPS.

        Actions are taken from the end of ``self.bulk``.  The first usable
        action fixes the target partition; taking stops at the first action
        that belongs to a different partition, and that action is put back
        so a later flush can write it.  Actions whose ``"t"`` is at or after
        ``get_today_zero_time() + 86400`` are discarded.

        If anything fails after actions were taken, the actions already
        accepted for this batch are restored to ``self.bulk`` in their
        original order and the exception is re-raised.  The record that
        triggered a failure while being converted is not restored, so a
        single malformed record cannot block the buffer forever.

        Raises:
            Exception: whatever the helpers or ``write_table`` raised.
        """
        if not self.bulk:
            return
        tomorrow_zero_time = get_today_zero_time() + 86400
        taken = list()  # actions accepted for this batch, in pop order
        inserts = list()
        partition = None
        try:
            while self.bulk:
                action = self.bulk.pop()
                ct = safe_get_int(action, "t")
                if ct >= tomorrow_zero_time:
                    continue
                action_partition = self._partition_for(ct)
                if partition is None:
                    partition = action_partition
                elif action_partition != partition:
                    # Belongs to another partition: keep it for a later
                    # flush instead of dropping it.
                    self.bulk.append(action)
                    break
                inserts.append(self._to_row(action, ct))
                taken.append(action)
            # Skip the write when every buffered action was discarded;
            # there is neither a partition nor any row in that case.
            if inserts:
                self.odps_db.write_table(
                    "actionlog", inserts,
                    partition=partition,
                    create_partition=True
                )
        except Exception:
            # `taken` is in pop order (newest first); reverse it to put the
            # actions back in the order they originally had in the buffer.
            self.bulk.extend(reversed(taken))
            raise
        self.ut = int(time.time())
        logger.Info("insert: {}, left: {}".format(len(inserts), len(self.bulk)))

    def loop(self):
        """Consume the Redis list forever, buffering actions and flushing them.

        Each message is expected to be a UTF-8 JSON document whose
        ``"message"`` field is itself a JSON-encoded action.  Messages that
        fail to decode are logged and skipped.  A failed flush is logged and
        retried after ``ERROR_SLEEP`` seconds; nothing is popped from Redis
        until the flush succeeds, so pending messages stay queued in Redis.
        This method never returns.
        """
        key = self.QUEUE_KEY
        while True:
            time_now = int(time.time())
            if (len(self.bulk) >= self.MAX_BULK
                    or time_now - self.ut >= self.FLUSH_INTERVAL):
                try:
                    self.flush()
                except Exception as e:
                    logger.Error("flush failed: {}".format(str(e)))
                    time.sleep(self.ERROR_SLEEP)
                    continue
            try:
                # Pop one element from the left end of the list.
                item = self.redis_cli.lpop(key)
                if not item:
                    logger.Info("no data, sleep...")
                    time.sleep(self.IDLE_SLEEP)
                    continue
                msg_str = item.decode("utf-8")
                msg = json.loads(msg_str)
                action_str = safe_get_str(msg, "message")
                action = json.loads(action_str)
                self.bulk.append(action)
                if len(self.bulk) % 100 == 0:
                    logger.Info("len(bulk): {}".format(len(self.bulk)))
            except redis.RedisError as e:
                logger.Error("panic: {}".format(str(e)))
                # Wait a while before trying Redis again.
                time.sleep(self.ERROR_SLEEP)
            except Exception as e:
                # A malformed message: log it and move on to the next one.
                logger.Error("panic: {}".format(str(e)))
# Script entry point: build the pump and enter its consume/flush loop,
# which runs `while True` and therefore never returns.
# NOTE(review): this executes on import as well; consider guarding it with
# `if __name__ == "__main__":` once it is confirmed nothing imports this file.
s = S()
s.loop()
#
# lis = [1, 2, 3]
# while len(lis) > 0:
# print(lis.pop())
# print(lis)
#
# dt = datetime.datetime.fromtimestamp(int(time.time()))
#
# new_ym = "%d%02d" % (dt.year, dt.month)
# new_day = "%02d" % dt.day
#
# print(new_ym, new_day)