pve-exporter/collector/collector.go
Davíð Steinn Geirsson af71e7d729 feat: add collector framework with registry and parallel scrape orchestration
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 11:26:55 +00:00

158 lines
4.3 KiB
Go

package collector
import (
"encoding/json"
"log/slog"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
)
// namespace is the metric-name prefix shared by every metric this exporter emits.
const namespace = "pve"

var (
	// scrapeDurationDesc reports how long one collector's scrape took,
	// labeled by collector name.
	scrapeDurationDesc = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "scrape", "collector_duration_seconds"),
		"pve_exporter: Duration of a collector scrape.",
		[]string{"collector"},
		nil,
	)
	// scrapeSuccessDesc reports 1 when a collector's scrape succeeded and
	// 0 when it failed, labeled by collector name.
	scrapeSuccessDesc = prometheus.NewDesc(
		prometheus.BuildFQName(namespace, "scrape", "collector_success"),
		"pve_exporter: Whether a collector succeeded.",
		[]string{"collector"},
		nil,
	)
)
// Collector is the interface a collector has to implement.
type Collector interface {
	// Update fetches data via client and sends the resulting metrics on ch.
	// A non-nil error marks the collector's scrape as failed.
	Update(client *Client, ch chan<- prometheus.Metric) error
}
// NodeAwareCollector is a collector that needs to know the list of cluster nodes.
type NodeAwareCollector interface {
	Collector
	// SetNodes supplies the node names extracted from /cluster/resources;
	// PVECollector.Collect calls it before Update for the current scrape.
	SetNodes(nodes []string)
}
// ResourceAwareCollector is a collector that receives the full /cluster/resources response.
type ResourceAwareCollector interface {
	Collector
	// SetResources supplies the raw /cluster/resources JSON body;
	// PVECollector.Collect calls it before Update for the current scrape.
	SetResources(data []byte)
}
// factories maps a collector name to its constructor. Entries are added via
// registerCollector (typically from init functions of the individual collector
// files) and instantiated once per exporter by NewPVECollector.
var factories = make(map[string]func(logger *slog.Logger) Collector)

// registerCollector makes a collector constructor available under the given
// name. It panics on a duplicate name: silently overwriting an existing
// registration would hide a programming error, so fail loudly at init time.
func registerCollector(name string, factory func(logger *slog.Logger) Collector) {
	if _, dup := factories[name]; dup {
		panic("collector: duplicate registration of " + name)
	}
	factories[name] = factory
}
// PVECollector implements prometheus.Collector and orchestrates all registered collectors.
type PVECollector struct {
	client     *Client              // PVE API client shared by every collector's Update call
	collectors map[string]Collector // instantiated collectors, keyed by registered name
	logger     *slog.Logger         // base logger; per-collector loggers are derived from it
}
// NewPVECollector instantiates every registered collector factory and returns
// a PVECollector that drives them all with the given client and logger.
func NewPVECollector(client *Client, logger *slog.Logger) *PVECollector {
	instances := make(map[string]Collector, len(factories))
	for name, newCollector := range factories {
		instances[name] = newCollector(logger.With("collector", name))
	}
	return &PVECollector{client: client, collectors: instances, logger: logger}
}
// Describe implements prometheus.Collector by emitting the descriptors of the
// per-collector scrape duration and success metrics.
func (p *PVECollector) Describe(ch chan<- *prometheus.Desc) {
	for _, desc := range []*prometheus.Desc{scrapeDurationDesc, scrapeSuccessDesc} {
		ch <- desc
	}
}
// Collect implements prometheus.Collector. It fetches /cluster/resources once,
// distributes that data to interested collectors, then runs every collector
// concurrently, emitting a duration and a success metric per collector.
func (p *PVECollector) Collect(ch chan<- prometheus.Metric) {
	// A single /cluster/resources call serves all collectors for this scrape.
	resources, err := p.client.Get("/cluster/resources")
	if err != nil {
		p.logger.Error("failed to fetch /cluster/resources", "err", err)
		// Mark every collector as failed so the broken scrape is visible.
		for name := range p.collectors {
			ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, 0, name)
			ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, 0, name)
		}
		return
	}

	nodes := extractNodeNames(resources)

	// Hand the shared data to collectors that declared interest. This must
	// complete before any Update goroutine starts.
	for _, coll := range p.collectors {
		if rc, ok := coll.(ResourceAwareCollector); ok {
			rc.SetResources(resources)
		}
		if nc, ok := coll.(NodeAwareCollector); ok {
			nc.SetNodes(nodes)
		}
	}

	// Fan out one goroutine per collector and wait for all before returning.
	var wg sync.WaitGroup
	wg.Add(len(p.collectors))
	for name, coll := range p.collectors {
		go func(name string, coll Collector) {
			defer wg.Done()
			start := time.Now()
			updateErr := coll.Update(p.client, ch)
			elapsed := time.Since(start)
			success := float64(1)
			if updateErr != nil {
				p.logger.Error("collector failed", "name", name, "duration_seconds", elapsed.Seconds(), "err", updateErr)
				success = 0
			} else {
				p.logger.Debug("collector succeeded", "name", name, "duration_seconds", elapsed.Seconds())
			}
			ch <- prometheus.MustNewConstMetric(scrapeDurationDesc, prometheus.GaugeValue, elapsed.Seconds(), name)
			ch <- prometheus.MustNewConstMetric(scrapeSuccessDesc, prometheus.GaugeValue, success, name)
		}(name, coll)
	}
	wg.Wait()
}
// clusterResourcesResponse represents the top-level PVE API response.
type clusterResourcesResponse struct {
	Data []clusterResourceEntry `json:"data"`
}

// clusterResourceEntry is one item of the /cluster/resources list; only the
// fields needed for node discovery are decoded.
type clusterResourceEntry struct {
	Type string `json:"type"`
	Node string `json:"node"`
}

// extractNodeNames returns the unique node names from a /cluster/resources
// response, preserving first-seen order. A malformed payload yields nil.
func extractNodeNames(data []byte) []string {
	var parsed clusterResourcesResponse
	if err := json.Unmarshal(data, &parsed); err != nil {
		return nil
	}
	known := make(map[string]bool)
	var names []string
	for _, res := range parsed.Data {
		if res.Type != "node" || res.Node == "" || known[res.Node] {
			continue
		}
		known[res.Node] = true
		names = append(names, res.Node)
	}
	return names
}