利用しているVPSへのssh試行が日々logwatchに報告され狼少年気味です。
そこで以下をなぞりnftables向けに日本と米国のホワイトリストを作り、sshに関しては他のアクセスを遮断しました。
その際にAPNICのレジストリデータを利用し、他のRIRからもデータを取得すれば攻撃試行IPアドレスの情報を照会できるだろうと考えました。
そしてその旨ChatGPTに相談してみるとノーコードで完成。
#!/usr/bin/env python3
"""
rir_lookup_multi.py – Look up which RIR allocation multiple IP addresses belong to.
変更点
* コマンドラインで **IPv4/IPv6 アドレスを任意個 (1 以上) 渡せる** (`nargs="+"`)
* 各 IP の結果を順番に表示。無効な IP やヒットしない IP は個別にメッセージ
* 既存の --all-status / --no-cache オプションはそのまま利用可能
"""
import argparse
import ipaddress
import sys
import time
import urllib.request
from pathlib import Path
from typing import List, Optional
# ------------------------------------------------------------
# キャッシュディレクトリ
# ------------------------------------------------------------
CACHE_DIR = Path(__file__).with_suffix("") # スクリプト名 → ディレクトリ名
CACHE_DIR = CACHE_DIR.parent / (CACHE_DIR.name + "_cache")
CACHE_DIR.mkdir(exist_ok=True)
# ------------------------------------------------------------
# RIR ダウンロード URL(HTTPS)
# ------------------------------------------------------------
RIR_URLS = {
"afrinic": "https://ftp.afrinic.net/pub/stats/afrinic/delegated-afrinic-latest",
"apnic": "https://ftp.apnic.net/stats/apnic/delegated-apnic-latest",
# ARIN は標準版が無いので extended(先頭 7 列のみ使用)
"arin": "https://ftp.arin.net/pub/stats/arin/delegated-arin-extended-latest",
"lacnic": "https://ftp.lacnic.net/pub/stats/lacnic/delegated-lacnic-latest",
"ripencc": "https://ftp.ripe.net/pub/stats/ripencc/delegated-ripencc-latest",
}
# ------------------------------------------------------------
# ユーティリティ関数
# ------------------------------------------------------------
def ip_to_int(ip: str) -> int:
"""IPv4 文字列 → 32bit 整数"""
return int(ipaddress.IPv4Address(ip))
def inside_ipv4_block(ip_int: int, start_ip: str, count: int) -> bool:
start = ip_to_int(start_ip)
return start <= ip_int <= start + count - 1
def inside_ipv6_block(addr: ipaddress.IPv6Address, start_ip: str, prefixlen: int) -> bool:
return addr in ipaddress.IPv6Network(f"{start_ip}/{prefixlen}", strict=False)
def ensure_file(rir: str, url: str, max_age_days: int = 7) -> Path:
"""RIR ファイルを HTTPS でダウンロードし、キャッシュを返す"""
dst = CACHE_DIR / f"delegated-{rir}-latest"
needs_dl = (
not dst.exists()
or max_age_days == 0
or (time.time() - dst.stat().st_mtime > max_age_days * 86400)
)
if needs_dl:
print(f"[INFO] Downloading {rir.upper()} data via HTTPS …", file=sys.stderr)
try:
urllib.request.urlretrieve(url, dst)
except Exception as e:
sys.exit(f"[ERROR] {rir.upper()}: download failed – {e}")
return dst
def lookup_ip(ip_str: str, files: List[Path], statuses: Optional[List[str]]):
"""最初にヒットした行を返す (None なら未ヒット)"""
is_v4 = ":" not in ip_str
target_v4 = ip_to_int(ip_str) if is_v4 else None
target_v6 = ipaddress.IPv6Address(ip_str) if not is_v4 else None
for fpath in files:
with fpath.open(encoding="utf-8", errors="replace") as f:
for line in f:
if "|summary|" in line or line.startswith("#"):
continue
parts = line.rstrip().split("|")
if len(parts) < 7:
continue
registry, cc, rtype, start, value, date, status = parts[:7]
if statuses and status not in statuses:
continue
if is_v4 and rtype == "ipv4":
if inside_ipv4_block(target_v4, start, int(value)):
return [fpath.stem.split("-")[1]] + parts
elif (not is_v4) and rtype == "ipv6":
if inside_ipv6_block(target_v6, start, int(value)):
return [fpath.stem.split("-")[1]] + parts
return None
# ------------------------------------------------------------
# メイン
# ------------------------------------------------------------
def main() -> None:
parser = argparse.ArgumentParser(
description="Look up IP allocation across all RIRs (multiple IP support)."
)
parser.add_argument(
"ips",
nargs="+",
help="One or more IPv4/IPv6 addresses to look up",
)
parser.add_argument(
"-a",
"--all-status",
action="store_true",
help="Include assigned / reserved etc., not only allocated",
)
parser.add_argument(
"--no-cache",
action="store_true",
help="Force re-download of RIR files (ignore 7‑day cache)",
)
args = parser.parse_args()
# ----- RIR ファイル取得 -----
files: List[Path] = []
for rir, url in RIR_URLS.items():
files.append(ensure_file(rir, url, 0 if args.no_cache else 7))
statuses = None if args.all_status else ["allocated"]
# ----- 各 IP を検索 -----
for ip_str in args.ips:
print(f"\n>>> {ip_str}")
try:
result = lookup_ip(ip_str, files, statuses)
except ipaddress.AddressValueError:
print("[ERROR] Invalid IP address")
continue
if result:
rir, registry, cc, rtype, start, value, date, status = result[:8]
if rtype == "ipv4":
end_ip = ipaddress.IPv4Address(ip_to_int(start) + int(value) - 1)
rng = f"{start} – {end_ip}"
else:
rng = f"{start}/{value}"
print(f" RIR : {rir.upper()}")
print(f" Country code : {cc}")
print(f" Resource type : {rtype}")
print(f" Range : {rng}")
print(f" Allocation date: {date}")
print(f" Status : {status}")
else:
print("[INFO] No matching record found")
if __name__ == "__main__":
main()
データは日々更新されるとありますがキャッシュしてアクセスを減らす要望にも応えてくれました。
後から引数で複数のIPアドレスを指定できるように変更も何のその。
処理対象と要求が明確だとシゴデキなChatGPTさんステキ!
参考にした処理をPythonで単一ファイルにまとめるのもやってくれそうですね。