本文最后更新于 518 天前,其中的信息可能已经有所发展或是发生改变。
Ethernet II
https://zh.wikipedia.org/wiki/%E4%BB%A5%E5%A4%AA%E7%BD%91%E5%B8%A7%E6%A0%BC%E5%BC%8F#Ethernet_II
- 以太网帧
Struct | Size(Bytes) | Usage | Example |
---|---|---|---|
Destination | 6 | 目的地址 | 33:33:00:01:00:03 |
Source | 6 | 源地址 | 00:50:56:c0:00:08 |
Type | 2 | 类型 | 0x86DD |
Type | Description |
---|---|
0x0800 | IPV4 |
0x0806 | ARP |
0x8035 | RARP |
0x86DD | IPV6 |
IPv4
Internet Protocol
https://zh.wikipedia.org/zh-hans/IPv4
Struct | Size(bits) | Description | Example |
---|---|---|---|
Version | 4 | IP版本(IPv4必为4) | 0100 |
Header Length | 4 | 头部32位长个数(最小结构为5) | 0101 |
Differentiated Service | 6 | 区分服务(实时数据流时使用) | 000000 |
Explicit Congestion Notification | 2 | 显式拥塞通告 | 00 |
Total Length | 16 | 全长(首部+数据) | 0x003c |
Identification | 16 | 识别码(全局,自动加一) | 0xc181 |
Flags | 3 | 控制识别分片 | 000 |
Fragment Offset | 13 | 每个分片相对于原始报文开头的偏移量,以8字节作单位 | 0000000000000 |
Time to Live | 8 | 存活时长(经过路由器减一) | 0x80 |
Protocol | 8 | 协议类型 | 0x01 |
Header Checksum | 16 | 头部校验和 | 0x4769 |
Source Address | 32 | 源地址 | 0xc0a85801 |
Destination Address | 32 | 目的地址 | 0xc0a85884 |
Index | Description |
---|---|
0 | 保留,必须为0 |
1 | 为1时禁止分片,为0时允许分片 |
2 | 为1时后续还有分片,为0时为最后一个分片 |
Protocol详情:https://zh.wikipedia.org/zh-hans/IP%E5%8D%8F%E8%AE%AE%E5%8F%B7%E5%88%97%E8%A1%A8
- 手动构造
def checksum(data: bytes) -> bytes:
    """Return the 16-bit Internet checksum (RFC 1071) of ``data``.

    The input is summed as big-endian 16-bit words using one's-complement
    addition (carries out of bit 15 are added back into the sum) and the
    bitwise complement of that sum is returned.

    Args:
        data: Bytes to checksum, with the checksum field already zeroed.
            An odd-length input is treated as if one zero byte followed it.

    Returns:
        Exactly two bytes in network (big-endian) byte order.
    """
    data = bytes(data)
    if len(data) % 2:
        # RFC 1071: a trailing odd byte is summed as the high byte of a word.
        data += b"\x00"
    total = 0
    for offset in range(0, len(data), 2):
        total += int.from_bytes(data[offset:offset + 2], "big")
    # End-around carry: fold everything above bit 15 back into the low word.
    while total > 0xffff:
        total = (total & 0xffff) + (total >> 16)
    # Always emit two bytes so the result can be spliced into a fixed-size
    # header field even when the high byte of the checksum is zero.
    return (~total & 0xffff).to_bytes(2, "big")
# Build one unfragmented IPv4 packet around an arbitrary payload and print it.
something = b""  # such as icmp
ip = b""
ip += b"\x45"  # version + header length
ip += b"\x00"  # differentiated services
ip += b"\x00\x3c"  # total length (placeholder, rewritten below)
ip += b"\xc1\x81"  # identification
ip += b"\x00\x00"  # flags + fragment offset
ip += b"\x80"  # ttl
ip += b"\x01"  # protocol
ip += b"\x00\x00"  # header checksum (placeholder, rewritten below)
ip += bytes(int(octet) for octet in "192.168.88.1".split("."))  # source
ip += bytes(int(octet) for octet in "192.168.88.132".split("."))  # destination
ip += something  # such as icmp
# Total Length lives in header bytes 2-3; bytes 4-5 are Identification and
# must be left untouched.
ip = ip[:2] + len(ip).to_bytes(2, "big") + ip[4:]  # reset total length
# The checksum covers only the 20-byte header, with its own field zeroed.
ip = ip[:10] + checksum(ip[:20]) + ip[12:]  # calc header checksum
print(ip)
- 分块构造
from tools import send, checksum
def ether():
    """Return the fixed 14-byte Ethernet II header used by this example.

    Layout: 6-byte destination MAC, 6-byte source MAC, 2-byte EtherType.
    """
    destination_mac = bytes.fromhex("000c29e57f05")
    source_mac = bytes.fromhex("005056c00008")
    ether_type = b"\x08\x00"  # 0x0800 = IPv4
    return destination_mac + source_mac + ether_type
