万普插件库

jQuery插件大全与特效教程

基于Prometheus的自动化巡检



!! 大家好,我是乔克,一个爱折腾的运维工程,一个睡觉都被自己丑醒的云原生爱好者。


作者:乔克
公众号:运维开发故事



道路千万条,安全第一条。操作不规范,运维两行泪。

前言

目前,大部分公司都采用Prometheus + Grafana这一套来做指标监控,所以在Prometheus中也有大量的指标数据。为了满足日常工作中的巡检,可以基于Prometheus实现自动巡检,减轻部分运维压力。

思路

为了灵活管理巡检任务,将整个巡检功能进行了拆分管理,分为:

  • 数据源管理:可以管理多个Prometheus数据源,后期也可以增加其他数据源,比如ES等。
  • 巡检项管理:目前的巡检项就是各种Prometheus规则,之所以要单独进行管理,是为了在多数据源、多集群等情况下进行复用。
  • 标签管理:目前是Prometheuslabel,也是为了方便复用巡检项巡检项标签可以灵活进行组合。
  • 任务编排:编排各种巡检任务。
  • 执行作业:配置定时的巡检作业,它由多个编排的任务组成。
  • 巡检报告:便于查看、导出巡检结果。
  • 巡检通知:巡检结果可以通知到企业微信群,便于业务方快速知道目前整个系统有没有问题。

效果

数据源管理

(1)添加数据源

(2)数据源列表

巡检项管理

(1)添加巡检项

(2)巡检项列表

标签管理

(1)添加标签

(2)标签列表

任务编排

(1)创建任务编排

(2)任务列表

执行作业

(1)创建执行作业

(2)作业列表

巡检报告

每次巡检完成都会生成对应的巡检报告。

点击详情可以看到巡检的具体结果。

点击导出,即可将报告导出为PDF。

如果配置了巡检通知,则会将对应的巡检结果发送到企业微信群。

代码实现

大部分的代码都是普通的CRUD,比如数据源的管理、巡检项的管理都是基础的CRUD,没有什么好说的。

这里简单说一下具体巡检的实现。

(1)当用户创建了执行作业且该作业处于开启状态,就会创建一个定时任务。

// CreateCronTask 创建定时任务

func (inspectionExecutionJobService *InspectionExecutionJobService) CreateCronTask(job *AutoInspection.InspectionExecutionJob) error {
cronName := fmt.Sprintf("InspectionExecution_%d", job.ID)
taskName := fmt.Sprintf("InspectionExecution_%d", job.ID)
// 检查是否已存在相同的定时任务
if _, found := global.GVA_Timer.FindTask(cronName, taskName); found {
// 如果已存在,先清除旧的定时任务
global.GVA_Timer.Clear(cronName)
}
// 创建定时任务
var option []cron.Option
option = append(option, cron.WithSeconds())
// 添加定时任务
if _, err := global.GVA_Timer.AddTaskByFunc(cronName, job.CronExpr, func() {
// 执行巡检任务
inspectionExecutionJobService.ExecuteInspectionJob(job)
}, taskName, option...); err != nil {
global.GVA_LOG.Error("创建定时任务失败", zap.Error(err), zap.Uint("jobID", job.ID))
return err
}
// 更新下次执行时间
nextTime := inspectionExecutionJobService.calculateNextRunTime(job.CronExpr)
job.NextRunTime = &nextTime
// 更新数据库中的记录
return global.GVA_DB.Model(job).Updates(map[string]interface{}{
"next_run_time": job.NextRunTime,
}).Error
}

Tips:因为是采用的gin-vue-admin框架,所以直接使用框架自带的timer定时器。

(2)当执行时间到了,就会执行ExecuteInspectionJob巡检任务。

