先简述一下需求背景:为了实现可观测性,产品中的一些指标数据需要推送到 Prometheus 聚合网关,出于不想对主体服务有更多干扰的原则,项目选择使用 UDP 协议发送数据(也对聚合网关做了简单的 UDP 数据接收改造)。但是在运行过程中,发现上报数据量偏少,经过日志排查,在数据发送端瞧出了点端倪:

error: [Errno 90] Message too long

问题非常简单明晰:单次请求发送的数据包大小超过了网络数据包的上限

那么就有了第一个问题:UDP 协议规定的包大小究竟是多少呢?

UDP 包是多大?

先来看一下 UDP 协议 的包长什么样子:

The UDP protocol header consists of 8 bytes of Protocol Control Information (PCI)
The UDP protocol header consists of 8 bytes of Protocol Control Information (PCI)

如图所见,UDP 可能算是最简洁的传输层协议之一了,包的组成非常简单易懂:

  • Source Port:包来源端口信息。
  • Destination Port:包目的端口信息。
  • UDP Length:UDP 头信息+ Payload 的总长度。
  • UDP Checksum:包的校验和,避免在数据在传输中被污染。

以上四个部分称之为协议控制信息 PCI,又称协议头,每个部分 2 bytes,一共占 8 bytes。

同时由于网络层的 IP 头也需要占据一部分空间,所以在计算 Payload 大小时需要先来看看 IP 层的情况。首先来看 IPv4:

# IPv4
0xffff - (sizeof(IP Header) + sizeof(UDP Header)) = 65535-(20+8) = 65507

总的来说符合预期,在算上 IP 头的情况下,IPv4 UDP 包 Payload 最大不能超过 65507 bytes。

但是在 IPv6 的情况有了一点点复杂:由于 Jumbogram(超大包) 的存在,超过 65535 bytes 的包是可以在调整过 MTU 的节点中传输的,所以理论上的最大体积为:

# IPv6
0xffff - sizeof(UDP Header) = 65535-8 = 65527

也就是在你的基建完全支持 IPv6 并且能够设置超大 MTU 时,IPv6 UDP 包 Payload 包最大不能超过 65527 bytes。

⚠️

注意:不同操作系统限值也可能会不同,例如 macOS 下最大包默认值为 9216 bytes。本文仅从协议角度讨论。

以上分析的都是理论最大值,而在真实传输中,如果想尽可能保证 UDP 包的安全,不得不考虑 MTU 在其中的影响:当数据包大小比 MTU 值大的越多,传输时被切分的段数就越多,由于 UDP 本身协议的不可靠性,数据包的安全性就越低。

而 MTU 的最小值为 576 bytes,在 IPv4 的情况下,减去上面提到的 IP Header 和 UDP Header ,还剩下 548 bytes 。这也是为什么很多公网服务都会限制 UDP 包的最大不能超过 512 bytes (例如 DNS)。同时在 IP Options 存在的情况,IP Header 可能会占据 60 bytes。所以,如果你的 UDP 包想要穿越复杂的公网,最安全的最大值是 508 bytes

考虑当前面对的网络环境——接发双方都处于同一个容器集群内,属于没有太多干扰的私有网络——可以酌情适当加大限制,只要小于 65507 bytes 即可

搞清楚了最大能发送多少数据,自然会想将超限的数据进行切片处理,那么迎来了第二个问题:如何在 Python 计算数据大小?

如何在 Python 计算数据大小?

我的第一反应是使用 getsizeof ,虽然稍有经验的 Python 程序员会想到,它对于容器类的对象是没法精准统计的,好在当前场景里只是用来统计 bytes,并没有这个困扰。

但简单测试了一下,却发现了新的疑惑。

getsizeof 的疑惑

from sys import getsizeof

getsizeof(b"")        👉 33
getsizeof(b"abcd")    👉 37

有意思的是,就算空 bytes,为什么也有 33 bytes 的空间占用?稍有经验的程序员就能立马反应过来,这就是对象本身的大小,一窥 Python 内置数据类型的大小(64-bit Python 3.6):

Empty
Bytes  type        scaling notes
28     int         +4 bytes about every 30 powers of 2
37     bytes       +1 byte per additional byte
49     str         +1-4 per additional character (depending on max width)
48     tuple       +8 per additional item
64     list        +8 for each additional
224    set         5th increases to 736; 21nd, 2272; 85th, 8416; 341, 32992
240    dict        6th increases to 368; 22nd, 1184; 43rd, 2280; 86th, 4704; 171st, 9320
136    func def    does not include default args and other attrs
1056   class def   no slots 
56     class inst  has a __dict__ attr, same scaling as dict above
888    class def   with slots
16     __slots__   seems to store in mutable tuple-like structure
                   first slot grows to 48, and so on.

