# app.py

  import logging
  import os

  import cachetools
  import yt_dlp
  from flask import Flask, request
  # Configure the root logger once at import time: timestamp, level, and the
  # file/line of the call site precede each message.
  logging.basicConfig(
      format='%(asctime)s [%(levelname)s] %(filename)s:%(lineno)d %(message)s',
      datefmt='%Y-%m-%d %H:%M:%S',
      level=logging.INFO,
  )

  # Flask application object; the route handlers in this file register on it.
  app = Flask(__name__)
  def get_key(url: str) -> str:
      """Return the key string for ``url``: the URL prefixed with ``v2:``.

      Args:
          url: The URL to build a key for.

      Returns:
          ``"v2:"`` followed by ``url``.
      """
      return f"v2:{url}"
  @cachetools.cached(cache=cachetools.TTLCache(maxsize=100000, ttl=60 * 5))
  def fetch_info(url):
      """Fetch metadata for ``url`` through yt-dlp without downloading media.

      Results are memoized per ``url`` in a ``TTLCache`` holding up to 100000
      entries for 5 minutes each, so repeated calls with the same URL inside
      that window return the cached value instead of extracting again.

      Args:
          url: Address passed to ``YoutubeDL.extract_info``.

      Returns:
          Whatever ``extract_info(url, download=False)`` returns.
      """
      logging.info("fetching: %s", url)
      # NOTE(review): "flat-playlist" looks like the command-line flag name
      # rather than a YoutubeDL option key or ``extract_flat`` value — confirm
      # against the yt-dlp option reference. Left unchanged so extraction
      # behaves exactly as before.
      # NOTE(review): the proxy value has no scheme or port and is hard-coded;
      # confirm it is what yt-dlp expects and consider moving it to config.
      options = {
          "flat-playlist": True,
          "extract_flat": "flat-playlist",
          "proxy": "socks5-proxy-69759aa5e4d9b48a.elb.us-east-1.amazonaws.com",
      }
      with yt_dlp.YoutubeDL(options) as ydl:
          return ydl.extract_info(url, download=False)
  def convert_dto(info):
      """Convert an extractor info mapping into the response payload dict.

      Keys that are missing, or present with a ``None`` value, for
      ``thumbnails``, ``formats``, ``duration``, ``view_count`` and the
      width/height fields are treated as empty/zero, so the conversion does
      not raise on them and never emits the string ``"None"`` for a number.

      Args:
          info: Mapping with a ``get`` method holding the extracted metadata.

      Returns:
          A dict with ``code``, ``msg``, ``data`` (``videoDetails`` and
          ``streamingData``) and ``id`` keys. Numeric values are rendered as
          strings.
      """
      # Only thumbnails that report a width are kept.
      thumbnails = [
          {
              "url": thumb.get("url", ""),
              "width": f"{thumb.get('width') or 0}",
              "height": f"{thumb.get('height') or 0}",
          }
          for thumb in info.get("thumbnails") or []
          if thumb.get("width")
      ]

      formats = [
          {
              "width": f"{fmt.get('width') or 0}",
              "height": f"{fmt.get('height') or 0}",
              "type": fmt.get("format", ""),
              "quality": f'{fmt.get("format_note", "")}',
              "itag": 0,
              "fps": "0",
              "bitrate": "0",
              "url": fmt.get("url", ""),
              "ext": fmt.get("ext"),
              "vcodec": fmt.get("vcodec", ""),
              "acodec": fmt.get("acodec", ""),
              "vbr": "0",
              "abr": "0",
              "container": fmt.get("container"),
          }
          for fmt in info.get("formats") or []
          if _has_audio_and_video(fmt)
      ]

      # NOTE(review): the duration is divided by 100 before being reported as
      # "lengthSeconds"; that scaling is preserved as-is — confirm it is
      # intentional for the consumers of this payload.
      length_seconds = int((info.get("duration") or 0) / 100)

      return {
          "code": 200,
          "msg": "",
          "data": {
              "videoDetails": {
                  "isLiveContent": info.get("is_live", False),
                  "title": info.get("title", ""),
                  "thumbnails": thumbnails,
                  "description": info.get("description", ""),
                  "lengthSeconds": f"{length_seconds}",
                  "viewCount": f"{info.get('view_count') or 0}",
                  "keywords": [],
                  "author": info.get("uploader", info.get("channel", "")),
                  "channelID": info.get("channel_id", ""),
                  "recommendInfo": [],
                  "channelURL": info.get("channel_url", ""),
                  "videoId": info.get("display_id", ""),
              },
              "streamingData": {
                  "formats": formats,
              },
          },
          "id": "MusicDetailViewModel_detail_url",
      }


  def _has_audio_and_video(fmt):
      """Return True if a format entry has a URL, a real audio codec and a video codec value."""
      audio_codec = fmt.get("acodec")
      return bool(
          fmt.get("resolution") != "audio only"
          and fmt.get("url")
          and audio_codec
          and audio_codec != "none"
          and fmt.get("vcodec")
      )
  @app.route("/extract", methods=["GET", "POST"])
  def extract():
      """Handle ``/extract``: resolve the requested URL and return its payload.

      The URL is read from the ``url`` field of a JSON request body and, when
      that is absent (for example a GET without a body), from the ``url``
      query parameter.

      Returns:
          The ``convert_dto`` payload on success, or an error payload paired
          with HTTP status 400 when no usable URL was supplied.
      """
      # silent=True makes a missing or non-JSON body come back as None instead
      # of aborting the request, so the query-string fallback below can run.
      body = request.get_json(silent=True)
      url = body.get("url") if isinstance(body, dict) else None
      if not url:
          url = request.args.get("url")
      logging.info("url: %s", url)

      # Reject early: the extractor needs a non-empty string, and the cache in
      # front of it needs a hashable key.
      if not isinstance(url, str) or not url:
          return {"code": 400, "msg": "missing required parameter: url", "data": {}}, 400

      info = fetch_info(url=url)
      return convert_dto(info=info)
  if __name__ == '__main__':
      # The interactive debugger lets a browser client execute code, so it is
      # off by default while the server listens on all interfaces. Opt in
      # locally with FLASK_DEBUG=1.
      debug_enabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes")
      app.run(host='0.0.0.0', port=80, debug=debug_enabled)