func (inspectionExecutionJobService *InspectionExecutionJobService) ExecuteInspectionJob(job *AutoInspection.InspectionExecutionJob) {
// 更新作业执行时间
inspectionExecutionJobService.updateJobExecutionTime(job)
// 创建执行记录
jobExecution := inspectionExecutionJobService.createJobExecution(job)
if jobExecution == nil {
return
}
// 执行所有关联的巡检任务并收集结果
allResults := inspectionExecutionJobService.executeAllInspectionTasks(job, jobExecution)
global.GVA_LOG.Info("执行完成", zap.Any("results", allResults))
// 更新执行记录状态和结果
inspectionExecutionJobService.updateJobExecutionResult(jobExecution, allResults)
// 发送通知
if *job.IsNotice {
inspectionExecutionJobService.sendInspectionNotification(job, jobExecution, allResults)
}
}

这里主要是executeAllInspectionTasks来执行巡检任务。

// executeAllInspectionTasks 执行所有关联的巡检任务并收集结果

func (inspectionExecutionJobService *InspectionExecutionJobService) executeAllInspectionTasks(job *AutoInspection.InspectionExecutionJob, jobExecution *AutoInspection.JobExecution) []*result.ProductsResult {
// 创建一个等待组来同步所有巡检任务
var wg sync.WaitGroup
// 创建一个互斥锁来保护结果集
var mu sync.Mutex
// 创建一个结果集合
allResults := make([]*result.ProductsResult, 0)
// 执行所有关联的巡检任务
for _, jobID := range job.JobIds {
wg.Add(1)
gofunc(id uint) {
defer wg.Done()
// 执行单个巡检任务并获取结果
result := inspectionExecutionJobService.executeSingleInspectionTask(id, jobExecution)
if result != nil {
// 将结果添加到总结果集中
mu.Lock()
allResults = append(allResults, result)
mu.Unlock()
}
}(jobID)
}
// 等待所有巡检任务完成
wg.Wait()
return allResults
}

它会把作业中的任务拆成单个任务,然后由executeSingleInspectionTask分别执行并收集执行结果。

// executeSingleInspectionTask 执行单个巡检任务
func (inspectionExecutionJobService *InspectionExecutionJobService) executeSingleInspectionTask(jobID uint, jobExecution *AutoInspection.JobExecution) *result.ProductsResult {
global.GVA_LOG.Info("执行巡检任务", zap.Uint("jobID", jobID))
// 获取巡检任务信息
inspectionJob, _ := inspectionJobService.GetInspectionJob(fmt.Sprintf("%d", jobID))
// 创建结果通道
resultCh := make(chan *result.ProductsResult)
// 创建一个用于等待结果的WaitGroup
var resultWg sync.WaitGroup
resultWg.Add(1)
// 用于存储结果的变量
var taskResult *result.ProductsResult
// 启动一个goroutine来接收结果
gofunc() {
defer resultWg.Done()
result := <-resultCh
global.GVA_LOG.Info("巡检任务执行完成",
zap.String("jobName", inspectionJob.Name),
zap.Any("result", result))
// 保存结果
taskResult = result
}()
// 执行巡检任务
inspectionExecutionJobService.ExecuteInspectionTask(&inspectionJob, jobExecution, resultCh)
// 等待结果接收完成
resultWg.Wait()
return taskResult

}

ExecuteInspectionTask中是为了方便扩展数据源。

func (inspectionExecutionJobService *InspectionExecutionJobService) ExecuteInspectionTask(inspectionJob *AutoInspection.InspectionJob, jobExecution *AutoInspection.JobExecution, resultCh chan *result.ProductsResult) {

switch inspectionJob.DataSourceType {
case "prometheus":
// 执行Prometheus巡检任务
inspectionExecutionJobService.ExecutePrometheusInspectionTask(inspectionJob, jobExecution, resultCh)
}
}

由于目前只有Prometheus数据源,所以将直接执行ExecutePrometheusInspectionTask。在这个方法中主要是构造Prometheus规则然后进行巡检。