虽然我们在代码中传入的是对象,但实际上传输的数据需要刨除语言附加的存储空间,所以不能通过 getsizeof 拿到的数据本身的大小,这算是一条走错的思路 xD。

简单点, len()

让我们回到问题本身,对于 bytes 数据,最简单统计大小的方式就是直接使用 len()

big_bytes: bytes
size = len(big_bytes)

确定了如何计算数据大小,下一步的思路就是:如何对 Prometheus 指标数据切片?

如何对 Prometheus 指标数据切片?

正如标题提到的, UDP 传输的是 Prometheus 的指标数据,它是由一个有字面值的字符串转换而来的二进制,它还是字符串的时候一般长这样:

# HELP sample_metric_foo_count 示例解释
# TYPE sample_metric_foo_count gauge
sample_metric_foo_count{foo="aa",bar="bb"} 14.0
sample_metric_foo_count{foo="cc",bar="dd"} 27.0
sample_metric_foo_count{foo="ee",bar="ff"} 12.0

# HELP sample_metric_bar_count 示例解释
# TYPE sample_metric_bar_count gauge
sample_metric_bar_count{baz="gg"} 660.0
sample_metric_bar_count{baz="hh"} 660.0

它本身需要符合一定的格式# 开头的两行作为指标信息的元信息,

# HELP sample_metric_foo_count 示例解释
# TYPE sample_metric_foo_count gauge

可以先看看 Prometheus 是如何解析它的

// 以下为摘抄内容
func (p *TextParser) startOfLine() stateFn {
  ...
	switch p.currentByte {
	case '#':
		return p.startComment
	case 'n':
		return p.startOfLine // Empty line, start the next one.
	}
	return p.readingMetricName
}

func (p *TextParser) startComment() stateFn {
  ...
	keyword := p.currentToken.String()
	if keyword != "HELP" && keyword != "TYPE" {
		// Generic comment, ignore by fast forwarding to end of line.
		for p.currentByte != 'n' {
			if p.currentByte, p.err = p.buf.ReadByte(); p.err != nil {
				return nil // Unexpected end of input.
			}
		}
		return p.startOfLine
	}
	...
	switch keyword {
	case "HELP":
		return p.readingHelp
	case "TYPE":
		return p.readingType
	}
	panic(fmt.Sprintf("code error: unexpected keyword %q", keyword))
}

可以看到:如果直接粗暴地切分会导致接收端无法解析从而丢弃整个请求数据。所以需要针对数据的开头和结尾特征值来决定具体的切分点。

------ # HELP 作为开头,尽量作为数据包的开头 ------
# HELP sample_metric_foo_count 示例解释
# TYPE sample_metric_foo_count gauge
sample_metric_foo_count{foo="aa",bar="bb"} 14.0
sample_metric_foo_count{foo="cc",bar="dd"} 27.0
sample_metric_foo_count{foo="ee",bar="ff"} 12.0

# HELP sample_metric_bar_count 示例解释
# TYPE sample_metric_bar_count gauge
sample_metric_bar_count{baz="gg"} 660.0n
------ n 作为单行数据结尾,能够保证包数据被正常解析,最差情况作为数据包的结尾 ------
sample_metric_bar_count{baz="hh"} 660.0  由于缺少 HELP/TYPE 元信息,将无法被聚合处理

开码!

既然准备好了思路,那么就开始正式编码吧。

首先定义模型

好的模型能够清楚地展示编码思路,为了更方便切分定义了如下模型:

@dataclass
class SlicedIndex:
    """数据切分索引"""
    start: int
    end: int
    # 如果不是以 # HELP 开头,会导致 metric 无法被识别合并
    valid_start: bool = True
    # 如果不是以 metric 内容完结,会让下一个分片包没有有效开头
    valid_end: bool = True

    def to_tuple(self) -> tuple:
        return self.start, self.end

    def __len__(self):
        return self.end - self.start


@dataclass
class SlicedIndexList:
    indexes: List[SlicedIndex] = field(default_factory=list)

    def append(self, start: int, end: int, valid_end: bool = True):
        valid_start = True
        if self.indexes and not self.indexes[-1].valid_end:
            valid_start = False

        self.indexes.append(SlicedIndex(start, end, valid_start, valid_end))

    def __iter__(self):
        for elem in self.indexes:
            yield elem

    def __getitem__(self, ii):
        """Get a list item"""
        return self.indexes[ii]

由于网络环境较为良好,这里更倾向于单次尽可能提交足够多的数据而减少网络发送次数。所以尽量贴近最大限制做数据切分,由于每次查找逻辑类同,采用递归会更简单。

递归寻找切分点

