一、背景描述
1.为什么会想到做这个场景?这个场景有什么意义?
我们的项目主要是作为集团的基础服务,提供业务可观测支撑的。在大概一年前,我们项目接到了上游同学的需求,想要查看业务异常时候的日志。
首先说一下业务场景,由于我们是基础服务,从资源供给的角度来讲,我们没有办法,全量采集所有上层业务的日志。原因:
上游业务不可控。
业务量庞大。
数据日志庞大。
基础存储无法支撑。
2.我们当时是如何考虑和设计的?
针对这个场景,我们想到了对 OpenTelemetry 仓库的,Filelog receiver 进行二次开发。
原始的Filelog receiver,只支持从文件最开始,或者文件最末尾采集后续的所有日志,启动后无法停止,会对我们的存储造成很大冲击,而且业务侧不可控。
因此,我们提出了 ondemand 方式,即按需采集。当上游同学通过我们的平台发起日志查询时,平台会发起采集请求,经过中间链路,通知我们的 Filelog receiver 进行一定时间的日志采集,比如 10min,而后进行停止。
但是,我们虽然做了二次开发,保障了我们存储服务的压力,但还是有很显著的问题:
我们的采集,是从用户的触发时间点开始采集的。
而用户想要查询日志的需求,一定是业务发生了一些错误,才会去有查询日志的行为。
没错,我们的日志采集是滞后的,对于用户来讲,采集的大概率是无效的日志。
也因此,我们一直将这个需求给搁置了。
二、切入点的突破
在技术上,我个人是比较喜欢专研的。针对之前的这个问题,一直放在心上,有啥新思路或者想法,都会想着尝试去做一做。
由于现在工作的特殊性,每天下班回家,都有时间保持刷算法,刷到现在,慢慢的感受到,算法本身就是各类问题解决方法的抽象的集合,所以平时也会想着把学到的算法应用到业务的优化中。
针对上述的业务场景,我预设了条件:如果能够知道当前日志文件的日志行数,如果能够很快的查询到第n条日志行,那么可以运用二分查找,很容易的就可以查到大于等于某个时间戳的日志行。问题就直接被拆解了。
但很遗憾,针对文件日志条目查询和查询第n行日志,没有找到很有效的方式。因为文件的存储,本身是一些字节流数据,它没有行的概念,操作系统对文件的存储,也没有这样的标志,文件行的识别,本质上是通过特殊字符 '\n' 或者 '\r\n' 等标记的。所以,我无法做到通过调用某些API,实现O(1)的复杂度,直接获取结果。
于是,我尝试使用 Golang 原生的 api,进行上述数据的获取测试。得到了进一步的突破:
通过 bufio.Scanner(),扫描 1.5GB 的数据,千万行代码,只需要 数百毫秒 就可以做到。
三、数据测试
测试数据准备:千万行级别日志,大小1GB+。
du -sh pkg/file-process/log1
1.5G pkg/file-process/log1
cat pkg/file-process/log1|wc -l
11601560
日志时间格式:符合klog日志标准。如:
E0424 23:27:27.690467 xxxxxxxxxxxxxxxxxxxxx
1. 文件行统计
通过 bufio.Scanner() 实现 O(n) 复杂度的文件扫描。
func countFile(filepath string) (int, error) {
file, err := os.Open(filepath)
if err != nil {
return 0, err
}
defer file.Close()
// 使用scanner,逐行扫描
sc := bufio.NewScanner(file)
lineCount := 0
for sc.Scan() {
lineCount++
}
return lineCount, nil
}
验证:
func main() {
filepath := `./pkg/file-process/log1`
start := time.Now()
count, err := countFile(filepath)
fmt.Println(count, err, time.Since(start))
}
go run pkg/file-process/main.go
11601561 <nil> 478.404125ms
2. 转化日志行的时间戳
日志参考标准:
E0424 23:27:27.690467 xxxxxxxxxxxxxxxxxxxxx
func parseTime(msg string) (time.Time, error) {
// 取前21个字符,舍弃第一个字符
timeStr := msg[:21][1:]
// 0424 23:27:27.690467
// 0424
datePart := timeStr[:4]
// 23:27:27.690467
timePart := timeStr[5:]
now := time.Now()
currentYear := now.Year()
currentMonth := now.Month()
// 提取月份和日期
month, _ := strconv.Atoi(datePart[:2])
day, _ := strconv.Atoi(datePart[2:])
// 决定年份
var year int
if month > int(currentMonth) {
year = currentYear - 1
} else {
year = currentYear
}
// 加载东八区时区
location, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
return time.Time{}, err
}
// 创建时间
dateStr := fmt.Sprintf("%04d-%02d-%02d %s", year, month, day, timePart)
parsedTime, err := time.ParseInLocation("2006-01-02 15:04:05.000000", dateStr, location)
if err != nil {
return time.Time{}, err
}
return parsedTime, nil
}
测试:
func main() {
fmt.Println(parseTime("E0424 23:27:27.690467 xxxxxxxxxxxxxxxxxxxxx"))
}
go run pkg/file-process/main.go
2025-04-24 23:27:27.690467 +0800 CST <nil>
3. 获取文件的第n行日志
// 从0开始
func getNthLog(n int, filepath string) string {
file, err := os.Open(filepath)
if err != nil {
return ""
}
defer file.Close()
sc := bufio.NewScanner(file)
lineCnt := -1
for sc.Scan() {
lineCnt++
if lineCnt == n {
return sc.Text()
}
}
return ""
}
测试:
func main() {
filepath := "./pkg/file-process/log1"
fmt.Println(getNthLog(0, filepath))
}
go run pkg/file-process/main.go
I0424 22:21:38.901831 xxxxxxxxxxxxxxxxxxxxxxxx
4. 使用二分查找获取大于等于目标时间戳的最小行
通过 Golang 原生 api 实现二分查找下界查询,需要注意:行数从0开始统计,如果返回行数 n == totalCnt, 表示没查到目标行。
func getObjLineAfterTimestamp(totalCnt int, timestamp time.Time, filepath string) int {
return sort.Search(totalCnt, func(i int) bool {
// 获取第i行日志
msg := getNthLog(i, filepath)
curTime, err := parseTime(msg)
if err != nil {
log.Printf("parse time err: %s", err.Error())
return false
}
return curTime.Equal(timestamp) || curTime.After(timestamp)
})
}
验证:
func main() {
filepath := "./pkg/file-process/log1"
// 2. 通过二分查找, 获取 >= t 时间的最小代码行
location, _ := time.LoadLocation("Asia/Shanghai")
t, _ := time.ParseInLocation("2006-01-02 15:04:05.000000", "2025-04-24 22:21:38.998831", location)
fmt.Println(t)
objLine := getObjLineAfterTimestamp(11601561, t, filepath)
fmt.Println(objLine)
}
go run pkg/file-process/main.go
2025-04-24 22:21:38.998831 +0800 CST
309
5. 获取二分查找得到的目标行之后的n行日志
func getNLogFromStartline(start, n int, filepath string) []string {
file, err := os.Open(filepath)
if err != nil {
return nil
}
defer file.Close()
sc := bufio.NewScanner(file)
obj := -1
for sc.Scan() {
obj++
if obj == start {
break
}
}
ans := make([]string, 0, n)
ans = append(ans, sc.Text())
for sc.Scan() {
if len(ans) == n {
break
}
ans = append(ans, sc.Text())
}
return ans
}
验证:
func main() {
filepath := "./pkg/file-process/log1"
// 2. 通过二分查找, 获取 >= t 时间的最小代码行
logs := getNLogFromStartline(309, 1000, filepath)
fmt.Println(len(logs))
}
go run pkg/file-process/main.go
1000
6. 多组用例,整体时延测试
func GetNLogAfterTime(filepath string, t time.Time, n int) ([]string, error) {
// 1. 获取文件行数
lineCnt, err := countFile(filepath)
if err != nil {
return nil, err
}
// 2. 通过二分查找, 获取 >= t 时间的最小代码行
objLine := getObjLineAfterTimestamp(lineCnt, t, filepath)
// 没有符合条件的日志
if objLine == lineCnt {
return nil, nil
}
// 3. 从objLine开始,读取n行日志返回, 不够n行则全部返回
return getNLogFromStartline(objLine, n, filepath), nil
}
测试:
func main() {
filepath := "./pkg/file-process/log1"
t1, _ := parseTime("I0424 22:42:38.314337")
now := time.Now()
logs, err := GetNLogAfterTime(filepath, t1, 1000000)
duration := time.Since(now)
fmt.Println(len(logs), err, duration)
}
go run pkg/file-process/main.go
1000000 <nil> 1.176719625s
按上述实现,在千万行数据里面,找到某个目标时间戳之后的百万行数据,只需要1.x s。实现了十分可观的性能!
补充:
读者可以使用如下代码,构造日志。
package main
import (
"flag"
"k8s.io/klog/v2"
)
func main() {
klog.InitFlags(nil)
defer klog.Flush() // 确保程序结束时将缓存日志刷入文件
logFile := "./log"
flag.Set("logtostderr", "false")
flag.Set("log_file", logFile)
flag.Parse()
for i := 0; i < 1e7; i++ {
klog.Info("xxx")
}
}
klog打印时,会打印一些额外的信息,由于代码里面没有特殊判断,可能会误识别,建议删除文件开头信息,
Log file created at: 2025/04/24 17:36:11
Running on machine: U-L2M7XGNF-2325
Binary: Built with gc go1.18 for darwin/arm64
Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu threadid file:line] msg
以及文件末尾一些空行信息。
四、补充说明
上述用例二分实现使用的是Golang sort包下面的二分查找api,对于查询有序数据下界来说极其便利。
而我们在获取第n行数据时,由于每次都需要从0开始扫描,如果数据不存在的话,我们的扫描次数会很多,导致时延升高到十几秒。
针对这一点可以做如下优化:scanner 复用。对于以扫描的n行数据,做记忆化存储:进行复用。
对没有扫描的数据,复用scanner,继续后置位扫描,能够很大程度上提升处理性能!
评论