def icmp(data: bytes):
    """Build an ICMP message (type 8, code 0) that carries ``data``.

    The 8-byte header uses a fixed identifier (0x1234) and sequence number
    (0x5678). The checksum is computed over header plus data with the
    checksum field zeroed, then written into bytes 2-3.

    Args:
        data: Payload appended after the 8-byte ICMP header.

    Returns:
        The complete ICMP message as bytes.
    """
    header = b"".join((
        b"\x08",      # type
        b"\x00",      # code
        b"\x00\x00",  # checksum placeholder
        b"\x12\x34",  # identifier
        b"\x56\x78",  # sequence number
    ))
    message = header + data
    return message[:2] + checksum(message) + message[4:]
def ipv4(data: bytes):
    """Send ``data`` as a series of IPv4 fragments, one Ethernet frame each.

    The payload is cut into chunks of at most 1480 bytes. Every chunk gets
    its own 20-byte IPv4 header (protocol 1, identification 0) whose
    fragment offset is expressed in 8-byte units and whose "more fragments"
    flag is set on every chunk except the last. Each packet is prefixed with
    the header from ``ether()`` and handed to ``send()``.

    Args:
        data: The IP payload to transmit. Empty input sends nothing.
    """
    chunks = [data[start:start + 1480] for start in range(0, len(data), 1480)]
    last_index = len(chunks) - 1
    for index, chunk in enumerate(chunks):
        # 3 flag bits (reserved, don't-fragment, more-fragments) followed by
        # the 13-bit fragment offset.
        more_fragments = "0" if index == last_index else "1"
        offset_bits = bin(index * 1480 // 8)[2:].rjust(13, "0")
        flags_and_offset = int("00" + more_fragments + offset_bits, 2)

        header = b"".join((
            b"\x45",      # version + header length
            b"\x00",      # differentiated services
            b"\x00\x3c",  # total length (placeholder)
            b"\x00\x00",  # identification
            bytes.fromhex("%04x" % flags_and_offset),  # flags + fragment offset
            b"\x80",      # ttl
            b"\x01",      # protocol
            b"\x00\x00",  # header checksum (placeholder)
            bytes(int(octet) for octet in "192.168.88.1".split(".")),    # source
            bytes(int(octet) for octet in "192.168.88.132".split(".")),  # destination
        ))
        packet = header + chunk  # such as icmp
        # Patch the real total length into bytes 2-3.
        packet = packet[:2] + len(packet).to_bytes(2, "big") + packet[4:]
        # Header checksum covers the first 20 bytes only.
        packet = packet[:10] + checksum(packet[:20]) + packet[12:]
        send(ether() + packet)
# The 8-byte ICMP header plus 3156 bytes of data is larger than the
# 1480-byte chunk size used by ipv4(), so this message goes out as several
# IP fragments.
a = icmp(b"test" * 789)
ipv4(a)
ICMP
Internet Control Message Protocol
https://datatracker.ietf.org/doc/html/rfc1071
虽然ICMP是包含在IP数据包中的,但是对ICMP消息通常会特殊处理,会和一般IP数据包的处理不同,而不是作为IP的一个子协议来处理
Package = Ethernet II + IP + ICMP
Struct | Size(Bytes) | Usage | Example | 可选 |
---|---|---|---|---|
Type | 1 | 类型 | 0x08 | |
Code | 1 | 代码 | 0x00 | |
Checksum | 2 | 校验和 | 0xe9c4 | |
Identifier | 2 | 进程pid(随机) | 0x0002 | |
Sequence Number | 2 | 序列(递增) | 0x6395 | |
TimeStamp | 8 | 时间戳(小端序) | 0x1a5bad6400000000 | √ |
Data | 32 | 数据 | …. |
- Checksum:校验和置零,每 2 Bytes 做二进制加法,取反
# ICMP message as a hex string; the checksum field (hex digits 4-7) is zero.
a = "08000000000263956162636465666768696a6b6c6d6e6f7071727374757677616263646566676869"
c = 0
# Sum the message as 16-bit words (4 hex digits each), adding any carry out
# of bit 15 back into the low 16 bits after every step.
for i in range(len(a) // 4):
    c += int(a[i * 4: (i + 1) * 4], 16)
    c = (c & 0xffff) + 1 if c > 0xffff else c
# Zero-pad to exactly 4 hex digits so a checksum below 0x1000 cannot shift
# the fields that follow it.
icmp = a[:4] + format(~c & 0xffff, "04x") + a[8:]
print(icmp)
- 手动构造
# Windows ping.exe
# 8-byte ICMP header: type 8, code 0, zeroed checksum, identifier 0x1234,
# sequence number 0x5678.
package = bytes([0x08, 0x00, 0x00, 0x00, 0x12, 0x34, 0x56, 0x78])
# 32-byte data field: b"test" right-aligned and left-padded with NUL bytes.
package += b"test".rjust(32, b"\x00")[:32]
def checksum(data: bytes) -> bytes:
    """Return the 16-bit Internet checksum (RFC 1071) of ``data``.

    The input is summed as big-endian 16-bit words using one's-complement
    addition (carries out of bit 15 are added back into the sum) and the
    bitwise complement of that sum is returned.

    Args:
        data: Bytes to checksum, with the checksum field already zeroed.
            An odd-length input is treated as if one zero byte followed it.

    Returns:
        Exactly two bytes in network (big-endian) byte order.
    """
    data = bytes(data)
    if len(data) % 2:
        # RFC 1071: a trailing odd byte is summed as the high byte of a word.
        data += b"\x00"
    total = 0
    for offset in range(0, len(data), 2):
        total += int.from_bytes(data[offset:offset + 2], "big")
    # End-around carry: fold everything above bit 15 back into the low word.
    while total > 0xffff:
        total = (total & 0xffff) + (total >> 16)
    # Always emit two bytes so the result can be spliced into a fixed-size
    # header field even when the high byte of the checksum is zero.
    return (~total & 0xffff).to_bytes(2, "big")
# Show the checksum computed over the whole ICMP message (header + data).
print(checksum(package))
# Linux ping
# 在 checksum 后增添小端序的8字节时间戳
from time import time
from test import send
from struct import Struct
from ctypes import create_string_buffer
from numpy import frombuffer, uint8
def little_endian(num: int) -> bytes:
    """Encode ``num`` as an 8-byte field: little-endian int32 plus zero padding.

    ``num`` is packed as a signed 32-bit little-endian integer and the result
    is right-padded with zero bytes to a total length of eight.

    Args:
        num: Integer to encode, e.g. a Unix timestamp in seconds.

    Returns:
        Eight bytes; the first four hold ``num``, the last four are zero.

    Raises:
        struct.error: If ``num`` does not fit in a signed 32-bit integer.
    """
    # Pack straight to bytes instead of round-tripping through hex text, so
    # byte values below 0x10 keep their leading zero digit.
    return Struct('<1i').pack(num).ljust(8, b"\x00")
def checksum(data: bytes) -> bytes:
    """Return the 16-bit Internet checksum (RFC 1071) of ``data``.

    The input is summed as big-endian 16-bit words using one's-complement
    addition (carries out of bit 15 are added back into the sum) and the
    bitwise complement of that sum is returned.

    Args:
        data: Bytes to checksum, with the checksum field already zeroed.
            An odd-length input is treated as if one zero byte followed it.

    Returns:
        Exactly two bytes in network (big-endian) byte order.
    """
    data = bytes(data)
    if len(data) % 2:
        # RFC 1071: a trailing odd byte is summed as the high byte of a word.
        data += b"\x00"
    total = 0
    for offset in range(0, len(data), 2):
        total += int.from_bytes(data[offset:offset + 2], "big")
    # End-around carry: fold everything above bit 15 back into the low word.
    while total > 0xffff:
        total = (total & 0xffff) + (total >> 16)
    # Always emit two bytes so the result can be spliced into a fixed-size
    # header field even when the high byte of the checksum is zero.
    return (~total & 0xffff).to_bytes(2, "big")
# ICMP message in the Linux ping layout: the 8-byte header is followed by an
# 8-byte timestamp field and then 48 bytes of data.
package = b"".join((
    b"\x08",  # type
    b"\x00",  # code
    b"\x00\x00",  # checksum
    b"\x12\x34",  # identifier
    b"\x56\x78",  # sequence
    little_endian(round(time())),  # timestamp
    b"test".rjust(48, b"\x00")[:48],  # data
))
print(checksum(package))
ARP
Address Resolution Protocol
https://zh.wikipedia.org/wiki/%E5%9C%B0%E5%9D%80%E8%A7%A3%E6%9E%90%E5%8D%8F%E8%AE%AE
Package = Ethernet II + ARP
目标以太网地址:目标MAC地址。FF:FF:FF:FF:FF:FF (二进制全1)为广播地址
未知MAC地址:00:00:00:00:00:00
Struct | Size(bytes) | Description | Example |
---|---|---|---|
Hardware Type | 2 | 硬件类型 | 0x0001 |
Protocol Type | 2 | 协议类型 | 0x0800 |
Hardware Size | 1 | 硬件地址长度(Bytes) | 0x06 |
Protocol Size | 1 | 协议地址长度(Bytes) | 0x04 |
Opcode | 2 | 操作码 | 0x0001 |
Sender Mac Address | 6 | 源MAC | … |
Sender IP Address | 4 | 源IP | … |
Target Mac Address | 6 | 目标MAC | … |
Target IP Address | 4 | 目标IP | … |
Opcode | Description |
---|---|
0x0001 | ARP Request |
0x0002 | ARP Reply |
0x0003 | RARP Request |
0x0004 | RARP Reply |
- 手动构造
# 28-byte ARP payload for Ethernet/IPv4 with opcode 1; the target MAC is
# left as all zeros.
arp = b"".join((
    b"\x00\x01",  # hardware type
    b"\x08\x00",  # protocol type
    b"\x06",  # hardware size
    b"\x04",  # protocol size
    b"\x00\x01",  # opcode
    bytes(int(part, 16) for part in "00:0c:29:e5:7f:05".split(":")),  # sender mac
    bytes(map(int, "192.168.88.1".split("."))),  # sender ip
    b"\x00" * 6,  # target mac
    bytes(map(int, "192.168.88.132".split("."))),  # target ip
))
print(arp)
Test Script
- send.py
from ctypes import create_string_buffer, c_ubyte, POINTER, byref
import libpcap as pcap
class NotExistAdapterError(Exception):
    """Raised when no network adapter matches the requested name."""
def finddevs_by_name(name: str):
    """Return the pcap device name of the first adapter matching ``name``.

    An adapter matches when ``name`` occurs, case-insensitively, anywhere in
    its description.

    Args:
        name: Text to look for in the adapter description, e.g. ``"VMnet8"``.

    Returns:
        The matching device's name as ``bytes``.

    Raises:
        NotExistAdapterError: If the device list cannot be obtained or no
            adapter description contains ``name``.
    """
    alldevs = POINTER(pcap.pcap_if_t)()
    errbuf = create_string_buffer(256)
    if pcap.findalldevs(byref(alldevs), errbuf) != 0:
        raise NotExistAdapterError(
            f"pcap findalldevs failed: {errbuf.value.decode(errors='replace')}"
        )
    adapters = []
    try:
        # Walk the linked list with a separate cursor so the head pointer
        # passed to freealldevs() still refers to the unmodified list.
        dev = alldevs
        while dev:  # a NULL ctypes pointer is falsy
            entry = dev.contents
            adapters.append(entry.description)
            # The description pointer may be NULL, which ctypes maps to None.
            description = (entry.description or b"").decode(errors="replace")
            if name.lower() in description.lower():
                # The name is read into a Python object before the finally
                # clause frees the list.
                return entry.name
            dev = entry.next
    finally:
        if alldevs:
            pcap.freealldevs(alldevs)
    raise NotExistAdapterError(f"No such network adapter: `{name}` in {adapters}")
def send(package: bytes, adapter: str = "VMnet8"):
    """Inject one raw frame on a network adapter through libpcap.

    Args:
        package: The complete frame to transmit, starting with the
            link-layer header.
        adapter: Text identifying the adapter, matched against adapter
            descriptions by ``finddevs_by_name``. Defaults to ``"VMnet8"``.

    Raises:
        NotExistAdapterError: If no adapter matches ``adapter``.
        RuntimeError: If the adapter cannot be opened or the frame cannot
            be sent.
    """
    errbuf = create_string_buffer(256)
    p = pcap.open_live(
        create_string_buffer(finddevs_by_name(adapter)),
        65535,  # snapshot length
        True,  # promiscuous mode
        100,  # read timeout in milliseconds
        errbuf,
    )
    if not p:
        # open_live returns a NULL handle on failure and fills errbuf.
        raise RuntimeError(
            f"pcap open_live failed: {errbuf.value.decode(errors='replace')}"
        )
    try:
        # from_buffer needs a writable buffer, hence the bytearray copy.
        frame = (c_ubyte * len(package)).from_buffer(bytearray(package))
        if pcap.sendpacket(p, frame, len(package)) != 0:
            raise RuntimeError(
                f"pcap sendpacket failed: {pcap.geterr(p).decode(errors='replace')}"
            )
    finally:
        # Release the capture handle even when sending fails.
        pcap.close(p)
- ping.py
from send import send
def checksum(data: bytes) -> bytes:
    """Return the 16-bit Internet checksum (RFC 1071) of ``data``.

    The input is summed as big-endian 16-bit words using one's-complement
    addition (carries out of bit 15 are added back into the sum) and the
    bitwise complement of that sum is returned.

    Args:
        data: Bytes to checksum, with the checksum field already zeroed.
            An odd-length input is treated as if one zero byte followed it.

    Returns:
        Exactly two bytes in network (big-endian) byte order.
    """
    data = bytes(data)
    if len(data) % 2:
        # RFC 1071: a trailing odd byte is summed as the high byte of a word.
        data += b"\x00"
    total = 0
    for offset in range(0, len(data), 2):
        total += int.from_bytes(data[offset:offset + 2], "big")
    # End-around carry: fold everything above bit 15 back into the low word.
    while total > 0xffff:
        total = (total & 0xffff) + (total >> 16)
    # Always emit two bytes so the result can be spliced into a fixed-size
    # header field even when the high byte of the checksum is zero.
    return (~total & 0xffff).to_bytes(2, "big")
# Ethernet II header: destination MAC, source MAC, EtherType 0x0800 (IPv4).
ether = b"".join((
    bytes.fromhex("000c29e57f05"),  # destination
    bytes.fromhex("005056c00008"),  # source
    b"\x08\x00",  # type
))

# ICMP message: 8-byte header followed by a 32-byte data field.
icmp = b"".join((
    b"\x08",  # type
    b"\x00",  # code
    b"\x00\x00",  # checksum (placeholder)
    b"\x12\x34",  # identifier
    b"\x56\x78",  # sequence number
    b"test".rjust(32, b"\x00")[:32],  # data
))
# The ICMP checksum covers the whole message and goes into bytes 2-3.
icmp = icmp[:2] + checksum(icmp) + icmp[4:]

# 20-byte IPv4 header followed by the ICMP message.
ip = b"".join((
    b"\x45",  # version + header length
    b"\x00",  # differentiated services
    b"\x00\x3c",  # total length (placeholder)
    b"\xb4\x5e",  # identification
    b"\x00\x00",  # flags + fragment offset
    b"\x80",  # ttl
    b"\x01",  # protocol
    b"\x00\x00",  # header checksum (placeholder)
    bytes(int(octet) for octet in "192.168.88.1".split(".")),  # source
    bytes(int(octet) for octet in "192.168.88.132".split(".")),  # destination
    icmp,  # such as icmp
))
ip = ip[:2] + len(ip).to_bytes(2, "big") + ip[4:]  # reset total length
ip = ip[:10] + checksum(ip[:20]) + ip[12:]  # calc header checksum

ping = ether + ip
send(ping)
- arp.py
from send import send
# Ethernet II header addressed to the broadcast MAC, EtherType 0x0806 (ARP).
ether = b"".join((
    bytes.fromhex("ffffffffffff"),  # destination mac
    bytes.fromhex("005056c00008"),  # source mac
    b"\x08\x06",  # type
))

# 28-byte ARP payload for Ethernet/IPv4 with opcode 1.
# NOTE(review): the sender MAC below differs from the Ethernet source MAC
# above -- confirm which address is meant to be advertised for 192.168.88.1.
arp = b"".join((
    b"\x00\x01",  # hardware type
    b"\x08\x00",  # protocol type
    b"\x06",  # hardware size
    b"\x04",  # protocol size
    b"\x00\x01",  # opcode
    bytes(int(part, 16) for part in "00:0c:29:e5:7f:05".split(":")),  # sender mac
    bytes(map(int, "192.168.88.1".split("."))),  # sender ip
    b"\xff" * 6,  # target mac
    bytes(map(int, "192.168.88.132".split("."))),  # target ip
))

# Prepend the link-layer header and transmit the finished frame.
arp = ether + arp
send(arp)