创建 r0capture.py
parent
649dd5ac89
commit
23640b607a
|
|
@ -0,0 +1,374 @@
|
|||
# Copyright 2017 Google Inc. All Rights Reserved.
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Decrypts and logs a process's SSL traffic.
|
||||
Hooks the functions SSL_read() and SSL_write() in a given process and logs the
|
||||
decrypted data to the console and/or to a pcap file.
|
||||
Typical usage example:
|
||||
ssl_log("wget", "log.pcap", True)
|
||||
Dependencies:
|
||||
frida (https://www.frida.re/):
|
||||
sudo pip install frida
|
||||
hexdump (https://bitbucket.org/techtonik/hexdump/) if using verbose output:
|
||||
sudo pip install hexdump
|
||||
"""
|
||||
|
||||
__author__ = "geffner@google.com (Jason Geffner)"
|
||||
__version__ = "2.0"
|
||||
|
||||
"""
|
||||
# r0capture
|
||||
|
||||
ID: r0ysue
|
||||
|
||||
安卓应用层抓包通杀脚本
|
||||
|
||||
https://github.com/r0ysue/r0capture
|
||||
|
||||
## 简介
|
||||
|
||||
- 仅限安卓平台,测试安卓7、8、9、10 可用 ;
|
||||
- 无视所有证书校验或绑定,无视任何证书;
|
||||
- 通杀TCP/IP四层模型中的应用层中的全部协议;
|
||||
- 通杀协议包括:Http,WebSocket,Ftp,Xmpp,Imap,Smtp,Protobuf等等、以及它们的SSL版本;
|
||||
- 通杀所有应用层框架,包括HttpUrlConnection、Okhttp1/3/4、Retrofit/Volley等等;
|
||||
"""
|
||||
|
||||
# Windows版本需要安装库:
|
||||
# pip install 'win_inet_pton'
|
||||
# pip install hexdump
|
||||
import argparse
|
||||
import os
|
||||
import pprint
|
||||
import random
|
||||
import signal
|
||||
import socket
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import frida
|
||||
from loguru import logger
|
||||
|
||||
try:
|
||||
if os.name == 'nt':
|
||||
import win_inet_pton
|
||||
except ImportError:
|
||||
# win_inet_pton import error
|
||||
pass
|
||||
|
||||
try:
|
||||
import myhexdump as hexdump # pylint: disable=g-import-not-at-top
|
||||
except ImportError:
|
||||
pass
|
||||
try:
|
||||
from shutil import get_terminal_size as get_terminal_size
|
||||
except:
|
||||
try:
|
||||
from backports.shutil_get_terminal_size import get_terminal_size as get_terminal_size
|
||||
except:
|
||||
pass
|
||||
|
||||
try:
|
||||
import click
|
||||
except:
|
||||
class click:
|
||||
@staticmethod
|
||||
def secho(message=None, **kwargs):
|
||||
print(message)
|
||||
|
||||
@staticmethod
|
||||
def style(**kwargs):
|
||||
raise Exception("unsupported style")
|
||||
banner = """
|
||||
--------------------------------------------------------------------------------------------
|
||||
.oooo. .
|
||||
d8P'`Y8b .o8
|
||||
oooo d8b 888 888 .ooooo. .oooo. oo.ooooo. .o888oo oooo oooo oooo d8b .ooooo.
|
||||
`888""8P 888 888 d88' `"Y8 `P )88b 888' `88b 888 `888 `888 `888""8P d88' `88b
|
||||
888 888 888 888 .oP"888 888 888 888 888 888 888 888ooo888
|
||||
888 `88b d88' 888 .o8 d8( 888 888 888 888 . 888 888 888 888 .o
|
||||
d888b `Y8bd8P' `Y8bod8P' `Y888""8o 888bod8P' "888" `V88V"V8P' d888b `Y8bod8P'
|
||||
888
|
||||
o888o
|
||||
https://github.com/r0ysue/r0capture
|
||||
--------------------------------------------------------------------------------------------\n
|
||||
"""
|
||||
|
||||
|
||||
def show_banner():
|
||||
colors = ['bright_red', 'bright_green', 'bright_blue', 'cyan', 'magenta']
|
||||
try:
|
||||
click.style('color test', fg='bright_red')
|
||||
except:
|
||||
colors = ['red', 'green', 'blue', 'cyan', 'magenta']
|
||||
try:
|
||||
columns = get_terminal_size().columns
|
||||
if columns >= len(banner.splitlines()[1]):
|
||||
for line in banner.splitlines():
|
||||
click.secho(line, fg=random.choice(colors))
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
# ssl_session[<SSL_SESSION id>] = (<bytes sent by client>,
|
||||
# <bytes sent by server>)
|
||||
ssl_sessions = {}
|
||||
|
||||
|
||||
def ssl_log(process, pcap=None, host=False, verbose=False, isUsb=False, ssllib="", isSpawn=True, wait=0):
|
||||
"""Decrypts and logs a process's SSL traffic.
|
||||
Hooks the functions SSL_read() and SSL_write() in a given process and logs
|
||||
the decrypted data to the console and/or to a pcap file.
|
||||
Args:
|
||||
process: The target process's name (as a string) or process ID (as an int).
|
||||
pcap: The file path to which the pcap file should be written.
|
||||
verbose: If True, log the decrypted traffic to the console.
|
||||
Raises:
|
||||
NotImplementedError: Not running on a Linux or macOS system.
|
||||
"""
|
||||
|
||||
# if platform.system() not in ("Darwin", "Linux"):
|
||||
# raise NotImplementedError("This function is only implemented for Linux and "
|
||||
# "macOS systems.")
|
||||
|
||||
def log_pcap(pcap_file, ssl_session_id, function, src_addr, src_port,
|
||||
dst_addr, dst_port, data):
|
||||
"""Writes the captured data to a pcap file.
|
||||
Args:
|
||||
pcap_file: The opened pcap file.
|
||||
ssl_session_id: The SSL session ID for the communication.
|
||||
function: The function that was intercepted ("SSL_read" or "SSL_write").
|
||||
src_addr: The source address of the logged packet.
|
||||
src_port: The source port of the logged packet.
|
||||
dst_addr: The destination address of the logged packet.
|
||||
dst_port: The destination port of the logged packet.
|
||||
data: The decrypted packet data.
|
||||
"""
|
||||
t = time.time()
|
||||
|
||||
if ssl_session_id not in ssl_sessions:
|
||||
ssl_sessions[ssl_session_id] = (random.randint(0, 0xFFFFFFFF),
|
||||
random.randint(0, 0xFFFFFFFF))
|
||||
client_sent, server_sent = ssl_sessions[ssl_session_id]
|
||||
|
||||
if function == "SSL_read":
|
||||
seq, ack = (server_sent, client_sent)
|
||||
else:
|
||||
seq, ack = (client_sent, server_sent)
|
||||
|
||||
for writes in (
|
||||
# PCAP record (packet) header
|
||||
("=I", int(t)), # Timestamp seconds
|
||||
("=I", int((t * 1000000) % 1000000)), # Timestamp microseconds
|
||||
("=I", 40 + len(data)), # Number of octets saved
|
||||
("=i", 40 + len(data)), # Actual length of packet
|
||||
# IPv4 header
|
||||
(">B", 0x45), # Version and Header Length
|
||||
(">B", 0), # Type of Service
|
||||
(">H", 40 + len(data)), # Total Length
|
||||
(">H", 0), # Identification
|
||||
(">H", 0x4000), # Flags and Fragment Offset
|
||||
(">B", 0xFF), # Time to Live
|
||||
(">B", 6), # Protocol
|
||||
(">H", 0), # Header Checksum
|
||||
(">I", src_addr), # Source Address
|
||||
(">I", dst_addr), # Destination Address
|
||||
# TCP header
|
||||
(">H", src_port), # Source Port
|
||||
(">H", dst_port), # Destination Port
|
||||
(">I", seq), # Sequence Number
|
||||
(">I", ack), # Acknowledgment Number
|
||||
(">H", 0x5018), # Header Length and Flags
|
||||
(">H", 0xFFFF), # Window Size
|
||||
(">H", 0), # Checksum
|
||||
(">H", 0)): # Urgent Pointer
|
||||
pcap_file.write(struct.pack(writes[0], writes[1]))
|
||||
pcap_file.write(data)
|
||||
|
||||
if function == "SSL_read":
|
||||
server_sent += len(data)
|
||||
else:
|
||||
client_sent += len(data)
|
||||
ssl_sessions[ssl_session_id] = (client_sent, server_sent)
|
||||
|
||||
def on_message(message, data):
|
||||
"""Callback for errors and messages sent from Frida-injected JavaScript.
|
||||
Logs captured packet data received from JavaScript to the console and/or a
|
||||
pcap file. See https://www.frida.re/docs/messages/ for more detail on
|
||||
Frida's messages.
|
||||
Args:
|
||||
message: A dictionary containing the message "type" and other fields
|
||||
dependent on message type.
|
||||
data: The string of captured decrypted data.
|
||||
"""
|
||||
if message["type"] == "error":
|
||||
logger.info(f"{message}")
|
||||
os.kill(os.getpid(), signal.SIGTERM)
|
||||
return
|
||||
if len(data) == 1:
|
||||
logger.info(f'{message["payload"]["function"]}')
|
||||
logger.info(f'{message["payload"]["stack"]}')
|
||||
return
|
||||
p = message["payload"]
|
||||
if verbose:
|
||||
src_addr = socket.inet_ntop(socket.AF_INET,
|
||||
struct.pack(">I", p["src_addr"]))
|
||||
dst_addr = socket.inet_ntop(socket.AF_INET,
|
||||
struct.pack(">I", p["dst_addr"]))
|
||||
session_id = p['ssl_session_id']
|
||||
logger.info(f"SSL Session: {session_id}")
|
||||
logger.info("[%s] %s:%d --> %s:%d" % (
|
||||
p["function"],
|
||||
src_addr,
|
||||
p["src_port"],
|
||||
dst_addr,
|
||||
p["dst_port"]))
|
||||
gen = hexdump.hexdump(data, result="generator",only_str=True)
|
||||
str_gen = ''.join(gen)
|
||||
logger.info(f"{str_gen}")
|
||||
logger.info(f"{p['stack']}")
|
||||
if pcap:
|
||||
log_pcap(pcap_file, p["ssl_session_id"], p["function"], p["src_addr"],
|
||||
p["src_port"], p["dst_addr"], p["dst_port"], data)
|
||||
|
||||
if isUsb:
|
||||
try:
|
||||
device = frida.get_usb_device()
|
||||
except:
|
||||
device = frida.get_remote_device()
|
||||
else:
|
||||
if host:
|
||||
manager = frida.get_device_manager()
|
||||
device = manager.add_remote_device(host)
|
||||
else:
|
||||
device = frida.get_local_device()
|
||||
|
||||
if isSpawn:
|
||||
pid = device.spawn([process])
|
||||
time.sleep(1)
|
||||
session = device.attach(pid)
|
||||
time.sleep(1)
|
||||
device.resume(pid)
|
||||
else:
|
||||
print("attach")
|
||||
session = device.attach(process)
|
||||
if wait > 0:
|
||||
print(f"wait for {wait} seconds")
|
||||
time.sleep(wait)
|
||||
|
||||
# session = frida.attach(process)
|
||||
|
||||
# pid = device.spawn([process])
|
||||
# pid = process
|
||||
# session = device.attach(pid)
|
||||
# device.resume(pid)
|
||||
if pcap:
|
||||
pcap_file = open(pcap, "wb", 0)
|
||||
for writes in (
|
||||
("=I", 0xa1b2c3d4), # Magic number
|
||||
("=H", 2), # Major version number
|
||||
("=H", 4), # Minor version number
|
||||
("=i", time.timezone), # GMT to local correction
|
||||
("=I", 0), # Accuracy of timestamps
|
||||
("=I", 65535), # Max length of captured packets
|
||||
("=I", 228)): # Data link type (LINKTYPE_IPV4)
|
||||
pcap_file.write(struct.pack(writes[0], writes[1]))
|
||||
|
||||
with open(Path(__file__).resolve().parent.joinpath("./script.js"), encoding="utf-8") as f:
|
||||
_FRIDA_SCRIPT = f.read()
|
||||
# _FRIDA_SCRIPT = session.create_script(content)
|
||||
# print(_FRIDA_SCRIPT)
|
||||
script = session.create_script(_FRIDA_SCRIPT)
|
||||
script.on("message", on_message)
|
||||
script.load()
|
||||
|
||||
if ssllib != "":
|
||||
script.exports.setssllib(ssllib)
|
||||
|
||||
print("Press Ctrl+C to stop logging.")
|
||||
|
||||
def stoplog(signum, frame):
|
||||
print('You have stoped logging.')
|
||||
session.detach()
|
||||
if pcap:
|
||||
pcap_file.flush()
|
||||
pcap_file.close()
|
||||
exit()
|
||||
|
||||
signal.signal(signal.SIGINT, stoplog)
|
||||
signal.signal(signal.SIGTERM, stoplog)
|
||||
sys.stdin.read()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
show_banner()
|
||||
|
||||
|
||||
class ArgParser(argparse.ArgumentParser):
|
||||
|
||||
def error(self, message):
|
||||
print("ssl_logger v" + __version__)
|
||||
print("by " + __author__)
|
||||
print("Modified by BigFaceCat")
|
||||
print("Error: " + message)
|
||||
print()
|
||||
print(self.format_help().replace("usage:", "Usage:"))
|
||||
self.exit(0)
|
||||
|
||||
|
||||
parser = ArgParser(
|
||||
add_help=False,
|
||||
description="Decrypts and logs a process's SSL traffic.",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog=r"""
|
||||
Examples:
|
||||
%(prog)s -pcap ssl.pcap openssl
|
||||
%(prog)s -verbose 31337
|
||||
%(prog)s -pcap log.pcap -verbose wget
|
||||
%(prog)s -pcap log.pcap -ssl "*libssl.so*" com.bigfacecat.testdemo
|
||||
""")
|
||||
|
||||
args = parser.add_argument_group("Arguments")
|
||||
args.add_argument("-pcap", '-p', metavar="<path>", required=False,
|
||||
help="Name of PCAP file to write")
|
||||
args.add_argument("-host", '-H', metavar="<192.168.1.1:27042>", required=False,
|
||||
help="connect to remote frida-server on HOST")
|
||||
args.add_argument("-verbose", "-v", required=False, action="store_const", default=True,
|
||||
const=True, help="Show verbose output")
|
||||
args.add_argument("process", metavar="<process name | process id>",
|
||||
help="Process whose SSL calls to log")
|
||||
args.add_argument("-ssl", default="", metavar="<lib>",
|
||||
help="SSL library to hook")
|
||||
args.add_argument("--isUsb", "-U", default=False, action="store_true",
|
||||
help="connect to USB device")
|
||||
args.add_argument("--isSpawn", "-f", default=False, action="store_true",
|
||||
help="if spawned app")
|
||||
args.add_argument("-wait", "-w", type=int, metavar="<seconds>", default=0,
|
||||
help="Time to wait for the process")
|
||||
|
||||
parsed = parser.parse_args()
|
||||
logger.add(f"{parsed.process.replace('.','_')}-{int(time.time())}.log", rotation="500MB", encoding="utf-8", enqueue=True, retention="10 days")
|
||||
|
||||
ssl_log(
|
||||
int(parsed.process) if parsed.process.isdigit() else parsed.process,
|
||||
parsed.pcap,
|
||||
parsed.host,
|
||||
parsed.verbose,
|
||||
isUsb=parsed.isUsb,
|
||||
isSpawn=parsed.isSpawn,
|
||||
ssllib=parsed.ssl,
|
||||
wait=parsed.wait
|
||||
)
|
||||
Reference in New Issue