"""List media video documents and print the URL of each 720p source.

Also provides a helper that asks ffprobe for a video's pixel format.
"""
# NOTE: the star import stays first so the stdlib imports below take
# precedence over any same-named symbols it may export.
from lib.all import *
import subprocess
import json

# Base URL that a document's `src_id_720` value is appended to.
host = "https://wishpal-ironfan-media.oss-cn-hangzhou-internal.aliyuncs.com/"

# Returned whenever ffprobe gives us nothing usable.
_NO_VIDEO_STREAM = 'No video stream found'


def get_pixel_format(video_url):
    """Return the pixel format of the first video stream of *video_url*.

    Runs ``ffprobe`` and parses its JSON output.

    Args:
        video_url: Path or URL handed to ffprobe as its input.

    Returns:
        The stream's ``pix_fmt`` value, ``'Unknown'`` when the stream has no
        ``pix_fmt`` entry, or ``'No video stream found'`` when ffprobe
        reports no video stream or produces no parseable output.

    Raises:
        FileNotFoundError: If the ``ffprobe`` executable is not available.
    """
    command = [
        'ffprobe',
        '-v', 'quiet',
        '-show_streams',
        '-select_streams', 'v:0',
        '-print_format', 'json',
        video_url,
    ]
    result = subprocess.run(command, capture_output=True, text=True)
    output = result.stdout

    # An empty or malformed stdout used to surface as a JSONDecodeError;
    # report it the same way as "no stream" instead.
    if not output.strip():
        return _NO_VIDEO_STREAM
    try:
        data = json.loads(output)
    except json.JSONDecodeError:
        return _NO_VIDEO_STREAM

    streams = data.get('streams', [])
    if not streams:
        return _NO_VIDEO_STREAM
    return streams[0].get('pix_fmt', 'Unknown')


class S:
    """Reads the ``media.video`` collection and prints 720p source URLs."""

    def __init__(self):
        """Open the handle to the ``video`` collection.

        `MongoDB` is presumably the project wrapper exported by `lib.all`
        -- TODO confirm.
        """
        # NOTE(review): credentials are embedded in this connection string;
        # they should be moved to configuration / environment and rotated.
        self.col_video = MongoDB(
            host="mongodb://root:Wishpal2024@dds-bp1da1ddd62bede41.mongodb.rds.aliyuncs.com:3717,dds-bp1da1ddd62bede42.mongodb.rds.aliyuncs.com:3717/admin?replicaSet=mgset-77304659",
            port=3717,
            db="media",
            collection="video",
        )

    def __del__(self):
        """Close the collection handle if it was ever created."""
        # __init__ may have raised before col_video was assigned; without this
        # guard the finalizer itself would fail with AttributeError.
        col_video = getattr(self, "col_video", None)
        if col_video is not None:
            col_video.close()

    def proc(self, limit=100):
        """Print ``_id`` and 720p URL for the first *limit* video documents.

        Documents whose ``src_id_720`` is empty are skipped.

        Args:
            limit: Maximum number of documents to inspect (default 100).
        """
        prj = {
            "src_id": 1,
            "src_id_720": 1,
        }
        docs = self.col_video.find({}, projection=prj)
        for d in docs[:limit]:
            vid = safe_get_int(d, "_id")
            src_id_720 = safe_get_str(d, "src_id_720")
            # Nothing to print without a 720p key, so skip before building
            # the URL.
            if len(src_id_720) == 0:
                continue
            print(vid, host + src_id_720)

            # video_url = 'https://filecdn01.tiefen.fun/vdprodh264720/5a/d7/175d-2137-4a66-8e5c-d5ed61241b6c'
            # pixel_format = get_pixel_format(video_url)
            # print(f'Pixel Format: {pixel_format}')


# Script entry: runs on import as well, as in the original file.
s = S()
s.proc()