def find_sliced_indexes(data: bytes, start: int, udp_package_max_size: int, sliced_index_list: SlicedIndexList):
    """递归找寻指标开头标志"""
    if start + udp_package_max_size >= len(data):
        sliced_index_list.append(start, len(data))
        return

    valid_start_index = data.rfind(b"# HELP", start, start + udp_package_max_size)
    if valid_start_index == -1 or valid_start_index == start:
        # 当某个 metrics 数据大小大于 UDP 协议最大包限制时,原则上我们无法通过 UDP 发送该数据
        # 但这里我们尝试以最大限制发送该数据,以一个超过最大限制的数据为例
        # +-----------------+ 协议最大长度
        # +-----------------+  +-------------------+  +-------------------+
        # 有效            有效  (缺失元信息部分丢弃) 有效  有效              有效
        # 可以看到中间段数据会被丢弃,因为没有有效的 metrics 开头,但是我们尽可能保证了后续的包是有效开头和有效结尾
        # 所以当前的做法更利于我们单 metrics 数据比 65535 稍大的场景,这样丢弃的内容少,保留的内容多
        # 反之,当单个 metrics 的数据比最大限制大的越多,丢弃的越多

        # sample 肯定会转行,只要有转行就能保证起码前一个包的内容格式有效
        valid_end_index = data.rfind(b"n", start, start + udp_package_max_size)
        new_start = valid_end_index + 1
        sliced_index_list.append(start, new_start, valid_end=False)
    else:
        sliced_index_list.append(start, valid_start_index)
        new_start = valid_start_index

    # 尝试继续向后查找
    find_sliced_indexes(data, new_start, udp_package_max_size, sliced_index_list)
    return

切!

当拿到了想要切入点后,就可以对原数据进行切分了。这里可以使用 memoryview,节省一次大数据拷贝,切片的速度也会更快一点:

def slice_metrics_udp_data(data: bytes, sliced_indexes: SlicedIndexList) -> Generator[memoryview, None, None]:
    """拆分 metrics UDP data"""
    # Q: 为什么不直接切分?
    # A: 直接按照大小切分会降 bytes 中的字面值切断,让服务端对于指标无法理解,所以要按照字面值的内容做切分

    # memoryview 无需额外拷贝
    mview = memoryview(data)
    for index in sliced_indexes:
        if not index.valid_start:
            logger.warning("data<len:%s> has no valid start, may not be parsed", len(index))
        yield mview[index.start : index.end]

收尾

针对一些特殊场景做最后的善后处理

def find_udp_data_sliced_indexes(data: bytes, udp_package_max_size: int = 65507, mtu: int = 1500) -> SlicedIndexList:
    """对 UDP 发送数据进行切片处理,保证每次发送成功
    :param data: 预发送数据
    :param udp_package_max_size: 当前系统支持的最大 UDP 发送包大小,以 bytes 计算,默认为 65535 (在 macOS 下默认为 9126)
    :param mtu: Maximum Transmission Unit

    udp_package_max_size = 0xffff - (sizeof(IP Header) + sizeof(UDP Header)) = 65535-(20+8) = 65507
    ref to: https://en.wikipedia.org/wiki/User_Datagram_Protocol
    """
    length = len(data)
    if length > mtu:
        # TODO: 当前我们暂不考虑处理 MTU 的问题,先解决 UDP 包过大的情况
        logger.debug("UDP packages is larger than MTU, not safe for single push.")

    if length <= udp_package_max_size:
        return SlicedIndexList(indexes=[SlicedIndex(0, length)])

    sliced_index_list = SlicedIndexList()
    try:
        find_sliced_indexes(data, 0, udp_package_max_size, sliced_index_list)
    except RecursionError:
        logger.warning("data has no valid format, drop it...")

    return sliced_index_list

效果

# 原来的 UDP 直接发送
# 将抛出 error: [Errno 90] Message too long
udp_socket.sendto(data, (address, port))

# 循环调用 UDP 请求
for sliced_data in slice_metrics_udp_data(data, find_udp_data_sliced_indexes(data)):
	  # 所有 UDP 包都可以发送出去
    # 同时取决于数据特性,绝大多数的 UDP 数据都能在服务端得到解析
    udp_socket.sendto(sliced_data, (address, port))

尾声

进一步优化?

正如我们上面实现的逻辑,仍旧无法保证所有数据都能被解析成功,主要的原因就是我们没有对缺失元信息的指标进行处理,而我们在解析其中内容的时候是可以通过 bytes.index() 找到元信息位置的,可以将元信息暂存起来,在 valid_start 时直接补充上去,这样就能保证所有数据都有合适的格式。

Prometheus UDP 数据切分是一个在开发中普通的不能再普通的小问题。在这里写这么多并不是为了展现该问题本身的解决方案,而是用来描述一种完备的开发思路。