Create get_tag_proxy.py

master
wangsiyuan 2024-10-01 00:04:09 +08:00
parent f102688e9f
commit ce768a47e6
1 changed files with 161 additions and 0 deletions

161
get_tag_proxy.py Normal file
View File

@ -0,0 +1,161 @@
import base64
import urllib
import json
import requests
def base64_decode(encoded_str):
"""
Base64 编码的字符串进行解码并处理填充问题
:param encoded_str: 要解码的 Base64 编码字符串
:return: 解码后的字符串
"""
# 补足缺失的填充字符
padded_str = encoded_str + '=' * (-len(encoded_str) % 4)
# 将字符串转换为字节串
bytes_str = padded_str.encode('utf-8')
# Base64 解码
decoded_bytes = base64.b64decode(bytes_str)
# 尝试将字节串转换为 UTF-8 字符串
try:
return decoded_bytes.decode('utf-8')
except UnicodeDecodeError:
# 如果解码失败,返回原始字节串
return decoded_bytes
def encode_to_base64(input_string):
"""
对字符串进行 Base64 编码
:param input_string: 要编码的字符串
:return: Base64 编码后的字符串
"""
# Convert the input string to bytes
input_bytes = input_string.encode('utf-8')
# Encode the bytes to Base64
encoded_bytes = base64.b64encode(input_bytes)
# Convert the Base64 bytes back to string
encoded_string = encoded_bytes.decode('utf-8')
return encoded_string
def get_subscription(url):
"""获取订阅数据"""
response = requests.get(url)
if response.status_code == 200:
return response.text
else:
raise Exception("无法获取订阅数据: HTTP 状态码 {}".format(response.status_code))
def parse_subscription(subscription):
"""解析订阅数据"""
decoded_data = base64_decode(subscription)
proxy_list = decoded_data.split('\n')
return [proxy for proxy in proxy_list if proxy]
def url_decode(encoded_str):
"""
URL 编码的字符串进行解码
:param encoded_str: 要解码的 URL 编码字符串
:return: 解码后的字符串
"""
return urllib.parse.unquote(encoded_str)
def parse_ss_urls(ss_url_list):
"""解析 Shadowsocks 链接列表"""
ss_proxies = []
for ss_proxy in ss_url_list:
# 提取各项信息
base_str = ss_proxy[5:] # 移除 'ss://'
encryption_mode_and_password = base64_decode(base_str.split("@")[0])
encryption_mode, password = encryption_mode_and_password.split(":")
server, port = base_str.split("@")[1].split("/?")[0].split(":")
plugin_str = url_decode(base_str.split("/?")[1].split("#")[0])
plugin_params = {}
if "plugin" in plugin_str:
plugin_parts = plugin_str.split(";")
for part in plugin_parts:
if "=" in part:
key, value = part.split("=")
plugin_params[key] = value
name = url_decode(base_str.split("#")[1].rstrip("\r"))
# 存储信息
ss_proxies.append({
"server": server,
"port": port,
"plugin": plugin_params.get('plugin', ''),
"obfs": plugin_params.get('obfs', ''),
"obfs-host": plugin_params.get('obfs-host', ''),
"name": name,
"encryption_mode": encryption_mode,
"password": password
})
return ss_proxies
def generate_ss_url(proxy_info):
"""
根据代理信息生成标准的 Shadowsocks 链接
"""
# 组合加密方式和密码并进行 Base64 编码
auth_str = f"{proxy_info['encryption_mode']}:{proxy_info['password']}"
base64_auth_str = encode_to_base64(auth_str)
# 构造插件参数
plugin_str = ""
if proxy_info['plugin']:
plugin_str = f"plugin={proxy_info['plugin']};obfs={proxy_info['obfs']};obfs-host={proxy_info['obfs-host']}"
# 构造完整的 ss 链接
ss_url = f"ss://{base64_auth_str}@{proxy_info['server']}:{proxy_info['port']}/?{plugin_str}#{proxy_info['name']}"
return ss_url
def save_to_file(ss_urls, file_name):
"""
将生成的 ss 链接保存到文件
"""
with open(file_name, 'w', encoding='utf-8') as file:
for ss_url in ss_urls:
file.write(f"{ss_url}\n")
print(f"Shadowsocks 链接已保存到 {file_name}")
def extract_ss_link(url):
"""
从订阅链接中提取 Shadowsocks 链接并保存到文件
"""
# 获取并解析订阅数据
subscription_data = get_subscription(url)
ss_url_list = parse_subscription(subscription_data)
# 解析 ss 链接,并生成标准格式的 ss 链接
ss_proxies = parse_ss_urls(ss_url_list)
formatted_ss_urls = [generate_ss_url(proxy) for proxy in ss_proxies]
# 保存到文件
save_to_file(formatted_ss_urls, 'ss_links.txt')
if __name__ == '__main__':
# 订阅地址
url = "https://link.nobodys.uk/api/v1/client/subscribe?token=26d553424da6d84dccb20dd05dc844c0"
# 提取并保存 SS 链接
extract_ss_link(url)