import logging import boto3 import requests import yt_dlp from flask import Flask, request, Response logging.basicConfig(format='%(asctime)s [%(levelname)s] %(filename)s:%(lineno)d %(message)s', datefmt='%Y-%m-%d %H:%M:%S', level=logging.INFO) app = Flask(__name__) def get_key(url: str) -> str: return f"v2:{url}" def fetch_info(url): logging.info(f"fetching: {url}") with yt_dlp.YoutubeDL({ "flat-playlist": True, "extract_flat": "flat-playlist" }) as ydl: info = ydl.extract_info(url, download=False) return info @app.route("/p", methods=["GET", "POST"]) def p(): video_id = request.args.get("id") format_id = request.args.get("fId") if video_id and format_id: info = fetch_info(url=f"https://www.youtube.com/watch?v={video_id}") if info: for item in info.get("formats", []): if item.get("format_id") == format_id: url = item.get("url") response = requests.get(url, stream=True) logging.info(f"videoId: {video_id} status code: {response.status_code}") def generate(): for chunk in response.iter_content(chunk_size=1024): yield chunk return Response(generate(), status=response.status_code, headers=dict(response.headers)) return {"status": 0} def convert_dto(info): thumbnails = [] for item in info.get("thumbnails", []): if item.get("width"): thumbnails.append({ "url": item.get("url", ""), "width": f"{item.get('width', 0)}", "height": f"{item.get('height', 0)}", }) formats = [] for item in info.get("formats", []): if item.get("resolution") != "audio only" and item.get("url") and item.get("acodec") and item.get( "acodec") != "none" and item.get("vcodec"): format_id = item.get("format_id") if format_id: url = f"http://d1boedwd1gi87x.cloudfront.net/p?id={info['display_id']}&fId={format_id}&videoId={info['display_id']}" formats.append({ "width": f"{item.get('width', 0)}", "height": f"{item.get('height', 0)}", "type": item.get("format", ""), "quality": f'{item.get("format_note", "")}', "itag": 0, "fps": "0", "bitrate": "0", "url": url, "ext": item.get("ext"), "vcodec": item.get("vcodec", ""), "acodec": item.get("acodec", ""), "vbr": "0", "abr": "0", "container": item.get("container") }) result = { "code": 200, "msg": "", "data": { "videoDetails": { "isLiveContent": info.get("is_live", False), "title": info.get("title", ""), "thumbnails": thumbnails, "description": info.get("description", ""), "lengthSeconds": f"{int(info.get('duration', 0) / 100)}", "viewCount": f"{info.get('view_count', 0)}", "keywords": [], "author": info.get("uploader", info.get("channel", "")), "channelID": info.get("channel_id", ""), "recommendInfo": [], "channelURL": info.get("channel_url", ""), "videoId": info.get("display_id", "") }, "streamingData": { "formats": formats } }, "id": "MusicDetailViewModel_detail_url" } return result @app.route("/extract", methods=["GET", "POST"]) def extract(): url: str = request.json.get("url") logging.info(f"url: {url}") info = fetch_info(url=url) return convert_dto(info=info) @app.route("/health", methods=["GET"]) def health(): return { "status": 1 } @app.route("/refresh", methods=["GET"]) def refresh(): autoscaling = boto3.client( 'autoscaling', region_name="us-east-1", aws_access_key_id="AKIA2TBT2JUNG6X3W737", aws_secret_access_key="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") response = autoscaling.start_instance_refresh( AutoScalingGroupName="be-ytb-as") return { "response": response } if __name__ == '__main__': app.run(host='0.0.0.0', port=80, debug=True)