diff --git a/get_tag_proxy.py b/get_tag_proxy.py new file mode 100644 index 0000000..5a2eb88 --- /dev/null +++ b/get_tag_proxy.py @@ -0,0 +1,161 @@ +import base64 +import urllib +import json +import requests + + +def base64_decode(encoded_str): + """ + 对 Base64 编码的字符串进行解码,并处理填充问题 + + :param encoded_str: 要解码的 Base64 编码字符串 + :return: 解码后的字符串 + """ + # 补足缺失的填充字符 + padded_str = encoded_str + '=' * (-len(encoded_str) % 4) + + # 将字符串转换为字节串 + bytes_str = padded_str.encode('utf-8') + + # Base64 解码 + decoded_bytes = base64.b64decode(bytes_str) + + # 尝试将字节串转换为 UTF-8 字符串 + try: + return decoded_bytes.decode('utf-8') + except UnicodeDecodeError: + # 如果解码失败,返回原始字节串 + return decoded_bytes + + +def encode_to_base64(input_string): + """ + 对字符串进行 Base64 编码 + :param input_string: 要编码的字符串 + :return: Base64 编码后的字符串 + """ + # Convert the input string to bytes + input_bytes = input_string.encode('utf-8') + + # Encode the bytes to Base64 + encoded_bytes = base64.b64encode(input_bytes) + + # Convert the Base64 bytes back to string + encoded_string = encoded_bytes.decode('utf-8') + + return encoded_string + + +def get_subscription(url): + """获取订阅数据""" + response = requests.get(url) + if response.status_code == 200: + return response.text + else: + raise Exception("无法获取订阅数据: HTTP 状态码 {}".format(response.status_code)) + + +def parse_subscription(subscription): + """解析订阅数据""" + decoded_data = base64_decode(subscription) + proxy_list = decoded_data.split('\n') + return [proxy for proxy in proxy_list if proxy] + + +def url_decode(encoded_str): + """ + 对 URL 编码的字符串进行解码 + + :param encoded_str: 要解码的 URL 编码字符串 + :return: 解码后的字符串 + """ + return urllib.parse.unquote(encoded_str) + + +def parse_ss_urls(ss_url_list): + """解析 Shadowsocks 链接列表""" + ss_proxies = [] + for ss_proxy in ss_url_list: + # 提取各项信息 + base_str = ss_proxy[5:] # 移除 'ss://' + encryption_mode_and_password = base64_decode(base_str.split("@")[0]) + encryption_mode, password = encryption_mode_and_password.split(":") + server, port = base_str.split("@")[1].split("/?")[0].split(":") + plugin_str = url_decode(base_str.split("/?")[1].split("#")[0]) + + plugin_params = {} + if "plugin" in plugin_str: + plugin_parts = plugin_str.split(";") + for part in plugin_parts: + if "=" in part: + key, value = part.split("=") + plugin_params[key] = value + + name = url_decode(base_str.split("#")[1].rstrip("\r")) + + # 存储信息 + ss_proxies.append({ + "server": server, + "port": port, + "plugin": plugin_params.get('plugin', ''), + "obfs": plugin_params.get('obfs', ''), + "obfs-host": plugin_params.get('obfs-host', ''), + "name": name, + "encryption_mode": encryption_mode, + "password": password + }) + return ss_proxies + + +def generate_ss_url(proxy_info): + """ + 根据代理信息生成标准的 Shadowsocks 链接 + """ + # 组合加密方式和密码并进行 Base64 编码 + auth_str = f"{proxy_info['encryption_mode']}:{proxy_info['password']}" + base64_auth_str = encode_to_base64(auth_str) + + # 构造插件参数 + plugin_str = "" + if proxy_info['plugin']: + plugin_str = f"plugin={proxy_info['plugin']};obfs={proxy_info['obfs']};obfs-host={proxy_info['obfs-host']}" + + # 构造完整的 ss 链接 + ss_url = f"ss://{base64_auth_str}@{proxy_info['server']}:{proxy_info['port']}/?{plugin_str}#{proxy_info['name']}" + + return ss_url + + +def save_to_file(ss_urls, file_name): + """ + 将生成的 ss 链接保存到文件 + """ + with open(file_name, 'w', encoding='utf-8') as file: + for ss_url in ss_urls: + file.write(f"{ss_url}\n") + + print(f"Shadowsocks 链接已保存到 {file_name}") + + +def extract_ss_link(url): + """ + 从订阅链接中提取 Shadowsocks 链接,并保存到文件 + """ + # 获取并解析订阅数据 + subscription_data = get_subscription(url) + ss_url_list = parse_subscription(subscription_data) + + # 解析 ss 链接,并生成标准格式的 ss 链接 + ss_proxies = parse_ss_urls(ss_url_list) + formatted_ss_urls = [generate_ss_url(proxy) for proxy in ss_proxies] + + # 保存到文件 + save_to_file(formatted_ss_urls, 'ss_links.txt') + + +if __name__ == '__main__': + # 订阅地址 + url = "https://link.nobodys.uk/api/v1/client/subscribe?token=26d553424da6d84dccb20dd05dc844c0" + + # 提取并保存 SS 链接 + extract_ss_link(url)