# qushuiyin/qushuiyin.py — 414 lines, 16 KiB, Python (pasted web-view header preserved as a comment)

import re
import json
import requests
import urllib.parse
import time
# Base endpoint of the relay service; target URLs are appended percent-encoded.
proxy_url = "https://qsyapi.wishpal.cn/proxy?url="


def endecode_and_proxy_url(original_url):
    """Wrap *original_url* so it is fetched through the proxy endpoint.

    The URL is fully percent-encoded (``safe=""`` — even ``/`` and ``:``
    are escaped) and appended to ``proxy_url``.
    """
    return proxy_url + urllib.parse.quote(original_url, safe="")
class Video:
    """Per-platform parsers that resolve short-video share links into
    watermark-free media URLs plus author/cover metadata.

    Each public method returns a dict shaped like
    ``{"code": int, "msg": str, "data": {...}}`` on success and an error
    dict or ``None`` on failure.
    """

    def douyin(self, url):
        """Parse a Douyin share link into video or image-post data.

        Returns code 400 when the video ID cannot be resolved, 201 when
        the embedded page JSON cannot be found or lacks "loaderData",
        200 with a "data" payload otherwise.
        """
        # Resolve the share link to the numeric video ID; the redirect
        # service is flaky, so retry up to 10 times.
        video_id = None
        for _ in range(10):
            video_id = self.extract_id(url)
            if video_id:
                break
            time.sleep(0.5)
        else:
            return {"code": 400, "msg": "Unable to parse video ID"}

        headers = {
            "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1 Edg/122.0.0.0"
        }
        # The share page embeds its data as window._ROUTER_DATA; the page
        # intermittently ships without it, so this is retried as well.
        matches = None
        for _ in range(10):
            response = requests.get(
                f"https://www.iesdouyin.com/share/video/{video_id}",
                headers=headers,
                timeout=10,  # fix: the original request could hang forever
            )
            matches = re.search(
                r"window\._ROUTER_DATA\s*=\s*(.*?)</script>",
                response.text,
                re.DOTALL,
            )
            if matches:
                break
            time.sleep(0.5)
        else:
            return {"code": 201, "msg": "Parsing failed"}

        video_info = json.loads(matches.group(1).strip())
        if "loaderData" not in video_info:
            return {"code": 201, "msg": "Parsing failed"}
        # Hoist the deeply nested item record the original repeated ~20x.
        item = video_info["loaderData"]["video_(id)/page"]["videoInfoRes"][
            "item_list"
        ][0]

        # Image post (slideshow): no playable video, return image lists.
        # .get() also tolerates a missing "images" key (the original
        # indexed it directly and compared with `!= None`).
        images = item.get("images")
        if images is not None:
            origin_image_urls = [img["url_list"][0] for img in images]
            data = self._douyin_base_data(item)
            data["image_urls"] = [
                endecode_and_proxy_url(u) for u in origin_image_urls
            ]
            data["origin_image_urls"] = origin_image_urls
            data["music"] = self._douyin_music(item)
            return {"code": 200, "msg": "Parsing successful", "data": data}

        # Regular video: swap the watermarked endpoint ("playwm") for the
        # clean one ("play") and follow redirects to the final CDN URL.
        play_url = item["video"]["play_addr"]["url_list"][0].replace(
            "playwm", "play"
        )
        final_video_res_url = requests.get(
            play_url, allow_redirects=True, timeout=10
        ).url
        data = self._douyin_base_data(item)
        data["url"] = endecode_and_proxy_url(final_video_res_url)
        data["origin_url"] = final_video_res_url
        data["music"] = self._douyin_music(item)
        return {"code": 200, "msg": "Parsing successful", "data": data}

    def _douyin_base_data(self, item):
        # Fields shared by the image-post and video branches; the caller
        # appends the branch-specific media fields and then "music" so
        # the key order matches the original payloads.
        author = item["author"]
        cover = item["video"]["cover"]["url_list"][0]
        return {
            "author": author["nickname"],
            "uid": author["unique_id"],
            "avatar": endecode_and_proxy_url(
                author["avatar_medium"]["url_list"][0]
            ),
            "like": item["statistics"]["digg_count"],
            "time": item["create_time"],
            "title": item["desc"],
            "cover": endecode_and_proxy_url(cover),
            "origin_cover": cover,
        }

    def _douyin_music(self, item):
        # Music author plus proxied cover art, identical in both branches.
        music = item["music"]
        return {
            "author": music["author"],
            "avatar": endecode_and_proxy_url(
                music["cover_large"]["url_list"][0]
            ),
        }

    def extract_id(self, url):
        """Follow the share-link redirects and return the first numeric
        path segment of the final URL, or None on any failure."""
        try:
            response = requests.head(url, allow_redirects=True, timeout=10)
        except requests.RequestException:  # was a blanket `except Exception`
            return None
        id_match = re.search(r"/(\d+)", response.url)
        return id_match.group(1) if id_match else None

    def pipigaoxiao(self, url):
        """Parse a Pipigaoxiao (pipigx) post link.

        Returns the result dict on success, None when the link or the
        API payload cannot be parsed.
        """
        match = re.search(r"post/(\d+)", url)
        if not match:
            return None
        post_id = match.group(1)  # renamed from `id` (shadowed a builtin)
        arr = json.loads(self.pipigaoxiao_curl(post_id))
        imgs = arr["data"]["post"]["imgs"]
        img_id = imgs[0]["id"] if imgs else None
        if not img_id:
            # Fix: the original fell through to `return result` with
            # `result` unbound (UnboundLocalError); fail explicitly.
            return None
        cover = f"https://file.ippzone.com/img/view/id/{img_id}"
        video_url = arr["data"]["post"]["videos"][str(img_id)]["url"]
        return {
            "code": 200,
            "msg": "解析成功",
            "data": {
                "title": arr["data"]["post"]["content"],
                "cover": endecode_and_proxy_url(cover),
                "origin_cover": cover,
                "url": endecode_and_proxy_url(video_url),
                "origin_url": video_url,
            },
        }

    def pipigaoxiao_curl(self, id):
        """POST the share/fetch_content API for post *id* and return the
        raw response body (JSON text)."""
        post_data = json.dumps({"pid": int(id), "type": "post", "mid": None})
        headers = {
            "Referer": "http://share.ippzone.com/ppapi/share/fetch_content",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36",
            # NOTE(review): "Content-Encoding" on a *request* is unusual —
            # "Accept-Encoding" was probably intended; kept as-is because
            # the endpoint accepts it.
            "Content-Encoding": "gzip,deflate",
        }
        # NOTE(review): verify=False disables TLS verification; the
        # endpoint is plain http anyway, but this deserves a second look.
        response = requests.post(
            "http://share.ippzone.com/ppapi/share/fetch_content",
            data=post_data,
            headers=headers,
            verify=False,
            timeout=5,
        )
        return response.text

    def kuaishou(self, url):
        """Parse a Kuaishou share link (video or image atlas).

        Returns the result dict on success, None when no photo ID (or no
        playable URL) can be extracted from the link.
        """
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
        }
        # Short links redirect to a long URL carrying photoId=...; long
        # links carry the ID in the /short-video/ path segment instead.
        if "v.kuaishou.com" in url:
            response = requests.head(
                url, headers=headers, allow_redirects=True, timeout=10
            )
            url = response.url
            match = re.search(r"photoId=(.*?)&", url)
        else:
            match = re.search(r"short-video/(.*?)\?", url)
        if not match:
            return None
        photo_id = match.group(1)
        headers = {
            # Session cookie required by the photo/info endpoint.
            "Cookie": "did=web_0694588f58404427ac09c75f1845df47; didv=1721897287000;",
            "Referer": url,
            "Content-Type": "application/json",
        }
        post_data = json.dumps(
            {
                "photoId": photo_id.replace("video/", "").replace("?", ""),
                "isLongVideo": False,
            }
        )
        api_url = "https://v.m.chenzhongtech.com/rest/wd/photo/info"
        json_data = requests.post(
            api_url, headers=headers, data=post_data, timeout=10
        ).json()
        photo = json_data["photo"]
        base = {
            "avatar": endecode_and_proxy_url(photo["headUrl"]),
            "author": photo["userName"],
            "time": photo["timestamp"],
            "title": photo["caption"],
            "cover": endecode_and_proxy_url(photo["coverUrls"][0]["url"]),
            "origin_cover": photo["coverUrls"][0]["url"],
        }
        # No playable video -> image atlas: rebuild absolute image URLs
        # from the first CDN host plus each relative path.
        if len(photo["mainMvUrls"]) == 0:
            cdn = photo["ext_params"]["atlas"]["cdn"][0]
            origin_image_urls = [
                "https://" + cdn + path
                for path in photo["ext_params"]["atlas"]["list"]
            ]
            base["image_urls"] = [
                endecode_and_proxy_url(u) for u in origin_image_urls
            ]
            base["origin_image_urls"] = origin_image_urls
            return {"code": 200, "msg": "解析成功", "data": base}
        video_url = photo["mainMvUrls"][0]["url"]
        if not video_url:
            return None  # original fell off the end and returned None too
        base["url"] = endecode_and_proxy_url(video_url)
        base["origin_url"] = video_url
        return {"code": 200, "msg": "解析成功", "data": base}

    def zuiyou(self, url):
        """Parse a Zuiyou (xiaochuankeji) share page by scraping its HTML.

        Returns the result dict when a video URL is found, else None.
        """
        text = requests.get(url, timeout=10).text
        video = re.search(r'fullscreen="false" src="(.*?)"', text)
        video_title = re.search(r":</span><h1>(.*?)</h1></div><div class=", text)
        video_cover = re.search(r'poster="(.*?)">', text)
        video_author = re.search(
            r'<span class="SharePostCard__name">(.*?)</span>', text
        )
        if not video:
            return None
        # Undo the JSON escaping embedded in the HTML attribute
        # (\u002F sequences standing in for "/").
        video_url = video.group(1).replace("\\", "/").replace("u002F", "")
        if not video_url:
            return None
        # Fix: the original called video_cover.group(1) unconditionally
        # and crashed with AttributeError when no poster attr matched.
        cover = video_cover.group(1) if video_cover else ""
        return {
            "code": 200,
            "msg": "解析成功",
            "data": {
                "author": video_author.group(1) if video_author else "",
                "title": video_title.group(1) if video_title else "",
                "cover": endecode_and_proxy_url(cover),
                "origin_cover": cover,
                "url": endecode_and_proxy_url(video_url),
                "origin_url": video_url,
            },
        }

    def pipixia(self, url):
        """Parse a Pipixia share link via the snssdk cell/detail API.

        Returns code 200 with data on success, 400 for unparsable links,
        500 when the API payload misses expected keys, and None when the
        payload parses but carries no video URL (original behavior).
        """
        # Resolve the short link to the long item URL.
        response = requests.head(url, allow_redirects=True, timeout=10)
        loc = response.url
        if not loc:
            return {"code": 400, "msg": "无法获取重定向后的 URL"}
        id_match = re.search(r"item/(.*)\?", loc)
        if not id_match:
            return {"code": 400, "msg": "无法提取 item ID"}
        item_id = id_match.group(1)
        api_url = (
            "https://is.snssdk.com/bds/cell/detail/"
            f"?cell_type=1&aid=1319&app_name=super&cell_id={item_id}"
        )
        data = requests.get(api_url, timeout=10).json()
        try:
            cell = data["data"]["data"]
            item = cell["item"]
            video_url = item["origin_video_download"]["url_list"][0]["url"]
            if not video_url:
                return None  # original fell through to an implicit None
            cover = item["cover"]["url_list"][0]["url"]
            return {
                "code": 200,
                "data": {
                    "author": item["author"]["name"],
                    "avatar": endecode_and_proxy_url(
                        item["author"]["avatar"]["download_list"][0]["url"]
                    ),
                    "time": cell["display_time"],
                    "title": item["content"],
                    "cover": endecode_and_proxy_url(cover),
                    "origin_cover": cover,
                    "url": endecode_and_proxy_url(video_url),
                    "origin_url": video_url,
                },
            }
        except KeyError:
            return {"code": 500, "msg": "解析 JSON 数据时出错"}
