"""HTTP service that extracts media metadata with yt-dlp and returns it as JSON."""
import logging
import os
import threading

import cachetools
import yt_dlp
from flask import Flask, request

logging.basicConfig(format='%(asctime)s [%(levelname)s] %(filename)s:%(lineno)d %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    level=logging.INFO)

app = Flask(__name__)

# Value of the "id" field carried by every response envelope.
RESPONSE_ID = "MusicDetailViewModel_detail_url"

# Options handed to every YoutubeDL instance.
# "extract_flat" is the API counterpart of the CLI flag --flat-playlist; the
# value "in_playlist" skips resolving the individual entries of a playlist.
# NOTE(review): the proxy value has neither scheme nor port although the host
# name suggests a SOCKS5 proxy -- confirm it is interpreted as intended.
YDL_OPTIONS = {
    "extract_flat": "in_playlist",
    "proxy": "socks5-proxy-69759aa5e4d9b48a.elb.us-east-1.amazonaws.com",
}

# NOTE(review): yt-dlp documents "duration" in seconds, yet the value exposed
# as "lengthSeconds" is divided by 100. Kept as-is because clients may rely on
# it -- confirm whether this scaling is intentional.
DURATION_DIVISOR = 100


def get_key(url: str) -> str:
    """Return *url* prefixed with the ``v2:`` key version tag.

    Not referenced by the code visible in this file.
    """
    return f"v2:{url}"


# The Flask development server handles requests on multiple threads and
# cachetools caches are not thread-safe, so access is guarded by a lock.
@cachetools.cached(cache=cachetools.TTLCache(maxsize=100000, ttl=60 * 5),
                   lock=threading.Lock())
def fetch_info(url):
    """Return the yt-dlp info dict for *url* without downloading any media.

    Results are memoized per URL for five minutes.

    Raises:
        yt_dlp.utils.DownloadError: when yt-dlp cannot extract the URL.
    """
    logging.info("fetching: %s", url)
    with yt_dlp.YoutubeDL(dict(YDL_OPTIONS)) as ydl:
        return ydl.extract_info(url, download=False)


def _text(value, default=0):
    """Render *value* as a string, substituting *default* when it is None."""
    return str(default if value is None else value)


def _is_muxed_format(item):
    """Tell whether a format entry has a URL plus both audio and video codecs."""
    acodec = item.get("acodec")
    return bool(
        item.get("resolution") != "audio only"
        and item.get("url")
        and acodec
        and acodec != "none"
        and item.get("vcodec")
    )


def _convert_thumbnail(item):
    """Map one yt-dlp thumbnail entry to the response shape."""
    return {
        "url": item.get("url", ""),
        "width": _text(item.get("width")),
        "height": _text(item.get("height")),
    }


def _convert_format(item):
    """Map one yt-dlp format entry to the response shape."""
    return {
        "width": _text(item.get("width")),
        "height": _text(item.get("height")),
        "type": item.get("format", ""),
        "quality": _text(item.get("format_note"), ""),
        "itag": 0,
        "fps": "0",
        "bitrate": "0",
        "url": item.get("url", ""),
        "ext": item.get("ext"),
        "vcodec": item.get("vcodec", ""),
        "acodec": item.get("acodec", ""),
        "vbr": "0",
        "abr": "0",
        "container": item.get("container"),
    }


def convert_dto(info):
    """Convert a yt-dlp info dict into the JSON envelope returned to clients.

    Keys that are missing *or* explicitly ``None`` fall back to neutral
    defaults, so numeric fields never render as the string ``"None"`` and a
    missing duration does not raise.

    Args:
        info: info dict as returned by ``fetch_info``.

    Returns:
        A dict with ``code``, ``msg``, ``data`` and ``id`` keys.
    """
    # Only thumbnails with a known, non-zero width are reported.
    thumbnails = [
        _convert_thumbnail(item)
        for item in info.get("thumbnails") or []
        if item.get("width")
    ]
    formats = [
        _convert_format(item)
        for item in info.get("formats") or []
        if _is_muxed_format(item)
    ]
    # "duration" can be None (for example when it is unknown); treat as 0.
    duration = info.get("duration") or 0

    return {
        "code": 200,
        "msg": "",
        "data": {
            "videoDetails": {
                "isLiveContent": bool(info.get("is_live")),
                "title": info.get("title") or "",
                "thumbnails": thumbnails,
                "description": info.get("description") or "",
                "lengthSeconds": f"{int(duration / DURATION_DIVISOR)}",
                "viewCount": _text(info.get("view_count")),
                "keywords": [],
                # Fall back to the channel name when uploader is absent or None.
                "author": info.get("uploader") or info.get("channel") or "",
                "channelID": info.get("channel_id") or "",
                "recommendInfo": [],
                "channelURL": info.get("channel_url") or "",
                "videoId": info.get("display_id") or "",
            },
            "streamingData": {
                "formats": formats,
            },
        },
        "id": RESPONSE_ID,
    }


def _error_response(status, message):
    """Build an error envelope and pair it with the matching HTTP status."""
    body = {"code": status, "msg": message, "data": {}, "id": RESPONSE_ID}
    return body, status


@app.route("/extract", methods=["GET", "POST"])
def extract():
    """Resolve the requested URL and return its metadata envelope.

    The URL is read from the ``url`` field of a JSON body; when there is no
    usable JSON body it is read from the query string or form data instead.
    Responds with 400 when no URL is supplied and 500 when extraction fails.
    """
    # silent=True yields None instead of aborting when the body is not JSON
    # (e.g. a plain GET); force=True ignores a missing JSON content type.
    payload = request.get_json(force=True, silent=True)
    url = payload.get("url") if isinstance(payload, dict) else None
    if url is None:
        url = request.values.get("url")
    logging.info("url: %s", url)

    # The URL is used as a cache key, so it must be a non-empty string.
    if not isinstance(url, str) or not url:
        return _error_response(400, "missing or invalid 'url'")

    try:
        info = fetch_info(url=url)
    except yt_dlp.utils.DownloadError as exc:
        logging.exception("extraction failed: %s", url)
        return _error_response(500, str(exc))

    if not info:
        return _error_response(500, "no information extracted")
    return convert_dto(info=info)


if __name__ == '__main__':
    # The interactive debugger lets anyone who can reach the port execute
    # code, so on a 0.0.0.0 bind it is opt-in via FLASK_DEBUG only.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
    app.run(host='0.0.0.0', port=80, debug=debug_enabled)