pve-exporter/collector/replication.go
Davíð Steinn Geirsson 3bafb67aa0 feat: add replication collector (6 replication metrics)
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 11:36:54 +00:00

151 lines
4.1 KiB
Go

package collector
import (
"encoding/json"
"fmt"
"log/slog"
"strconv"
"sync"
"github.com/prometheus/client_golang/prometheus"
)
func init() {
registerCollector("replication", func(logger *slog.Logger) Collector {
return newReplicationCollector(logger)
})
}
// replicationCollector emits replication metrics for a configurable list of
// nodes. The node list is written by SetNodes and read by Update, both under
// mu.
type replicationCollector struct {
	logger *slog.Logger

	mu    sync.Mutex // guards nodes
	nodes []string   // names of the nodes whose replication status is queried
}

// newReplicationCollector returns a collector that stores logger and starts
// with an empty node list; call SetNodes to populate it.
func newReplicationCollector(logger *slog.Logger) *replicationCollector {
	return &replicationCollector{logger: logger}
}

// SetNodes replaces the list of nodes to query.
//
// The slice is copied before it is stored, so the caller remains free to
// reuse or modify its own slice afterwards without affecting (or racing
// with) the list held by the collector.
func (c *replicationCollector) SetNodes(nodes []string) {
	// Copy outside the critical section; only the pointer swap needs the lock.
	owned := append([]string(nil), nodes...)

	c.mu.Lock()
	defer c.mu.Unlock()
	c.nodes = owned
}
// replicationResponse is the JSON envelope decoded from a node's replication
// endpoint: an object whose "data" member is an array of entries.
type replicationResponse struct {
	// Data holds the decoded elements of the "data" array.
	Data []replicationEntry `json:"data"`
}
// replicationEntry is a single element of the "data" array in a replication
// response. Every field is exported as a metric value or label by collectNode.
type replicationEntry struct {
	// Identification fields, used as labels on the info metric.
	ID     string `json:"id"`
	Type   string `json:"type"`
	Source string `json:"source"`
	Target string `json:"target"`
	Guest  int    `json:"guest"` // presumably the numeric guest (VM/CT) ID; confirm against the API

	// Numeric status fields, each exported as its own gauge keyed by ID.
	Duration  float64 `json:"duration"`   // duration of the last replication, in seconds
	LastSync  float64 `json:"last_sync"`  // timestamp of the last successful sync
	LastTry   float64 `json:"last_try"`   // timestamp of the last attempt
	NextSync  float64 `json:"next_sync"`  // timestamp of the next scheduled sync
	FailCount float64 `json:"fail_count"` // number of failed syncs
}
// Metric descriptors for the replication collector. The info metric carries
// the descriptive labels of a job; every other metric is labelled only by the
// job id so it can be joined against the info metric.
var (
	// replicationInfoDesc describes the per-job info metric.
	replicationInfoDesc = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "replication", "info"),
		"Replication job information.",
		[]string{"id", "type", "source", "target", "guest"},
		nil,
	)

	// replicationDurationDesc describes how long the last replication took.
	replicationDurationDesc = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "replication", "duration_seconds"),
		"Duration of the last replication in seconds.",
		[]string{"id"},
		nil,
	)

	// replicationLastSyncDesc describes when the last successful sync happened.
	replicationLastSyncDesc = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "replication", "last_sync_timestamp_seconds"),
		"Timestamp of the last successful replication sync.",
		[]string{"id"},
		nil,
	)

	// replicationLastTryDesc describes when replication was last attempted.
	replicationLastTryDesc = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "replication", "last_try_timestamp_seconds"),
		"Timestamp of the last replication attempt.",
		[]string{"id"},
		nil,
	)

	// replicationNextSyncDesc describes when the next sync is scheduled.
	replicationNextSyncDesc = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "replication", "next_sync_timestamp_seconds"),
		"Timestamp of the next scheduled replication sync.",
		[]string{"id"},
		nil,
	)

	// replicationFailedSyncsDesc describes the count of failed syncs.
	replicationFailedSyncsDesc = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "replication", "failed_syncs"),
		"Number of failed replication syncs.",
		[]string{"id"},
		nil,
	)
)
// Update collects replication metrics for every configured node and sends
// them to ch.
//
// Nodes are queried concurrently, with at most client.MaxConcurrent()
// collections in flight at once; a limit below 1 is treated as 1. A failure
// on one node does not stop collection for the others. If any node failed,
// the returned error lists all failures in node order; otherwise it is nil.
func (c *replicationCollector) Update(client *Client, ch chan<- prometheus.Metric) error {
	// Snapshot the node list so the lock is not held while nodes are queried.
	c.mu.Lock()
	nodes := make([]string, len(c.nodes))
	copy(nodes, c.nodes)
	c.mu.Unlock()

	// A zero-capacity semaphore would block every worker forever, and a
	// negative capacity makes make panic, so clamp the limit to at least 1.
	limit := client.MaxConcurrent()
	if limit < 1 {
		limit = 1
	}
	sem := make(chan struct{}, limit)

	// One result slot per node: each goroutine writes only its own index, so
	// no extra locking is needed and the error order is deterministic.
	results := make([]error, len(nodes))

	var wg sync.WaitGroup
	for i, node := range nodes {
		wg.Add(1)
		go func(i int, node string) {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			results[i] = c.collectNode(client, ch, node)
		}(i, node)
	}
	wg.Wait()

	var errs []error
	for _, err := range results {
		if err != nil {
			errs = append(errs, err)
		}
	}
	if len(errs) > 0 {
		return fmt.Errorf("replication collection errors: %v", errs)
	}
	return nil
}
// collectNode requests /nodes/<node>/replication through client, decodes the
// JSON response and sends six metrics per entry to ch: the info metric
// followed by the duration, last-sync, last-try, next-sync and failed-syncs
// gauges.
//
// It returns a wrapped error if the request or the JSON decoding fails; in
// that case nothing is sent to ch.
func (c *replicationCollector) collectNode(client *Client, ch chan<- prometheus.Metric, node string) error {
	raw, err := client.Get("/nodes/" + node + "/replication")
	if err != nil {
		return fmt.Errorf("failed to get replication for node %s: %w", node, err)
	}

	var parsed replicationResponse
	if decodeErr := json.Unmarshal(raw, &parsed); decodeErr != nil {
		return fmt.Errorf("failed to parse replication response for node %s: %w", node, decodeErr)
	}

	for i := range parsed.Data {
		job := &parsed.Data[i]

		// The info metric always has value 1; the job details live in its labels.
		ch <- prometheus.MustNewConstMetric(
			replicationInfoDesc, prometheus.GaugeValue, 1,
			job.ID, job.Type, job.Source, job.Target, strconv.Itoa(job.Guest),
		)

		// The remaining gauges share the same shape: one value keyed by job id.
		gauges := [...]struct {
			desc  *prometheus.Desc
			value float64
		}{
			{replicationDurationDesc, job.Duration},
			{replicationLastSyncDesc, job.LastSync},
			{replicationLastTryDesc, job.LastTry},
			{replicationNextSyncDesc, job.NextSync},
			{replicationFailedSyncsDesc, job.FailCount},
		}
		for _, g := range gauges {
			ch <- prometheus.MustNewConstMetric(g.desc, prometheus.GaugeValue, g.value, job.ID)
		}
	}
	return nil
}