scripts/cronjob/media/vd_compress.py

370 lines
13 KiB
Python

from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from lib.all import *
import ffmpy
import cv2
import oss2
import subprocess
service_name = 'vd_compress'
# log_dir = '/app/log'
log_dir = '/Users/erwin/data/log'
logger = Logger(service_name, log_dir=log_dir, print_terminal=False)
access_key_id = "LTAI5tAdu5LRvZwm4LJa21Fo"
access_key_secret = "WGvSQsDralTfFAAxhEqLBOgbXqflHo"
endpoint_internal = "https://oss-cn-hangzhou.aliyuncs.com"
bucket_name = "wishpal-ironfan-media"
bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint_internal, bucket_name)
url_get_videos_by_status = "https://api.tiefen.fun/op/media/get_videos_by_status"
url_update_video_compress = "https://api.tiefen.fun/op/media/update_video_compress"
url_update_video_pf = "https://api.tiefen.fun/op/media/update_video_pf"
cdn_host = "https://filecdn01.tiefen.fun/"
acs_client = AcsClient(access_key_id, access_key_secret, 'cn-hangzhou')
def refresh_cdn_cache(url):
request = CommonRequest()
request.set_domain('cdn.aliyuncs.com')
request.set_version('2018-05-10')
request.set_action_name('RefreshObjectCaches')
request.set_method('POST')
request.add_query_param('ObjectPath', url)
request.add_query_param('ObjectType', 'File') # 可以是 'File' 或 'Directory'
response = acs_client.do_action_with_exception(request)
return response
def update_pixel_format(vid, pf, pf720):
param = {
"id": vid,
"pixel_format": pf,
"pixel_format_720": pf720
}
res = call_service(url_update_video_pf, param)
ret = safe_get_int(res, "ret")
return ret
def get_pixel_format(vd_path):
command = [
'ffprobe', '-v', 'quiet',
'-show_streams', '-select_streams', 'v:0',
'-print_format', 'json', vd_path
]
result = subprocess.run(command, capture_output=True, text=True)
output = result.stdout
data = json.loads(output)
streams = data.get('streams', [])
if streams:
pix_fmt = streams[0].get('pix_fmt', 'Unknown')
return pix_fmt
else:
return 'No video stream found'
class VdHelper:
def __init__(self, vd_path):
self._video = cv2.VideoCapture(vd_path)
self._vd_path = vd_path
self._width = int(self._video.get(cv2.CAP_PROP_FRAME_WIDTH))
self._height = int(self._video.get(cv2.CAP_PROP_FRAME_HEIGHT))
self._fps = int(self._video.get(cv2.CAP_PROP_FPS))
print(f"w: {self._width}, h: {self._height}, fps: {self._fps}")
def get_vd_path(self):
return self._vd_path
def set_vd_path(self, vd_path):
self._vd_path = vd_path
def get_fps(self):
return self._fps
def get_w_h(self):
return self._width, self._height
def remove(self):
return os.remove(self._vd_path)
def resize(self, p=1080):
if self._width > self._height:
rate = float(p) / float(self._height)
new_width = int(float(self._width) * rate)
new_height = p
else:
rate = float(p) / float(self._width)
new_width = p
new_height = int(float(self._height) * rate)
if new_width % 2 != 0:
new_width -= 1
if new_height % 2 != 0:
new_height -= 1
return new_width, new_height
def get_pixel_format(self):
command = [
'ffprobe', '-v', 'quiet',
'-show_streams', '-select_streams', 'v:0',
'-print_format', 'json', self._vd_path
]
result = subprocess.run(command, capture_output=True, text=True)
output = result.stdout
data = json.loads(output)
streams = data.get('streams', [])
if streams:
pix_fmt = streams[0].get('pix_fmt', 'Unknown')
return pix_fmt
else:
return 'No video stream found'
def transfer_h264_720p_25fps_10bit_to_8bit(self, output_path_8bit, output_path):
# ffmpeg -i input.mov -pix_fmt gbrp10le -vf format=gbrp -pix_fmt yuv420p input_8bit.mp4
output_args = "-pix_fmt gbrp10le -vf format=gbrp -pix_fmt yuv420p -y"
ff = ffmpy.FFmpeg(
inputs={self.get_vd_path(): None},
outputs={output_path_8bit: output_args}
)
ff.run()
logger.Info("finish 10bit to 8bit, path: {}".format(output_path_8bit))
self.set_vd_path(output_path_8bit)
self.transfer_h264_720p_25fps(output_path)
def transfer_h264_720p_25fps(self, output_path):
new_width, new_height = self.resize(720)
new_size_str = "{}x{}".format(new_width, new_height)
output_args = '-c:v libx264 -s {} -crf {} -y'.format(new_size_str, 23)
if self.get_fps() > 25:
output_args = '-c:v libx264 -s {} -crf {} -r 25 -y'.format(new_size_str, 23)
ff = ffmpy.FFmpeg(
inputs={self.get_vd_path(): None},
outputs={output_path: output_args}
)
ff.run()
class S:
def __init__(self):
# self.bucket = oss2.Bucket(oss2.Auth(access_key_id, access_key_secret), endpoint_internal, bucket_name)
# self.url_get_videos_by_status = "https://api.tiefen.fun/op/media/get_videos_by_status"
# self.url_update_video_compress = "https://api.tiefen.fun/op/media/update_video_compress"
# self.hw_cdn_host = "https://filecdn01.tiefen.fun/"
self.to_del_files = list()
def __del__(self):
for fp in self.to_del_files:
if os.path.exists(fp):
os.remove(fp)
def save_video_from_oss(self, oss_src_id: str, local_path: str):
return bucket.get_object_to_file(oss_src_id, local_path)
def upload_video_to_oss(self, local_path: str, oss_src_id: str):
return bucket.put_object_from_file(oss_src_id, local_path)
def get_one_wait_compress_video(self):
param = {
"ids": [], # 52062
"status": 0,
"offset": 0,
"limit": 1,
}
res = call_service(url_get_videos_by_status, param)
data = safe_get_dict(res, "data")
lis = safe_get_list(data, "video")
if len(lis) > 0:
return lis[0]
return None
def get_one_fail_compress_video(self):
param = {
"ids": [], # 52062
"status": -100,
"offset": 0,
"limit": 1,
}
res = call_service(url_get_videos_by_status, param)
data = safe_get_dict(res, "data")
lis = safe_get_list(data, "video")
if len(lis) > 0:
return lis[0]
return None
def set_compress_finish(self, video_id, size_src, src_id_h264, size_h264, src_id_720, size_720, status, resize_t):
param = {
"id": video_id,
"size_src": size_src,
"src_id_h264": src_id_h264,
"size_h264": size_h264,
"src_id_720": src_id_720,
"size_720": size_720,
"status": status,
"resize_t": resize_t,
}
logger.Info("{}".format(dict2json(param)))
res = call_service(url_update_video_compress, param)
ret = safe_get_int(res, "ret")
return ret
def proc_one(self, video):
video_id = safe_get_int(video, "id")
src_id = safe_get_str(video, "src_id")
src_id_h264_720 = src_id.replace("vdprod", "vdprodh264720")
src_id_python_type = src_id.replace("/", "_")
src_id_h264_720_python_type = src_id_h264_720.replace("/", "_")
cur_dir = os.getcwd() + "/"
local_src_path = cur_dir + src_id_python_type
local_h264_720_path = cur_dir + src_id_h264_720_python_type
# 下载视频到本地
obj = self.save_video_from_oss(src_id, local_src_path)
content_type = safe_get_str(obj.headers, "Content-Type")
file_size = int(safe_get_str(obj.headers, "Content-Length"))
logger.Info("video object, content_type: {}, fsize: {}".format(content_type, file_size))
if content_type == "video/mp4":
local_src_path_new = local_src_path + ".mp4"
elif content_type == "video/quicktime":
local_src_path_new = local_src_path + ".mov"
elif content_type == "video/x-m4v":
local_src_path_new = local_src_path + ".m4v"
else:
logger.Error("invalid content_type, id: {}, src_id: {}, content_type: {}".format(video_id, cdn_host + src_id, content_type))
return False
os.renames(local_src_path, local_src_path_new)
# vd helper
vdh = VdHelper(local_src_path_new)
self.to_del_files.append(local_src_path_new)
# 色深
pf = get_pixel_format(vdh.get_vd_path())
logger.Info("pixel_format: {}".format(pf))
output_file = local_h264_720_path + ".mp4"
self.to_del_files.append(output_file)
if pf not in ["yuv420p", "yuvj420p"]:
# 先转成8bit, 再转成264 720p mp4
output_file_8bit = local_h264_720_path + "_8bit_tmp.mp4"
vdh.transfer_h264_720p_25fps_10bit_to_8bit(output_file_8bit, output_file)
self.to_del_files.append(output_file_8bit)
else:
# 转成264 720p mp4
vdh.transfer_h264_720p_25fps(output_file)
# 上传
upload_ret = self.upload_video_to_oss(output_file, src_id_h264_720)
upload_ret_status = upload_ret.status
logger.Info("upload_ret, {}, {}".format(src_id, upload_ret_status))
output_content_type = "video/mp4"
output_file_size = os.path.getsize(output_file)
# 更新db
db_ret = self.set_compress_finish(video_id, file_size, "", 0, src_id_h264_720, output_file_size, 100, int(time.time()))
# 更新pf
pf_output = get_pixel_format(output_file)
logger.Info("pixel_format_output: {}".format(pf_output))
pf_ret = update_pixel_format(video_id, pf, pf_output)
# 刷新cdn
file_url = cdn_host + src_id_h264_720
refresh_res = refresh_cdn_cache(file_url)
logger.Info("refresh ali cdn, url: {}, res: {}".format(file_url, refresh_res))
logger.Info("before, {}, {}, {}".format(src_id, content_type, file_size))
logger.Info("after_h264720, {}, {}, {}, {}, {}".format(src_id_h264_720, output_content_type, output_file_size, db_ret, pf_ret))
logger.Info("host: {}".format(file_url))
return True
def proc_test_h264(input_file, output_file):
# 转成264 mp4
ff = ffmpy.FFmpeg(
inputs={input_file: None},
outputs={output_file: '-c:v libx264'}
)
ff.run()
def proc_test_h264_720(input_file, output_file, crf):
vdh = VdHelper(input_file)
# 转成264 mp4
new_width, new_height = vdh.resize(720)
new_size_str = "{}x{}".format(new_width, new_height)
output_args = '-c:v libx264 -s {} -crf {}'.format(new_size_str, crf)
if vdh.get_fps() > 25:
output_args = '-c:v libx264 -s {} -crf {} -r 25'.format(new_size_str, crf)
ff = ffmpy.FFmpeg(
inputs={input_file: None},
outputs={output_file: output_args}
)
ff.run()
def get_video_stat(video_path):
video = cv2.VideoCapture(video_path)
# 获取视频帧数
total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
# 获取视频帧率
fps = video.get(cv2.CAP_PROP_FPS)
# 获取视频宽度和高度
width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
print(video_path)
print(f"帧数: {total_frames}")
print(f"帧率: {fps}")
print(f"宽度: {width}")
print(f"高度: {height}")
idx = 0
while True:
from_fail = False
s = S()
video_id = 0
try:
video = s.get_one_wait_compress_video()
if not video:
video = s.get_one_fail_compress_video()
from_fail = True
if not video:
logger.Info("no video, sleep")
time.sleep(5)
continue
idx += 1
video_id = safe_get_int(video, "id")
logger.Info("================== {}, from_fail:{}, {} start ==================".format(idx, from_fail, video_id))
ok = s.proc_one(video)
if not ok:
if from_fail:
s.set_compress_finish(video_id, 0, "", 0, "", 0, -200, int(time.time()))
else:
s.set_compress_finish(video_id, 0, "", 0, "", 0, -100, int(time.time()))
logger.Info("================== {}, from_fail:{}, {} end ==================".format(idx, from_fail, video_id))
except Exception as e:
logger.Error("{}, {} _Panic: {}".format(idx, video_id, str(e)))
if from_fail:
s.set_compress_finish(video_id, 0, "", 0, "", 0, -200, int(time.time()))
else:
s.set_compress_finish(video_id, 0, "", 0, "", 0, -100, int(time.time()))
# break