scripts/lib/util.py

259 lines
6.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
import pytz
import requests
import json
import datetime
from bson import ObjectId
import smtplib
from email.mime.multipart import MIMEMultipart
from email.header import Header
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
# 时间字符串转时间戳
def get_ts_by_str(ts_str='2021-01-01 00:00:00', fmt='%Y-%m-%d %H:%M:%S'):
return int(time.mktime(time.strptime(ts_str, fmt)))
# 时间戳转时间字符串
def get_time_str_by_ts(ts, fmt="%Y-%m-%d %H:%M:%S"):
return time.strftime(fmt, time.localtime(ts))
def get_time_str_by_ts_v2(ts, fmt="%Y-%m-%d %H:%M:%S"):
local_tz = pytz.timezone('Asia/Shanghai')
utc_dt = datetime.datetime.fromtimestamp(ts, pytz.utc) # 将时间戳转换为UTC时间
local_dt = utc_dt.astimezone(local_tz)
return local_dt.strftime(fmt)
# 把数组每个元素转成指定类型
def arr_type_change(arr, typ='str'):
ret_arr = []
for v in arr:
if typ == 'str':
ret_arr.append(str(v))
elif typ == 'int':
ret_arr.append(int(v))
elif typ == 'float':
ret_arr.append(float(v))
return ret_arr
# 数组去掉括号,返回以","为间隔的字符串针对int类型
def get_list_str(lis):
return str(",".join(str(i) for i in lis))
# 秒数转时间
intervals = (
('Y', 31536000), # 60 * 60 * 24 * 365
('D', 86400), # 60 * 60 * 24
('H', 3600), # 60 * 60
('M', 60),
('S', 1),
)
def display_time(seconds, max_unit='D', granularity=2):
unit_idx = 0
if max_unit == 'Y': # 年
unit_idx = 0
elif max_unit == 'D': # 日
unit_idx = 1
elif max_unit == 'H': # 小时
unit_idx = 2
elif max_unit == 'M': # 分钟
unit_idx = 3
elif max_unit == 'S': # 秒
unit_idx = 4
result = []
for name, count in tuple(list(intervals)[unit_idx:]):
value = seconds // count
if value:
seconds -= value * count
if value == 1:
name = name.rstrip('S')
result.append("{}{}".format(value, name))
return ', '.join(result[:granularity])
# 请求服务
def call_service(service_url, req_param, timeout=10, headers=None, params=None):
res = requests.post(service_url, json=req_param, timeout=timeout, headers=headers, params=params)
st_code = res.status_code
if st_code != 200:
return None
r = json.loads(res.text)
return r
def call_service_raw_data(service_url, req_param, timeout=10, headers=None):
res = requests.post(service_url, json=req_param, timeout=timeout, headers=headers)
st_code = res.status_code
if st_code != 200:
return None
return res.text
# 安全获取字段
def safe_get(item, field, typ, default):
ret = default
if item is None:
return ret
if (not isinstance(item, dict)) and (not isinstance(typ, type)):
return ret
if field in item.keys():
if item[field]:
ret = typ(item[field])
return ret
def safe_get_str(item, field):
return safe_get(item, field, str, str())
def safe_get_int(item, field):
return safe_get(item, field, int, int())
def safe_get_list(item, field):
return safe_get(item, field, list, list())
def safe_get_dict(item, field):
return safe_get(item, field, dict, dict())
def safe_get_set(item, field):
return safe_get(item, field, set, set())
def safe_get_float(item, field):
return safe_get(item, field, float, float())
# 数组去掉括号,返回以","为间隔的字符串,针对字符串类型
def get_list_str_str(lis):
return str(",".join(("\'" + str(i) + "\'") for i in lis))
# 获取今天0点时间戳
def get_today_zero_time():
return int(time.mktime(datetime.date.today().timetuple()))
# 获取时间戳对应的0点时间戳
def get_time_zero_time(t):
ymd = get_time_str_by_ts(t, "%Y-%m-%d")
ymd_hms = ymd + " 00:00:00"
return get_ts_by_str(ymd_hms)
# 根据时间获取mongo的_id
def get_mongo_id_by_ts(ts_str):
hex_ts = hex(get_ts_by_str(ts_str))
return ObjectId(hex_ts[2:] + '0000000000000000')
# 安全除法
def safe_div(a, b):
if b == 0:
return 0
else:
return float(a) / float(b)
# 发送邮件用ssl
def send_email(subject, file_info_list, receivers, mail_text):
"""
发送邮件方法
:param subject: 邮件名称
:param file_info_list: 附件列表,格式: [{'path': '/tmp/', 'name': 'aaa.txt'}, .....]
:param receivers: 收件人列表, 格式: ['aaa@ixiaochuan.com', 'bbb@ixiaochuan.com']
:param mail_text: 正文部分
:return:
"""
username = 'xxxx@xxx.cn'
password = 'xxxxxxxx'
sender = username
msg = MIMEMultipart()
msg['Subject'] = Header(subject, 'utf-8')
msg['From'] = sender
msg['To'] = ';'.join(receivers)
# 文字部分
pure_text = MIMEText(mail_text, _subtype='plain', _charset='utf-8')
msg.attach(pure_text)
# 增加附件
if file_info_list:
for file_info in file_info_list:
attachment = MIMEApplication(open(file_info['path'] + file_info['name'], 'rb').read())
attachment.add_header('Content-Disposition', 'attachment', filename=file_info['name'])
msg.attach(attachment)
try:
client = smtplib.SMTP_SSL('smtp.exmail.qq.com:465')
client.login(username, password)
client.sendmail(sender, receivers, msg.as_string())
client.quit()
return True, '邮件发送成功!'
except smtplib.SMTPRecipientsRefused:
return False, 'Recipient refused'
except smtplib.SMTPAuthenticationError as e:
return False, str(e)
except smtplib.SMTPSenderRefused:
return False, 'Sender refused'
except smtplib.SMTPException as e:
return False, str(e)
# 字典list排序默认升序
def sort_dict_list(lis, key, desc=False):
if not isinstance(lis, list):
return
reverse = False
if desc:
reverse = True
lis.sort(key=lambda k: (k.get(key, 0)), reverse=reverse)
# 生成开始时间、结束时间map
def gen_st_et_str_map(st_str, day):
st_et_str_map = dict()
st = get_ts_by_str(st_str)
for i in range(0, day):
st_et_str_map[get_time_str_by_ts(st + 86400 * i)] = get_time_str_by_ts(st + 86400 * (i + 1))
return st_et_str_map
# 生成开始时间、结束时间map
def gen_st_et_str_map_v2(st_str, et_str):
st_et_str_map = dict()
st = get_ts_by_str(st_str)
for i in range(0, int((get_ts_by_str(et_str) - get_ts_by_str(st_str)) / 86400) + 1):
st_et_str_map[get_time_str_by_ts(st + 86400 * i)] = get_time_str_by_ts(st + 86400 * (i + 1))
return st_et_str_map
def dict2json(dic):
if not isinstance(dic, dict):
return json.dumps({})
return json.dumps(dic, ensure_ascii=False)