123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138 |
- import logging
- import boto3
- import requests
- import yt_dlp
- from flask import Flask, request, Response
# Configure the root logger once at import time: INFO and above, with each
# record carrying a timestamp, level and the emitting file/line.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(filename)s:%(lineno)d %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# The Flask application object that every route below registers on.
app = Flask(__name__)
def get_key(url: str) -> str:
    """Return the key for *url*: the URL prefixed with the ``v2:`` tag."""
    return "v2:{}".format(url)
def fetch_info(url):
    """Return the yt-dlp metadata for *url* without downloading any media.

    The URL is logged first, then handed to ``YoutubeDL.extract_info`` with
    ``download=False``; whatever that call returns is passed back unchanged.
    """
    logging.info(f"fetching: {url}")

    # NOTE(review): "flat-playlist" looks like the CLI flag name rather than
    # an API option key, and "flat-playlist" is an unusual value for
    # "extract_flat" -- confirm against the yt-dlp option reference. The
    # options are kept exactly as they were.
    ydl_options = {
        "flat-playlist": True,
        "extract_flat": "flat-playlist",
    }
    with yt_dlp.YoutubeDL(ydl_options) as downloader:
        return downloader.extract_info(url, download=False)
# Headers that describe one connection rather than the payload. PEP 3333
# forbids WSGI applications from emitting them, so they are never relayed.
_HOP_BY_HOP_HEADERS = frozenset({
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailers",
    "transfer-encoding",
    "upgrade",
})

# (connect, read) timeouts in seconds, so a stalled upstream cannot pin a
# worker forever.
_UPSTREAM_TIMEOUT = (5, 30)

# Size of the pieces relayed to the client.
_PROXY_CHUNK_SIZE = 16 * 1024


def _relay_headers(upstream):
    """Return the upstream response headers that are safe to send onwards."""
    dropped = set(_HOP_BY_HOP_HEADERS)
    if upstream.headers.get("Content-Encoding"):
        # requests transparently decodes gzip/deflate bodies in iter_content,
        # so the upstream encoding and length no longer describe the bytes
        # this proxy sends.
        dropped.update({"content-encoding", "content-length"})
    return {
        name: value
        for name, value in upstream.headers.items()
        if name.lower() not in dropped
    }


@app.route("/p", methods=["GET", "POST"])
def p():
    """Stream one format of a YouTube video through this service.

    Query parameters:
        id:  the YouTube video id.
        fId: the yt-dlp ``format_id`` of the stream to relay.

    Returns:
        A streaming response that mirrors the upstream status code and
        payload headers, or ``{"status": 0}`` when a parameter is missing,
        no metadata is available, or the requested format has no URL.
    """
    video_id = request.args.get("id")
    format_id = request.args.get("fId")
    if not (video_id and format_id):
        return {"status": 0}

    info = fetch_info(url=f"https://www.youtube.com/watch?v={video_id}")
    if not info:
        return {"status": 0}

    # The first format whose id matches wins, as before.
    match = next(
        (item for item in info.get("formats") or []
         if item.get("format_id") == format_id),
        None,
    )
    media_url = match.get("url") if match else None
    if not media_url:
        return {"status": 0}

    upstream = requests.get(media_url, stream=True, timeout=_UPSTREAM_TIMEOUT)
    logging.info(f"videoId: {video_id} status code: {upstream.status_code}")

    def generate():
        # Closing in ``finally`` releases the upstream connection both on
        # normal completion and when the client disconnects mid-stream.
        try:
            yield from upstream.iter_content(chunk_size=_PROXY_CHUNK_SIZE)
        finally:
            upstream.close()

    return Response(
        generate(),
        status=upstream.status_code,
        headers=_relay_headers(upstream),
    )
# Public endpoint of the streaming proxy that format URLs are rewritten to.
_PROXY_BASE_URL = "http://d1boedwd1gi87x.cloudfront.net/p"


def _text(value, default=0):
    """Render *value* as a string, using *default* when it is ``None``."""
    return f"{default if value is None else value}"


def _has_audio_and_video(item):
    """Return whether a format entry passes the playable-stream filter."""
    return bool(
        item.get("resolution") != "audio only"
        and item.get("url")
        and item.get("acodec")
        and item.get("acodec") != "none"
        and item.get("vcodec")
    )