// ExecutePrometheusInspectionTask 执行Prometheus巡检任务
func (inspectionExecutionJobService *InspectionExecutionJobService) ExecutePrometheusInspectionTask(inspectionJob *AutoInspection.InspectionJob, jobExecution *AutoInspection.JobExecution, resultCh chan *result.ProductsResult) {
// 执行Prometheus巡检任务的逻辑
var inspectionItemsService InspectionItemsService
var inspectionTagService InspectionTagService
var dataSourceService DataSourceService

// 获取数据源信息
dataSource, _ := dataSourceService.GetDataSource(fmt.Sprintf("%d", inspectionJob.DataSourceId))

// 创建规则列表
prometheusRules := make([]*product.PrometheusRule, 0, len(inspectionJob.ItemLabelMaps))

// 遍历巡检项与标签映射关系
for _, itemLabelMap := range inspectionJob.ItemLabelMaps {
// 获取巡检项信息
inspectionItem, _ := inspectionItemsService.GetInspectionItems(fmt.Sprintf("%d", itemLabelMap.ItemId))

// 获取标签信息
var inspectionTag AutoInspection.InspectionTag
if itemLabelMap.LabelId != 0 {
inspectionTag, _ = inspectionTagService.GetInspectionTag(fmt.Sprintf("%d", itemLabelMap.LabelId))
}

// 创建Prometheus规则
prometheusRule := &product.PrometheusRule{
Name: inspectionItem.Name,
Rule: inspectionItem.Rule,
LabelFilter: inspectionTag.Label,
Desc: inspectionItem.Description,
AlertInfo: inspectionItem.OutputTemplate,
DataSourceName: dataSource.Name,
}

// 添加到规则列表
prometheusRules = append(prometheusRules, prometheusRule)

}

// 创建规则集合
rules := product.Rules{
Prometheus: prometheusRules,
AliyunSafe: []*product.AliyunSafeRule{}, // 空列表,因为这里只处理Prometheus规则
}

// 创建产品
prod := &product.Product{
Name: inspectionJob.Name,
Rules: rules,
}

// 使用defer和recover捕获可能的panic
deferfunc() {
if r := recover(); r != nil {
// 记录panic信息
global.GVA_LOG.Error("执行巡检任务发生panic",
zap.Any("panic", r),
zap.String("jobName", inspectionJob.Name))

// 创建一个表示失败的结果并发送到结果通道
pr := &result.ProductsResult{ProductName: inspectionJob.Name}

// 为每个规则创建失败结果
for _, rule := range prometheusRules {
errorMsg := fmt.Sprintf("巡检执行失败: %v", r)
failureResult := result.NewRuleResult(
result.WithInspectionInfo(rule.Name),
result.WithInspectionResult(result.ABNORMAL),
result.WithInspectionErrorInfo(
[]map[string]string{{
"error": errorMsg,
"rule": rule.Rule,
}},
"执行规则 {{rule}} 时发生错误: {{error}}",
),
)
pr.Add(failureResult)
}

// 发送结果
resultCh <- pr
}
}()

// 执行巡检
err = prod.Run(resultCh)
if err != nil {
global.GVA_LOG.Error("执行巡检任务失败", zap.Error(err), zap.String("jobName", inspectionJob.Name))
return
}

global.GVA_LOG.Info("巡检任务已启动", zap.String("jobName", inspectionJob.Name))
}

prod.Run中,会去做真正的指标数据查询。

func (p *Product) Run(resultCh chan *result.ProductsResult) error {
global.GVA_LOG.Info(fmt.Sprintf("开始巡检, %s", p.Name))
pr := &result.ProductsResult{ProductName: p.Name}
// prometheus巡检规则
for _, prometheusRule := range p.Rules.Prometheus {
ruleInspectRes, err := prometheusRule.Run()
if err != nil {
return err
}
pr.Add(ruleInspectRes)
}
resultCh <- pr
returnnil
}

然后调用prometheusRule.Run获取结果。