def get_raw_url(share_url):
    """Return the first http(s) URL embedded in *share_url*, or None.

    Share blobs usually look like "看看这个… https://… 复制此链接…";
    the regex carves the bare URL out of the surrounding text.
    """
    found = re.search(
        r"http[s]?://[\w.]+[\w\/]*[\w.]*\??[\w=&:\-\+\%]*[/]*", share_url
    )
    return found.group(0) if found else None
def clean_mask(share_url):
    """Extract the first URL from a share blob and dispatch it to the
    matching platform parser.

    Prints and returns the parser's result dict, or None when no URL is
    found or the host is unsupported.
    """
    raw_url = get_raw_url(share_url)
    result = None
    if raw_url is None:
        # Fix: the original ran `"douyin" in raw_url` with raw_url=None,
        # raising TypeError; fall through and report None instead.
        pass
    else:
        video = Video()
        if "douyin" in raw_url:
            result = video.douyin(raw_url)
        elif "pipigx" in raw_url:
            result = video.pipigaoxiao(raw_url)
        elif "kuaishou" in raw_url:
            result = video.kuaishou(raw_url)
        elif "pipix" in raw_url:
            result = video.pipixia(raw_url)
        elif "xiaochuankeji" in raw_url:
            result = video.zuiyou(raw_url)
    print(result)
    return result
if __name__ == "__main__":
    # Interactive entry point: prompt for a share link and parse it.
    clean_mask(input("请输入分享链接:"))