<BatBin/>

1#
2# Copyright (C) 2021-2022 by TheAloneteam@Github, < https://github.com/TheAloneTeam >.
3#
4# This file is part of < https://github.com/TheAloneTeam/AloneMusic > project,
5# and is released under the "GNU v3.0 License Agreement".
6# Please see < https://github.com/TheAloneTeam/AloneMusic/blob/master/LICENSE >
7#
8# All rights reserved.
9
10import asyncio
11import os
12import re
13from typing import Union
14import httpx
15import urllib.parse
16
17from pyrogram.enums import MessageEntityType
18from pyrogram.types import Message
19
20from AloneMusic import LOGGER
21
22# Use environment variables for configuration
23API_URL = os.getenv("API_URL", "http://149.28.138.220:8080").rstrip("/")
24API_KEY = os.getenv("API_KEY", "")
25
26async def download_assistant(query: str, dl_type: str) -> str:
27 """Helper to get stream URL from the API"""
28 safe_query = urllib.parse.quote(query)
29 url = f"{API_URL}/download?query={safe_query}&dl_type={dl_type}"
30 if API_KEY:
31 url += f"&api_key={API_KEY}"
32 return url
33
34async def download_song(link: str) -> str:
35 return await download_assistant(link, "audio")
36
37async def download_video(link: str) -> str:
38 return await download_assistant(link, "video")
39
40
41class YouTubeAPI:
42 def __init__(self):
43 self.base = "https://www.youtube.com/watch?v="
44 self.regex = r"(?:youtube\.com|youtu\.be)"
45 self.status = "https://www.youtube.com/oembed?url="
46 self.listbase = "https://youtube.com/playlist?list="
47 self.reg = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
48 self._client = None
49
50 async def get_client(self):
51 if self._client is None or self._client.is_closed:
52 self._client = httpx.AsyncClient(timeout=httpx.Timeout(600.0, connect=10.0))
53 return self._client
54
55 async def exists(self, link: str, videoid: Union[bool, str] = None):
56 if videoid:
57 link = self.base + link
58 return bool(re.search(self.regex, link))
59
60 async def url(self, message_1: Message) -> Union[str, None]:
61 messages = [message_1]
62 if message_1.reply_to_message:
63 messages.append(message_1.reply_to_message)
64 for message in messages:
65 if message.entities:
66 for entity in message.entities:
67 if entity.type == MessageEntityType.URL:
68 text = message.text or message.caption
69 return text[entity.offset : entity.offset + entity.length]
70 elif message.caption_entities:
71 for entity in message.caption_entities:
72 if entity.type == MessageEntityType.TEXT_LINK:
73 return entity.url
74 return None
75
76 def _clean_link(self, link: str):
77 if not link:
78 return ""
79 link = str(link)
80 if "&" in link:
81 link = link.split("&")[0]
82 if "?si=" in link:
83 link = link.split("?si=")[0]
84 elif "&si=" in link:
85 link = link.split("&si=")[0]
86 return link
87
88 async def _fetch_details(self, link: str):
89 link = self._clean_link(link)
90 client = await self.get_client()
91 params = {"link": link}
92 if API_KEY:
93 params["api_key"] = API_KEY
94 try:
95 response = await client.get(f"{API_URL}/details", params=params)
96 if response.status_code == 200:
97 return response.json()
98 else:
99 LOGGER(__name__).error(f"API Error ({response.status_code}): {response.text}")
100 except Exception as e:
101 LOGGER(__name__).error(f"Error fetching details from API: {e}")
102 return None
103
104 async def details(self, link: str, videoid: Union[bool, str] = None):
105 if videoid:
106 link = self.base + link
107 data = await self._fetch_details(link)
108 if data:
109 return data["title"], data["duration_min"], data["duration_sec"], data["thumbnail"], data["vidid"]
110 return None, None, 0, None, None
111
112 async def title(self, link: str, videoid: Union[bool, str] = None):
113 if videoid:
114 link = self.base + link
115 data = await self._fetch_details(link)
116 return data["title"] if data else None
117
118 async def duration(self, link: str, videoid: Union[bool, str] = None):
119 if videoid:
120 link = self.base + link
121 data = await self._fetch_details(link)
122 return data["duration_min"] if data else None
123
124 async def thumbnail(self, link: str, videoid: Union[bool, str] = None):
125 if videoid:
126 link = self.base + link
127 data = await self._fetch_details(link)
128 return data["thumbnail"] if data else None
129
130 async def video(self, link: str, videoid: Union[bool, str] = None):
131 if videoid:
132 link = self.base + link
133 try:
134 stream_url, status = await self.download(link, None, video=True)
135 if status:
136 return 1, stream_url
137 else:
138 return 0, "Video URL generation failed"
139 except Exception as e:
140 return 0, f"Video URL generation error: {e}"
141
142 async def playlist(self, link, limit, user_id, videoid: Union[bool, str] = None):
143 if videoid:
144 link = self.listbase + link
145 link = self._clean_link(link)
146
147 client = await self.get_client()
148 params = {"link": link, "limit": limit}
149 if API_KEY:
150 params["api_key"] = API_KEY
151 try:
152 response = await client.get(f"{API_URL}/playlist", params=params)
153 if response.status_code == 200:
154 data = response.json()
155 return data.get("videos")
156 else:
157 LOGGER(__name__).error(f"API Playlist Error ({response.status_code}): {response.text}")
158 except Exception as e:
159 LOGGER(__name__).error(f"Error fetching playlist from API: {e}")
160 return None
161
162 async def track(self, link: str, videoid: Union[bool, str] = None):
163 if videoid:
164 link = self.base + link
165 data = await self._fetch_details(link)
166 if data:
167 track_details = {
168 "title": data["title"],
169 "link": data["link"],
170 "vidid": data["vidid"],
171 "duration_min": data["duration_min"],
172 "thumb": data["thumbnail"],
173 }
174 return track_details, data["vidid"]
175 return None, None
176
177 async def slider(self, link: str, query_type: int, videoid: Union[bool, str] = None):
178 if videoid:
179 link = self.base + link
180 link = self._clean_link(link)
181 client = await self.get_client()
182 params = {"query": link, "limit": 10}
183 if API_KEY:
184 params["api_key"] = API_KEY
185 try:
186 response = await client.get(f"{API_URL}/search", params=params)
187 if response.status_code == 200:
188 result_data = response.json()
189 result = result_data.get("result")
190 if result and len(result) > query_type:
191 res = result[query_type]
192 return res["title"], res["duration"], res["thumbnails"][0]["url"].split("?")[0], res["id"]
193 else:
194 LOGGER(__name__).error(f"API Search Error ({response.status_code}): {response.text}")
195 except Exception as e:
196 LOGGER(__name__).error(f"Error in slider/search from API: {e}")
197 return None, None, None, None
198
199 async def download(
200 self,
201 link: str,
202 mystic,
203 video: Union[bool, str] = None,
204 videoid: Union[bool, str] = None,
205 songaudio: Union[bool, str] = None,
206 songvideo: Union[bool, str] = None,
207 format_id: Union[bool, str] = None,
208 title: Union[bool, str] = None,
209 ) -> tuple:
210 if videoid:
211 link = self.base + link
212
213 dl_type = "video" if (video or songvideo) else "audio"
214 link = self._clean_link(link)
215
216 # Check if we can extract a vidid to use the optimized stream URL
217 # regex for vidid
218 regex = r"(?:youtube\.com\/(?:[^\/]+\/.+\/|(?:v|e(?:mbed)?)\/|.*[?&]v=)|youtu\.be\/)([^\"&?\/\s]{11})"
219 match = re.search(regex, link)
220 vidid_extracted = match.group(1) if match else None
221
222 if vidid_extracted:
223 ext = "mp4" if dl_type == "video" else "mp3"
224 # Using the masked /stream/youtube.com endpoint helps the player identify it
225 # as a YouTube source and enables speed control/seeking via Range requests
226 stream_url = f"{API_URL}/stream/youtube.com/{vidid_extracted}.{ext}"
227 if API_KEY:
228 stream_url += f"?api_key={API_KEY}"
229
230 # We trigger a details call to ensure the API starts caching the file locally
231 asyncio.create_task(self.prefetch(link, video=bool(dl_type == "video")))
232 else:
233 safe_link = urllib.parse.quote(link)
234 stream_url = f"{API_URL}/download?query={safe_link}&dl_type={dl_type}"
235 if API_KEY:
236 stream_url += f"&api_key={API_KEY}"
237
238 return stream_url, True
239
240 async def prefetch(self, link: str, video: bool = False):
241 """Triggers background pre-fetching on the API"""
242 dl_type = "video" if video else "audio"
243 link = self._clean_link(link)
244 client = await self.get_client()
245 params = {"query": link, "dl_type": dl_type, "prefetch": "true"}
246 if API_KEY:
247 params["api_key"] = API_KEY
248 try:
249 # Fire and forget request to the API
250 await client.get(f"{API_URL}/download", params=params)
251 return True
252 except Exception as e:
253 LOGGER(__name__).error(f"Prefetch failed for {link}: {e}")
254 return False
255
256 async def formats(self, link: str, videoid: Union[bool, str] = None):
257 if videoid:
258 link = self.base + link
259 link = self._clean_link(link)
260 client = await self.get_client()
261 params = {"link": link}
262 if API_KEY:
263 params["api_key"] = API_KEY
264 try:
265 response = await client.get(f"{API_URL}/formats", params=params)
266 if response.status_code == 200:
267 data = response.json()
268 formats = data.get("formats", [])
269 for f in formats:
270 f["yturl"] = link
271 return formats, link
272 else:
273 LOGGER(__name__).error(f"API Formats Error ({response.status_code}): {response.text}")
274 except Exception as e:
275 LOGGER(__name__).error(f"Error fetching formats from API: {e}")
276 return [], link
277
278 async def close(self):
279 if self._client and not self._client.is_closed:
280 await self._client.aclose()
281