func (r *PrometheusRule) Run() (*result.RuleResult, error) {
ds, err := datasource.GetByName(r.DataSourceName)
if err != nil {
returnnil, err
}
pds, ok := ds.(*datasource.PrometheusDataSource)
if !ok {
returnnil, fmt.Errorf("数据源类型错误: %s 不是Prometheus数据源", r.DataSourceName)
}
if pds.Client == nil {
returnnil, fmt.Errorf("数据源为空: %s", r.DataSourceName)
}
res, err := pds.Run(r.Rule, r.LabelFilter)
if err != nil {
returnnil, err
}
ruleRes := r.buildRuleResult(res)
return ruleRes, nil
}

func (r *PrometheusRule) buildRuleResult(resultLabels []map[string]string) *result.RuleResult {
iflen(resultLabels) == 0 {
return result.NewRuleResult(result.WithInspectionInfo(fmt.Sprintf("%s", r.Name)),
result.WithInspectionResult(result.NORMAL))
}
return result.NewRuleResult(result.WithInspectionInfo(fmt.Sprintf("%s", r.Name)),
result.WithInspectionResult(result.ABNORMAL),
result.WithInspectionErrorInfo(resultLabels, r.AlertInfo))
}

具体的查询是封装在pds.Run中的,它会去调用Prometheus的接口去查询数据。

func Query(client api.Client, rule string) (model.Value, []string, error) {
// 添加空指针检查
if client == nil {
returnnil, nil, errors.New("Prometheus client is nil")
}
v1Api := promV1.NewAPI(client)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
value, warnings, err := v1Api.Query(ctx, rule, time.Now(), promV1.WithTimeout(10*time.Second))
global.GVA_LOG.Debug("查询结果", zap.String("value", value.String()), zap.Any("warnings", warnings))
if err != nil {
returnnil, nil, errors.WithStack(err)
}
return value, warnings, nil
}

(3)如果需要发送到企业微信,就会构建发送结果进行发送。

func (inspectionExecutionJobService *InspectionExecutionJobService) sendInspectionNotification(job *AutoInspection.InspectionExecutionJob, jobExecution *AutoInspection.JobExecution, results []*result.ProductsResult) {
// 获取通知配置
var notifyService = NotifyService{}
notify, err := notifyService.GetNotify(fmt.Sprintf("%d", job.NoticdId))
if err != nil {
global.GVA_LOG.Error("获取通知配置失败", zap.Error(err))
return
}

// 构建通知内容
// 1. 巡检摘要
taskCount := len(results) // 巡检任务数量
itemCount := 0 // 巡检项数量
normalCount := 0 // 正常项数量
abnormalCount := 0 // 异常项数量
abnormalItems := []string{} // 异常项列表

// 统计巡检项、正常项和异常项的数量
for _, task := range results {
itemCount += len(task.SubRuleResults)
for _, item := range task.SubRuleResults {
if item.InspectionResult == result.NORMAL {
normalCount++
} elseif item.InspectionResult == result.ABNORMAL {
abnormalCount++
// 收集异常项信息
abnormalDetail := fmt.Sprintf("【%s】%s", task.ProductName, item.InspectionInfo)
iflen(item.InspectionErrorInfo) > 0 {
abnormalDetail += "\n" + strings.Join(item.InspectionErrorInfo, "\n")
}
abnormalItems = append(abnormalItems, abnormalDetail)
}
}
}

// 格式化摘要信息
summary := fmt.Sprintf("巡检任务%d个,巡检项%d个,正常%d个,异常%d个", taskCount, itemCount, normalCount, abnormalCount)

// 构建企业微信通知内容
var content string
if notify.TemplateType == "markdown" {
// Markdown格式
content = fmt.Sprintf(`{
"msgtype": "markdown",
"markdown": {
"content": "# 自动化巡检结果通知\n\n> ### 执行作业:%s\n> ### 执行时间:%s\n> ### 执行结果:%s\n\n### **异常项列表:**\n%s"
}
}`
,
jobExecution.ExecutionJobName,
jobExecution.EndTime.Format("2006-01-02 15:04:05"),
summary,
formatAbnormalItems(abnormalItems))
} else {
// 文本格式
content = fmt.Sprintf(`{
"msgtype": "text",
"text": {
"content": "巡检结果通知\n执行作业:%s\n执行时间:%s\n执行结果:%s\n\n异常项列表:\n%s"
}
}`
,
jobExecution.ExecutionJobName,
jobExecution.EndTime.Format("2006-01-02 15:04:05"),
summary,
formatAbnormalItemsText(abnormalItems))
}

// 发送通知
ctx := context.Background()
sendParams := sender.SendParams{
NoticeType: notify.Type,
NoticeId: fmt.Sprintf("%d", notify.ID),
NoticeName: notify.Name,
Hook: notify.Address,
Content: content,
}

err = sender.Sender(&ctx, sendParams)
if err != nil {
global.GVA_LOG.Error("发送巡检通知失败", zap.Error(err))
return
}

global.GVA_LOG.Info("发送巡检通知成功",
zap.String("jobName", jobExecution.ExecutionJobName),
zap.String("summary", summary))
}