def convert_dto(info):
    """Convert a yt-dlp info dict into the response payload of this API.

    Args:
        info: mapping produced by ``fetch_info``. Every key is optional;
            missing or ``None`` values fall back to empty/zero defaults
            instead of raising or being rendered as the string ``"None"``.

    Returns:
        A dict with ``code``, ``msg``, ``data`` and ``id`` keys.
        ``data.videoDetails`` carries the video metadata and
        ``data.streamingData.formats`` lists the formats that pass the
        audio+video filter, each with its URL rewritten to the proxy.
    """
    display_id = info.get("display_id") or ""

    # Only thumbnails with a known width are exposed.
    thumbnails = [
        {
            "url": item.get("url", ""),
            "width": _text(item.get("width")),
            "height": _text(item.get("height")),
        }
        for item in info.get("thumbnails") or []
        if item.get("width")
    ]

    formats = []
    for item in info.get("formats") or []:
        format_id = item.get("format_id")
        if not format_id or not _has_audio_and_video(item):
            continue
        # Clients never see the upstream media URL; they are pointed at the
        # proxy, which resolves it again from the video and format ids.
        url = f"{_PROXY_BASE_URL}?id={display_id}&fId={format_id}&videoId={display_id}"
        formats.append({
            "width": _text(item.get("width")),
            "height": _text(item.get("height")),
            "type": item.get("format", ""),
            "quality": _text(item.get("format_note"), ""),
            "itag": 0,
            "fps": "0",
            "bitrate": "0",
            "url": url,
            "ext": item.get("ext"),
            "vcodec": item.get("vcodec", ""),
            "acodec": item.get("acodec", ""),
            "vbr": "0",
            "abr": "0",
            "container": item.get("container"),
        })

    # NOTE(review): the duration is divided by 100 before being reported as
    # "lengthSeconds". That looks unintended, but clients may depend on it,
    # so the arithmetic is preserved -- confirm with the API consumers.
    length_seconds = int((info.get("duration") or 0) / 100)

    return {
        "code": 200,
        "msg": "",
        "data": {
            "videoDetails": {
                "isLiveContent": info.get("is_live", False),
                "title": info.get("title", ""),
                "thumbnails": thumbnails,
                "description": info.get("description", ""),
                "lengthSeconds": f"{length_seconds}",
                "viewCount": _text(info.get("view_count")),
                "keywords": [],
                # Fall back to the channel name when the uploader is missing
                # or present but empty.
                "author": info.get("uploader") or info.get("channel") or "",
                "channelID": info.get("channel_id", ""),
                "recommendInfo": [],
                "channelURL": info.get("channel_url", ""),
                "videoId": display_id,
            },
            "streamingData": {
                "formats": formats,
            },
        },
        "id": "MusicDetailViewModel_detail_url",
    }
@app.route("/extract", methods=["GET", "POST"])
def extract():
    """Return the API payload describing the video at the requested URL.

    The target is read from the ``url`` field of a JSON body and, failing
    that, from the ``url`` query parameter, so body-less GET requests (which
    the route allows) can be served too.

    Returns:
        The ``convert_dto`` payload on success; a ``400`` response when no
        URL was supplied; a ``404`` response when the extractor returned no
        metadata.
    """
    # silent=True yields None instead of aborting on a missing or malformed
    # JSON body, so the request can be answered with an explicit error.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    url: str = payload.get("url") or request.args.get("url")
    if not url:
        return {"code": 400, "msg": "missing 'url'", "data": None}, 400

    logging.info(f"url: {url}")
    info = fetch_info(url=url)
    if not info:
        return {"code": 404, "msg": "no metadata found", "data": None}, 404
    return convert_dto(info=info)
@app.route("/health", methods=["GET"])
def health():
    """Liveness probe: always answers with ``{"status": 1}``."""
    payload = {"status": 1}
    return payload
@app.route("/refresh", methods=["GET"])
def refresh():
    """Start an instance refresh of the ``be-ytb-as`` Auto Scaling group.

    Credentials are no longer embedded in the source. boto3 resolves them
    through its default provider chain (environment variables, shared
    credentials file, or the instance/task role). The access key that used
    to be committed here must be treated as compromised and rotated.

    Returns:
        ``{"response": ...}`` wrapping the ``start_instance_refresh`` result.
    """
    # NOTE(review): this is a state-changing call reachable through an
    # unauthenticated GET -- consider protecting the route.
    autoscaling = boto3.client("autoscaling", region_name="us-east-1")
    response = autoscaling.start_instance_refresh(
        AutoScalingGroupName="be-ytb-as",
    )
    return {
        "response": response,
    }
if __name__ == '__main__':
    # Debug mode is no longer forced on: with the server bound to every
    # interface, the Werkzeug interactive debugger would let any remote
    # client execute code. Flask still honours FLASK_DEBUG for local work.
    app.run(host='0.0.0.0', port=80)
|