(4)PDF导出是用wkhtmltopdf实现,该包依赖服务器上的wkhtmltopdf命令。

func (jobExecutionService *JobExecutionService) GeneratePDF(jobExecution *AutoInspection.JobExecution) (string, error) {

pdf, err := wkhtmltopdf.NewPDFGenerator()
if err != nil {
global.GVA_LOG.Error("PDF生成器初始化失败", zap.Error(err))
return"", err
}

// 设置全局选项
pdf.Dpi.Set(300)
pdf.Orientation.Set(wkhtmltopdf.OrientationPortrait)
pdf.PageSize.Set(wkhtmltopdf.PageSizeA4)
pdf.MarginTop.Set(20)
pdf.MarginBottom.Set(20)
pdf.MarginLeft.Set(20)
pdf.MarginRight.Set(20)

// 渲染HTML模板
htmlContent, err := jobExecutionService.renderHTMLTemplate(jobExecution)
if err != nil {
global.GVA_LOG.Error("HTML模板渲染失败", zap.Error(err))
return"", err
}

// 创建一个页面并添加到生成器
page := wkhtmltopdf.NewPageReader(bytes.NewBufferString(htmlContent))
pdf.AddPage(page)

// 生成PDF
err = pdf.Create()
if err != nil {
return"", err
}

basePath := "uploads/pdf"
// 创建目录(如果不存在)
if err = os.MkdirAll(basePath, 0755); err != nil {
global.GVA_LOG.Error("创建PDF保存目录失败", zap.Error(err))
return"", err
}

filename := generatePDFFileName(jobExecution)
filePath := filepath.Join(basePath, filename)

// 3. 保存PDF到文件
if err = os.WriteFile(filePath, pdf.Bytes(), 0644); err != nil {
global.GVA_LOG.Error("保存PDF文件失败", zap.Error(err))
return"", err
}

....

return downloadURL, nil
}

以上就是实现巡检的主要代码。

最后

大部分企业虽然都有监控告警,但是自动化巡检在日常的运维工作中还是必要的,它可以聚合目前系统、集群存在的问题,避免遗漏告警信息。另外,在AI发展迅猛的今天,可以把AI也结合到自动化巡检中,比如在巡检中增加一些AI预测,AI故障诊断、AI根因分析等功能。



最后,求关注。如果你还想看更多优质原创文章,欢迎关注我们的公众号「运维开发故事」。

如果我的文章对你有所帮助,还请帮忙一下,你的支持会激励我输出更高质量的文章,非常感谢!

你还可以把我的公众号设为「星标」,这样当公众号文章更新时,你会在第一时间收到推送消息,避免错过我的文章更新。






控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言