From bdce7f3d0a322aae52100078719d15f44431709e Mon Sep 17 00:00:00 2001 From: Sunil Thaha Date: Mon, 23 Jun 2025 11:17:38 -0400 Subject: [PATCH] feat(device/cpu): aggregate multi-socket zones into single zone This commit implements `AggregatedZone` to consolidate multiple EnergyZones with same name (e.g., package zones in multi-socket systems) into single zone. The aggregation also handles counter wrapping and is transparent to the caller. This enables power attribution across multi-socket systems while maintaining compatibility with single-socket deployments. Additionally it also solves the zone label which used to have the index suffix as a hack. Key changes: - New `AggregatedZone` type that sums energy from multiple zones of the same type - Proper handling of counter wrapping at MaxEnergy - Thread-safe energy aggregation with overflow protection Signed-off-by: Sunil Thaha --- internal/device/energy_zone.go | 154 +++++ internal/device/energy_zone_test.go | 613 ++++++++++++++++++ internal/device/fake_cpu_power_meter.go | 10 - internal/device/rapl_sysfs_power_meter.go | 53 +- .../device/rapl_sysfs_power_meter_test.go | 122 +++- .../prometheus/collector/power_collector.go | 10 +- .../power_collector_concurrency_test.go | 25 +- .../collector/power_collector_test.go | 10 +- 8 files changed, 948 insertions(+), 49 deletions(-) create mode 100644 internal/device/energy_zone.go create mode 100644 internal/device/energy_zone_test.go diff --git a/internal/device/energy_zone.go b/internal/device/energy_zone.go new file mode 100644 index 0000000000..a773194979 --- /dev/null +++ b/internal/device/energy_zone.go @@ -0,0 +1,154 @@ +// SPDX-FileCopyrightText: 2025 The Kepler Authors +// SPDX-License-Identifier: Apache-2.0 + +package device + +import ( + "fmt" + "math" + "sync" +) + +type Zone = string + +const ( + ZonePackage Zone = "package" + ZoneCore Zone = "core" + ZoneDRAM Zone = "dram" + ZoneUncore Zone = "uncore" + ZonePSys Zone = "psys" + ZonePP0 Zone = "pp0" // Power Plane 0 - processor cores + ZonePP1 Zone = "pp1" // Power Plane 1 - uncore (e.g., integrated GPU) +) + +// zoneKey uniquely identifies a zone by name and index +type zoneKey struct { + name string + index int +} + +// AggregatedZone implements EnergyZone interface by aggregating multiple zones +// of the same type (e.g., multiple package zones in multi-socket systems). +// It handles energy counter wrapping for each individual zone and provides +// a single consolidated energy reading. +type AggregatedZone struct { + name string + index int + zones []EnergyZone + lastReadings map[zoneKey]Energy + currentEnergy Energy // Aggregated energy counter + maxEnergy Energy // Cached sum of all zone MaxEnergy values + mu sync.RWMutex +} + +// NewAggregatedZone creates a new AggregatedZone for zones of the same type +// The name is taken from the first zone +// Panics if zones is empty or nil +func NewAggregatedZone(zones []EnergyZone) *AggregatedZone { + // Panic on invalid inputs + if len(zones) == 0 { + panic("NewAggregatedZone: zones cannot be empty") + } + + // Use the first zone's name as the aggregated zone name + name := zones[0].Name() + // Calculate and cache the combined MaxEnergy during construction + // Check for overflow when summing MaxEnergy values + var totalMax Energy + for _, zone := range zones { + zoneMax := zone.MaxEnergy() + // Check for overflow before adding + if totalMax > 0 && zoneMax > math.MaxUint64-totalMax { + // Overflow would occur, use MaxUint64 as safe maximum + totalMax = Energy(math.MaxUint64) + break + } + totalMax += zoneMax + } + + return &AggregatedZone{ + name: name, + index: -1, // Indicates this is an aggregated zone + zones: zones, + lastReadings: make(map[zoneKey]Energy), + currentEnergy: 0, + maxEnergy: totalMax, // Cache the combined MaxEnergy + } +} + +// Name returns the zone name +func (az *AggregatedZone) Name() string { + return az.name +} + +// Index returns the zone index (-1 for aggregated zones) +func (az *AggregatedZone) Index() int { + return az.index +} + +// Path returns path for the aggregated zone +func (az *AggregatedZone) Path() string { + // TODO: decide if all the paths should be returned + return fmt.Sprintf("aggregated-%s", az.name) +} + +// Energy returns the total energy consumption across all aggregated zones, +// handling wrap-around for each individual zone +func (az *AggregatedZone) Energy() (Energy, error) { + az.mu.Lock() + defer az.mu.Unlock() + + var totalDelta Energy + + for _, zone := range az.zones { + currentReading, err := zone.Energy() + if err != nil { + return 0, fmt.Errorf("no valid energy readings from aggregated zones - %s: %w", zone.Name(), err) + } + + zoneID := zoneKey{zone.Name(), zone.Index()} + + if lastReading, exists := az.lastReadings[zoneID]; exists { + + // Calculate delta since last reading + var delta Energy + if currentReading >= lastReading { + // Normal case: no wrap + delta = currentReading - lastReading + } else { + // Wrap occurred: calculate delta across wrap boundary + // Only if zone has valid MaxEnergy (> 0) + if zone.MaxEnergy() > 0 { + delta = (zone.MaxEnergy() - lastReading) + currentReading + } else { + // Invalid MaxEnergy, treat as normal delta (might be negative) + delta = currentReading - lastReading + } + } + totalDelta += delta + } else { + // First reading: use current reading as initial energy + totalDelta += currentReading + } + + // Update last reading + az.lastReadings[zoneID] = currentReading + } + + // Update aggregated energy counter + az.currentEnergy += totalDelta + + // Wrap at maxEnergy boundary to match hardware counter behavior + // This is required for the power attribution algorithm's calculateEnergyDelta() + if az.maxEnergy > 0 { + az.currentEnergy %= az.maxEnergy + } + + return az.currentEnergy, nil +} + +// MaxEnergy returns the cached sum of maximum energy values across all zones +// This provides the correct wrap boundary for delta calculations +func (az *AggregatedZone) MaxEnergy() Energy { + return az.maxEnergy +} diff --git a/internal/device/energy_zone_test.go b/internal/device/energy_zone_test.go new file mode 100644 index 0000000000..c0bdbc9e42 --- /dev/null +++ b/internal/device/energy_zone_test.go @@ -0,0 +1,613 @@ +// SPDX-FileCopyrightText: 2025 The Kepler Authors +// SPDX-License-Identifier: Apache-2.0 + +package device + +import ( + "fmt" + "math" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// mockEnergyZone implements EnergyZone for testing +type mockEnergyZone struct { + name string + index int + path string + energy Energy + maxEnergy Energy + err error + mu sync.RWMutex +} + +func (m *mockEnergyZone) Name() string { return m.name } +func (m *mockEnergyZone) Index() int { return m.index } +func (m *mockEnergyZone) Path() string { return m.path } +func (m *mockEnergyZone) Energy() (Energy, error) { + m.mu.RLock() + defer m.mu.RUnlock() + return m.energy, m.err +} +func (m *mockEnergyZone) MaxEnergy() Energy { return m.maxEnergy } + +// SetEnergy safely updates the energy value for testing +func (m *mockEnergyZone) SetEnergy(energy Energy) { + m.mu.Lock() + defer m.mu.Unlock() + m.energy = energy +} + +// TestNewAggregatedZone tests constructor validation and basic properties +func TestNewAggregatedZone(t *testing.T) { + zones := []EnergyZone{ + &mockEnergyZone{name: "package", index: 0}, + &mockEnergyZone{name: "package", index: 1}, + } + + az := NewAggregatedZone(zones) + + assert.Equal(t, "package", az.Name()) + assert.Equal(t, -1, az.Index()) + assert.Equal(t, "aggregated-package", az.Path()) + assert.Len(t, az.zones, 2) + assert.NotNil(t, az.lastReadings) + + // Test panic on empty zones + assert.Panics(t, func() { + NewAggregatedZone([]EnergyZone{}) + }) +} + +// TestAggregatedZone_EnergyAggregation tests core energy aggregation functionality +func TestAggregatedZone_EnergyAggregation(t *testing.T) { + t.Run("BasicAggregation", func(t *testing.T) { + zones := []EnergyZone{ + &mockEnergyZone{name: "package", index: 0, energy: 100, maxEnergy: 1000}, + &mockEnergyZone{name: "package", index: 1, energy: 200, maxEnergy: 1000}, + } + + az := NewAggregatedZone(zones) + + energy, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(300), energy) // 100 + 200 + assert.Equal(t, Energy(2000), az.MaxEnergy()) // 1000 + 1000 + }) + + t.Run("SingleZone", func(t *testing.T) { + zones := []EnergyZone{ + &mockEnergyZone{name: "package", index: 0, energy: 150, maxEnergy: 1000}, + } + + az := NewAggregatedZone(zones) + + energy, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(150), energy) + }) + + t.Run("FirstReadingCorrectness", func(t *testing.T) { + // Test that first reading doesn't double-count + zone := &mockEnergyZone{name: "package", index: 0, energy: 100, maxEnergy: 1000} + zones := []EnergyZone{zone} + + az := NewAggregatedZone(zones) + + // First reading should return current energy + energy1, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(100), energy1) + + // Second reading with same energy should still return 100 (no delta) + energy2, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(100), energy2) + + // Third reading with increased energy should show the increase + zone.SetEnergy(150) + energy3, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(150), energy3) + }) +} + +// TestAggregatedZone_WrapHandling tests hardware counter wrapping scenarios +func TestAggregatedZone_WrapHandling(t *testing.T) { + t.Run("MultiZoneWrap", func(t *testing.T) { + zone0 := &mockEnergyZone{name: "package", index: 0, energy: 900, maxEnergy: 1000} + zone1 := &mockEnergyZone{name: "package", index: 1, energy: 800, maxEnergy: 1000} + zones := []EnergyZone{zone0, zone1} + + az := NewAggregatedZone(zones) + + // First reading: 900 + 800 = 1700 + energy1, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(1700), energy1) + + // Simulate wrap on zone0: 900 -> 100, zone1: 800 -> 850 + zone0.SetEnergy(100) // wrapped: delta = (1000-900) + 100 = 200 + zone1.SetEnergy(850) // normal: delta = 50 + + energy2, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(1950), energy2) // 1700 + 200 + 50 + }) + + t.Run("MultipleWraps", func(t *testing.T) { + zone := &mockEnergyZone{name: "package", index: 0, energy: 900, maxEnergy: 1000} + zones := []EnergyZone{zone} + + az := NewAggregatedZone(zones) + + // First reading: 900 + energy1, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(900), energy1) + + // First wrap: 900 -> 100 (delta: 200) + // Total: 900 + 200 = 1100, wraps to 100 (1100 % 1000 = 100) + zone.SetEnergy(100) + energy2, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(100), energy2) // 1100 % 1000 = 100 + + // Second wrap: 100 -> 50 (delta: 950) + // Total: 100 + 950 = 1050, wraps to 50 (1050 % 1000 = 50) + zone.SetEnergy(50) + energy3, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(50), energy3) // 1050 % 1000 = 50 + }) + + t.Run("BackwardReading", func(t *testing.T) { + // Test handling of readings that go backward (faulty hardware) + zone := &mockEnergyZone{name: "package", index: 0, energy: 500, maxEnergy: 1000} + zones := []EnergyZone{zone} + + az := NewAggregatedZone(zones) + + energy1, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(500), energy1) + + // Simulate backward reading (treated as wrap) + zone.SetEnergy(400) + energy2, err := az.Energy() + require.NoError(t, err) + // Treated as wrap: (1000-500) + 400 = 900, total = 500 + 900 = 1400 + // Wraps: 1400 % 1000 = 400 + assert.Equal(t, Energy(400), energy2) + }) + + t.Run("ZeroMaxEnergyHandling", func(t *testing.T) { + // Test safe handling of zero MaxEnergy + zones := []EnergyZone{ + &mockEnergyZone{name: "package", index: 0, energy: 100, maxEnergy: 0}, + &mockEnergyZone{name: "package", index: 1, energy: 200, maxEnergy: 0}, + } + + az := NewAggregatedZone(zones) + assert.Equal(t, Energy(0), az.MaxEnergy()) + + // Should not panic with zero MaxEnergy + energy, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(300), energy) + }) +} + +// TestAggregatedZone_ErrorHandling tests error conditions +func TestAggregatedZone_ErrorHandling(t *testing.T) { + t.Run("SingleZoneError", func(t *testing.T) { + zones := []EnergyZone{ + &mockEnergyZone{name: "package", index: 0, energy: 100, maxEnergy: 1000, err: fmt.Errorf("read error")}, + &mockEnergyZone{name: "package", index: 1, energy: 200, maxEnergy: 1000}, + } + + az := NewAggregatedZone(zones) + + energy, err := az.Energy() + assert.Error(t, err) + assert.Contains(t, err.Error(), "no valid energy readings") + assert.Zero(t, energy) + }) + + t.Run("AllZonesError", func(t *testing.T) { + zones := []EnergyZone{ + &mockEnergyZone{name: "package", index: 0, energy: 100, maxEnergy: 1000, err: fmt.Errorf("error1")}, + &mockEnergyZone{name: "package", index: 1, energy: 200, maxEnergy: 1000, err: fmt.Errorf("error2")}, + } + + az := NewAggregatedZone(zones) + + energy, err := az.Energy() + assert.Error(t, err) + assert.Contains(t, err.Error(), "no valid energy readings") + assert.Zero(t, energy) + }) +} + +// TestAggregatedZone_MaxEnergyHandling tests MaxEnergy calculation and edge cases +func TestAggregatedZone_MaxEnergyHandling(t *testing.T) { + t.Run("BasicMaxEnergy", func(t *testing.T) { + zones := []EnergyZone{ + &mockEnergyZone{name: "package", index: 0, maxEnergy: 1000}, + &mockEnergyZone{name: "package", index: 1, maxEnergy: 1000}, + } + + az := NewAggregatedZone(zones) + assert.Equal(t, Energy(2000), az.MaxEnergy()) + }) + + t.Run("MaxEnergyCaching", func(t *testing.T) { + zones := []EnergyZone{ + &mockEnergyZone{name: "package", index: 0, energy: 100, maxEnergy: 1000}, + &mockEnergyZone{name: "package", index: 1, energy: 200, maxEnergy: 1000}, + &mockEnergyZone{name: "package", index: 2, energy: 300, maxEnergy: 1000}, + } + + az := NewAggregatedZone(zones) + expectedMaxEnergy := Energy(3000) + + // Multiple calls should return the same cached value + assert.Equal(t, expectedMaxEnergy, az.MaxEnergy()) + assert.Equal(t, expectedMaxEnergy, az.MaxEnergy()) + assert.Equal(t, expectedMaxEnergy, az.MaxEnergy()) + + // Verify cached value is used after Energy() calls + _, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, expectedMaxEnergy, az.MaxEnergy()) + }) + + t.Run("MaxEnergyOverflow", func(t *testing.T) { + // Test overflow protection in MaxEnergy calculation + largeMaxEnergy := Energy(math.MaxUint64 / 2) + zones := []EnergyZone{ + &mockEnergyZone{name: "package", index: 0, energy: 100, maxEnergy: largeMaxEnergy}, + &mockEnergyZone{name: "package", index: 1, energy: 200, maxEnergy: largeMaxEnergy}, + &mockEnergyZone{name: "package", index: 2, energy: 300, maxEnergy: largeMaxEnergy}, + } + + az := NewAggregatedZone(zones) + + // Should not panic and should handle overflow gracefully + maxEnergy := az.MaxEnergy() + t.Logf("MaxEnergy with potential overflow: %d", maxEnergy) + + assert.NotPanics(t, func() { + _, _ = az.Energy() + }) + }) +} + +// TestAggregatedZone_LargeValueHandling tests behavior with large Energy values +func TestAggregatedZone_LargeValueHandling(t *testing.T) { + t.Run("MaximumValues", func(t *testing.T) { + // Test with very large Energy values + const maxUint64 = ^uint64(0) + maxEnergy := Energy(maxUint64) + + zone0Energy := Energy(maxUint64/2 - 1000) + zone1Energy := Energy(maxUint64/2 - 2000) + + zone0 := &mockEnergyZone{ + name: "package", index: 0, + energy: zone0Energy, + maxEnergy: maxEnergy, + } + zone1 := &mockEnergyZone{ + name: "package", index: 1, + energy: zone1Energy, + maxEnergy: maxEnergy, + } + zones := []EnergyZone{zone0, zone1} + + az := NewAggregatedZone(zones) + + // Should handle large values without overflow + energy1, err := az.Energy() + require.NoError(t, err) + expected1 := zone0Energy + zone1Energy + assert.Equal(t, expected1, energy1) + + // Test incremental updates + zone0.SetEnergy(zone0Energy + 500) + zone1.SetEnergy(zone1Energy + 500) + energy2, err := az.Energy() + require.NoError(t, err) + expected2 := expected1 + 1000 + assert.Equal(t, expected2, energy2) + }) + + t.Run("LargeValueWrapping", func(t *testing.T) { + const maxEnergy = Energy(1000000) + + zone := &mockEnergyZone{ + name: "package", index: 0, + energy: Energy(maxEnergy - 100), + maxEnergy: maxEnergy, + } + zones := []EnergyZone{zone} + + az := NewAggregatedZone(zones) + + energy1, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(maxEnergy-100), energy1) + + // Simulate wrap: (maxEnergy-100) -> 50 + zone.SetEnergy(50) + energy2, err := az.Energy() + require.NoError(t, err) + // Delta: (maxEnergy - (maxEnergy-100)) + 50 = 150 + // Total: (maxEnergy-100) + 150 = maxEnergy + 50 + // Wraps: (maxEnergy + 50) % maxEnergy = 50 + expected := Energy(50) + assert.Equal(t, expected, energy2) + }) + + t.Run("DeltaOverflow", func(t *testing.T) { + // Test potential overflow in delta calculations + largeMaxEnergy := Energy(math.MaxUint64 - 1000) + zone := &mockEnergyZone{ + name: "package", index: 0, + energy: Energy(1000), + maxEnergy: largeMaxEnergy, + } + zones := []EnergyZone{zone} + + az := NewAggregatedZone(zones) + + energy1, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(1000), energy1) + + // Simulate wrap that could cause overflow in delta calculation + zone.SetEnergy(100) + energy2, err := az.Energy() + + // Should not panic, but result might be large due to wrap calculation + if err != nil { + t.Logf("Error on potential overflow: %v", err) + } else { + t.Logf("Energy after potential overflow: %d", energy2) + } + + assert.NotPanics(t, func() { + _, _ = az.Energy() + }) + }) +} + +// TestAggregatedZone_ConcurrentAccess tests thread safety and concurrent access +func TestAggregatedZone_ConcurrentAccess(t *testing.T) { + t.Run("BasicConcurrentReads", func(t *testing.T) { + zones := []EnergyZone{ + &mockEnergyZone{name: "package", index: 0, energy: 100, maxEnergy: 1000}, + &mockEnergyZone{name: "package", index: 1, energy: 200, maxEnergy: 1000}, + } + + az := NewAggregatedZone(zones) + + // Test concurrent reads don't cause race conditions + done := make(chan bool, 10) + for i := 0; i < 10; i++ { + go func() { + _, err := az.Energy() + assert.NoError(t, err) + done <- true + }() + } + + for i := 0; i < 10; i++ { + <-done + } + }) + + t.Run("ConcurrentReadsWithUpdates", func(t *testing.T) { + zone := &mockEnergyZone{name: "package", index: 0, energy: 100, maxEnergy: 1000} + zones := []EnergyZone{zone} + + az := NewAggregatedZone(zones) + + // Initialize state + _, err := az.Energy() + require.NoError(t, err) + + // Concurrent access with zone energy updates + done := make(chan bool, 20) + results := make(chan Energy, 20) + + // Goroutines that update and read + for i := 0; i < 10; i++ { + go func(energyValue Energy) { + zone.SetEnergy(energyValue) + energy, err := az.Energy() + if err == nil { + results <- energy + } + done <- true + }(Energy(100 + i*50)) + } + + // Goroutines that just read + for i := 0; i < 10; i++ { + go func() { + energy, err := az.Energy() + if err == nil { + results <- energy + } + done <- true + }() + } + + // Wait for all goroutines + for i := 0; i < 20; i++ { + <-done + } + close(results) + + // Collect results + var energyValues []Energy + for energy := range results { + energyValues = append(energyValues, energy) + } + + t.Logf("Concurrent energy readings: %v", energyValues) + assert.NotEmpty(t, energyValues) + assert.NotPanics(t, func() { + _, _ = az.Energy() + }) + }) +} + +// TestAggregatedZone_StateManagement tests internal state tracking +func TestAggregatedZone_StateManagement(t *testing.T) { + t.Run("BasicStateTracking", func(t *testing.T) { + zone := &mockEnergyZone{name: "package", index: 0, energy: 500, maxEnergy: 1000} + zones := []EnergyZone{zone} + + az := NewAggregatedZone(zones) + + // First reading should initialize last reading + _, err := az.Energy() + require.NoError(t, err) + + zoneID := zoneKey{"package", 0} + lastReading := az.lastReadings[zoneID] + assert.Equal(t, Energy(500), lastReading) + + // Update energy and verify state tracking + zone.SetEnergy(600) + _, err = az.Energy() + require.NoError(t, err) + + lastReading = az.lastReadings[zoneID] + assert.Equal(t, Energy(600), lastReading) + }) + + t.Run("ZoneIDGeneration", func(t *testing.T) { + zone := &mockEnergyZone{name: "package", index: 0, energy: 100, maxEnergy: 1000} + zones := []EnergyZone{zone} + + az := NewAggregatedZone(zones) + + // First call should create last reading entry + _, err := az.Energy() + require.NoError(t, err) + assert.Len(t, az.lastReadings, 1) + + // Verify zone ID format + expectedZoneID := zoneKey{"package", 0} + _, exists := az.lastReadings[expectedZoneID] + assert.True(t, exists, "Expected zone ID %v to exist", expectedZoneID) + }) + + t.Run("ConcurrentStateInitialization", func(t *testing.T) { + zones := []EnergyZone{ + &mockEnergyZone{name: "package", index: 0, energy: 100, maxEnergy: 1000}, + &mockEnergyZone{name: "package", index: 1, energy: 200, maxEnergy: 1000}, + } + + az := NewAggregatedZone(zones) + + // Concurrent first-time access + done := make(chan bool, 10) + for i := 0; i < 10; i++ { + go func() { + defer func() { done <- true }() + _, err := az.Energy() + assert.NoError(t, err) + }() + } + + // Wait for all goroutines + for i := 0; i < 10; i++ { + <-done + } + + // Should have exactly 2 last readings initialized + assert.Len(t, az.lastReadings, 2) + + // Verify both zone IDs exist + zoneKey0 := zoneKey{"package", 0} + zoneKey1 := zoneKey{"package", 1} + + _, exists0 := az.lastReadings[zoneKey0] + _, exists1 := az.lastReadings[zoneKey1] + + assert.True(t, exists0) + assert.True(t, exists1) + }) +} + +// TestAggregatedZone_EdgeCases tests edge cases and unusual scenarios +func TestAggregatedZone_EdgeCases(t *testing.T) { + t.Run("InconsistentZoneState", func(t *testing.T) { + // Test handling when zones exceed their own MaxEnergy + zones := []EnergyZone{ + &mockEnergyZone{name: "package", index: 0, energy: 900, maxEnergy: 1000}, + &mockEnergyZone{name: "package", index: 1, energy: 900, maxEnergy: 1000}, + } + + az := NewAggregatedZone(zones) + + energy1, err := az.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(1800), energy1) + + // Set zones to values that exceed their MaxEnergy (inconsistent state) + zones[0].(*mockEnergyZone).SetEnergy(1100) + zones[1].(*mockEnergyZone).SetEnergy(1100) + + energy2, err := az.Energy() + require.NoError(t, err) + + // Should handle gracefully without panicking + t.Logf("Energy with inconsistent state: %d", energy2) + assert.NotPanics(t, func() { + _, _ = az.Energy() + }) + }) + + t.Run("OverflowProtection", func(t *testing.T) { + // Test protection against various overflow scenarios + const largeMax = Energy(^uint64(0) >> 1) + + zone0 := &mockEnergyZone{ + name: "package", index: 0, + energy: largeMax - 1000, + maxEnergy: largeMax, + } + zone1 := &mockEnergyZone{ + name: "package", index: 1, + energy: largeMax - 2000, + maxEnergy: largeMax, + } + zones := []EnergyZone{zone0, zone1} + + az := NewAggregatedZone(zones) + + // Should work without overflow + energy1, err := az.Energy() + require.NoError(t, err) + + rawTotal := (largeMax - 1000) + (largeMax - 2000) + assert.Equal(t, rawTotal, energy1) + + // Test incremental updates + zone0.SetEnergy(largeMax - 500) + zone1.SetEnergy(largeMax - 1500) + energy2, err := az.Energy() + require.NoError(t, err) + + expected2 := energy1 + 1000 // Total delta should be 1000 + assert.Equal(t, expected2, energy2) + }) +} diff --git a/internal/device/fake_cpu_power_meter.go b/internal/device/fake_cpu_power_meter.go index fb41653e8b..1a98fee090 100644 --- a/internal/device/fake_cpu_power_meter.go +++ b/internal/device/fake_cpu_power_meter.go @@ -12,16 +12,6 @@ import ( ) // NOTE: This fake meter is not intended to be used in production and is for testing only - -type Zone = string - -const ( - ZonePackage Zone = "package" - ZoneCore Zone = "core" - ZoneDRAM Zone = "dram" - ZoneUncore Zone = "uncore" -) - var defaultFakeZones = []Zone{ZonePackage, ZoneCore, ZoneDRAM} const defaultRaplPath = "/sys/class/powercap/intel-rapl" diff --git a/internal/device/rapl_sysfs_power_meter.go b/internal/device/rapl_sysfs_power_meter.go index e1427223ae..25cb6082bf 100644 --- a/internal/device/rapl_sysfs_power_meter.go +++ b/internal/device/rapl_sysfs_power_meter.go @@ -134,10 +134,10 @@ func (r *raplPowerMeter) Zones() ([]EnergyZone, error) { } // filter out non-standard zones - stdZoneMap := map[string]EnergyZone{} + + stdZoneMap := map[zoneKey]EnergyZone{} for _, zone := range zones { - // key -> zone-name + index - key := fmt.Sprintf("%s-%d", zone.Name(), zone.Index()) + key := zoneKey{name: zone.Name(), index: zone.Index()} // ignore non-standard zones if a standard zone already exists if existingZone, exists := stdZoneMap[key]; exists && isStandardRaplPath(existingZone.Path()) { @@ -146,13 +146,52 @@ func (r *raplPowerMeter) Zones() ([]EnergyZone, error) { stdZoneMap[key] = zone } - r.cachedZones = make([]EnergyZone, 0, len(stdZoneMap)) - for _, zone := range stdZoneMap { - r.cachedZones = append(r.cachedZones, zone) - } + // Group zones by name for aggregation + r.cachedZones = r.groupZonesByName(stdZoneMap) return r.cachedZones, nil } +// groupZonesByName groups zones by their base name and creates AggregatedZone +// instances when multiple zones share the same name (multi-socket systems) +func (r *raplPowerMeter) groupZonesByName(stdZoneMap map[zoneKey]EnergyZone) []EnergyZone { + // Group zones by base name (e.g., "package", "dram") + zoneGroups := make(map[string][]EnergyZone) + + for key, zone := range stdZoneMap { + zoneGroups[key.name] = append(zoneGroups[key.name], zone) + } + + // Create aggregated zones for duplicates, keep single zones as-is + var result []EnergyZone + for name, zones := range zoneGroups { + if len(zones) == 1 { + // Single zone - use as-is + result = append(result, zones[0]) + continue + + } + + // Multiple zones with same name - create AggregatedZone + aggregated := NewAggregatedZone(zones) + result = append(result, aggregated) + r.logger.Debug("Created aggregated zone", + "name", name, + "zone_count", len(zones), + "zones", r.zoneNames(zones)) + } + + return result +} + +// zoneNames returns a slice of zone names for logging +func (r *raplPowerMeter) zoneNames(zones []EnergyZone) []string { + names := make([]string, len(zones)) + for i, zone := range zones { + names[i] = fmt.Sprintf("%s-%d", zone.Name(), zone.Index()) + } + return names +} + // isStandardRaplPath checks if a RAPL zone path is in the standard format func isStandardRaplPath(path string) bool { return strings.Contains(path, "/intel-rapl:") diff --git a/internal/device/rapl_sysfs_power_meter_test.go b/internal/device/rapl_sysfs_power_meter_test.go index d12f6d8f3b..09b90e7e44 100644 --- a/internal/device/rapl_sysfs_power_meter_test.go +++ b/internal/device/rapl_sysfs_power_meter_test.go @@ -5,12 +5,14 @@ package device import ( "errors" + "log/slog" "strings" "testing" "github.com/prometheus/procfs/sysfs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) // TestCPUPowerMeterInterface ensures that raplPowerMeter properly implements the CPUPowerMeter interface @@ -40,7 +42,10 @@ func TestCPUPowerMeter_Init(t *testing.T) { } func TestCPUPowerMeter_Zones(t *testing.T) { - meter := &raplPowerMeter{reader: sysfsRaplReader{fs: validSysFSFixtures(t)}} + meter := &raplPowerMeter{ + reader: sysfsRaplReader{fs: validSysFSFixtures(t)}, + logger: slog.Default().With("service", "rapl"), + } zones, err := meter.Zones() assert.NoError(t, err, "Zones() should not return an error") assert.NotNil(t, zones, "Zones() should return a non-nil slice") @@ -72,7 +77,10 @@ func TestSysFSRaplZoneInterface(t *testing.T) { } func TestSysFSRaplPowerMeterInit(t *testing.T) { - rapl := raplPowerMeter{reader: sysfsRaplReader{fs: validSysFSFixtures(t)}} + rapl := raplPowerMeter{ + reader: sysfsRaplReader{fs: validSysFSFixtures(t)}, + logger: slog.Default().With("service", "rapl"), + } err := rapl.Init() assert.NoError(t, err) } @@ -91,33 +99,129 @@ func TestSysFSRaplPowerMeter(t *testing.T) { assert.Equal(t, 4, len(actualZones), "Expected to find 4 zones in test fixtures") // realRaplReader should filter out non-standard zones - rapl := raplPowerMeter{reader: sysfsRaplReader{fs: fs}} + rapl := raplPowerMeter{ + reader: sysfsRaplReader{fs: fs}, + logger: slog.Default().With("service", "rapl"), + } zones, err := rapl.Zones() // Test that each zone implements the interface correctly assert.NoError(t, err) - assert.Equal(t, 3, len(zones), "find 3 zones in test fixtures after filtering mmio") - assert.Equal(t, []string{"core", "package", "package"}, sortedZoneNames(zones), - "Expected to find expected zones in test fixtures") + // With aggregation: two package zones become one AggregatedZone + one core zone = 2 total + assert.Equal(t, 2, len(zones), "find 2 zones after aggregation (package + core)") + assert.Equal(t, []string{"core", "package"}, sortedZoneNames(zones), + "Expected to find aggregated zones in test fixtures") for _, zone := range zones { assert.NotEmpty(t, zone.Name(), "Zone name should not be empty") assert.NotEmpty(t, zone.Path(), "Zone path should not be empty") assert.GreaterOrEqual(t, zone.MaxEnergy(), 1000.0*Joule, "Max energy should not be negative") - EnergyZone := zone.(sysfsRaplZone) + // Zone could be either sysfsRaplZone or AggregatedZone + switch z := zone.(type) { + case sysfsRaplZone: + // Individual zone + assert.NotNil(t, z) + case *AggregatedZone: + // Aggregated zone + assert.NotNil(t, z) + assert.Equal(t, -1, z.Index(), "AggregatedZone should have index -1") + default: + t.Fatalf("Unexpected zone type: %T", zone) + } - energy, err := EnergyZone.Energy() + // Skip the original assertion since we now support both zone types + _ = zone + + energy, err := zone.Energy() assert.NoError(t, err, zone.Path()) assert.GreaterOrEqual(t, energy, 1000.0*Joule, "Energy should not be negative") } } +func TestAggregatedZoneIntegration(t *testing.T) { + // Test that RAPL reader creates AggregatedZone for multiple zones with same name + mockReader := &mockSysFSReader{ + response: []EnergyZone{ + // Two package zones with same name but different indices and one core zone + mockZone{name: "package", index: 0, path: "/intel-rapl:0", energy: 1000, maxEnergy: 100000}, + mockZone{name: "package", index: 1, path: "/intel-rapl:1", energy: 2000, maxEnergy: 100000}, + mockZone{name: "core", index: 0, path: "/intel-rapl:0:0", energy: 500, maxEnergy: 50000}, + }, + } + + rapl := &raplPowerMeter{ + reader: mockReader, + logger: slog.Default(), + } + + zones, err := rapl.Zones() + require.NoError(t, err) + + // Should have 2 zones: 1 aggregated package zone + 1 core zone + assert.Equal(t, 2, len(zones), "Expected 2 zones after aggregation") + + // Find the package zone - should be AggregatedZone + var packageZone EnergyZone + var coreZone EnergyZone + for _, zone := range zones { + if zone.Name() == "package" { + packageZone = zone + } else if zone.Name() == "core" { // Single zone keeps original name + coreZone = zone + } + } + + // Verify package zone is aggregated + require.NotNil(t, packageZone, "Package zone should exist") + aggregated, isAggregated := packageZone.(*AggregatedZone) + assert.True(t, isAggregated, "Package zone should be AggregatedZone") + assert.Equal(t, "package", aggregated.Name()) + assert.Equal(t, -1, aggregated.Index()) + assert.Equal(t, Energy(200000), aggregated.MaxEnergy()) // Sum of both package zones + + // Verify core zone is not aggregated + require.NotNil(t, coreZone, "Core zone should exist") + _, isNotAggregated := coreZone.(mockZone) + assert.True(t, isNotAggregated, "Core zone should remain as individual zone") + + // Test energy aggregation + packageEnergy, err := packageZone.Energy() + require.NoError(t, err) + assert.Equal(t, Energy(3000), packageEnergy) // 1000 + 2000 from both package zones +} + +type mockZone struct { + name string + index int + path string + energy Energy + maxEnergy Energy +} + +func (m mockZone) Name() string { return m.name } +func (m mockZone) Index() int { return m.index } +func (m mockZone) Path() string { return m.path } +func (m mockZone) Energy() (Energy, error) { return m.energy, nil } +func (m mockZone) MaxEnergy() Energy { return m.maxEnergy } + +type mockSysFSReader struct { + response []EnergyZone + err error +} + +func (m *mockSysFSReader) Zones() ([]EnergyZone, error) { + return m.response, m.err +} + // TestRAPLPowerMeterFromFixtures tests the realRaplReader with filtering using test fixtures func TestRAPLPowerMeterFromFixtures(t *testing.T) { fs := validSysFSFixtures(t) - raplMeter := raplPowerMeter{reader: sysfsRaplReader{fs: fs}} + raplMeter := raplPowerMeter{ + reader: sysfsRaplReader{fs: fs}, + logger: slog.Default().With("service", "rapl"), + } allZones, err := raplMeter.Zones() assert.NoError(t, err) assert.NotEmpty(t, allZones, "Expected to find RAPL zones in test fixtures") diff --git a/internal/exporter/prometheus/collector/power_collector.go b/internal/exporter/prometheus/collector/power_collector.go index 52bc4b34ec..9f8a3ce38f 100644 --- a/internal/exporter/prometheus/collector/power_collector.go +++ b/internal/exporter/prometheus/collector/power_collector.go @@ -255,7 +255,7 @@ func (c *PowerCollector) collectNodeMetrics(ch chan<- prometheus.Metric, node *m ) for zone, energy := range node.Zones { path := zone.Path() - zoneName := fmt.Sprintf("%s-%d", zone.Name(), zone.Index()) + zoneName := zone.Name() // joules ch <- prometheus.MustNewConstMetric( @@ -322,7 +322,7 @@ func (c *PowerCollector) collectProcessMetrics(ch chan<- prometheus.Metric, stat ) for zone, usage := range proc.Zones { - zoneName := fmt.Sprintf("%s-%d", zone.Name(), zone.Index()) + zoneName := zone.Name() ch <- prometheus.MustNewConstMetric( c.processCPUJoulesDescriptor, prometheus.CounterValue, @@ -354,7 +354,7 @@ func (c *PowerCollector) collectContainerMetrics(ch chan<- prometheus.Metric, st // No need to lock, already done by the calling function for id, container := range containers { for zone, usage := range container.Zones { - zoneName := fmt.Sprintf("%s-%d", zone.Name(), zone.Index()) + zoneName := zone.Name() ch <- prometheus.MustNewConstMetric( c.containerCPUJoulesDescriptor, @@ -387,7 +387,7 @@ func (c *PowerCollector) collectVMMetrics(ch chan<- prometheus.Metric, state str // No need to lock, already done by the calling function for id, vm := range vms { for zone, usage := range vm.Zones { - zoneName := fmt.Sprintf("%s-%d", zone.Name(), zone.Index()) + zoneName := zone.Name() ch <- prometheus.MustNewConstMetric( c.vmCPUJoulesDescriptor, prometheus.CounterValue, @@ -416,7 +416,7 @@ func (c *PowerCollector) collectPodMetrics(ch chan<- prometheus.Metric, state st // No need to lock, already done by the calling function for id, pod := range pods { for zone, usage := range pod.Zones { - zoneName := fmt.Sprintf("%s-%d", zone.Name(), zone.Index()) + zoneName := zone.Name() ch <- prometheus.MustNewConstMetric( c.podCPUJoulesDescriptor, prometheus.CounterValue, diff --git a/internal/exporter/prometheus/collector/power_collector_concurrency_test.go b/internal/exporter/prometheus/collector/power_collector_concurrency_test.go index 6a7935b272..4292acccd4 100644 --- a/internal/exporter/prometheus/collector/power_collector_concurrency_test.go +++ b/internal/exporter/prometheus/collector/power_collector_concurrency_test.go @@ -182,34 +182,33 @@ func TestPowerCollectorWithRegistry(t *testing.T) { switch mf.GetName() { case "kepler_node_cpu_joules_total": // Main joules metric - no mode label - assertMainMetricValue(t, mf, "package-0", nodePkgAbs.Joules()) - assertMainMetricValue(t, mf, "package-1", nodePkgAbs.Joules()) - assertMainMetricValue(t, mf, "dram-0", nodeDramAbs.Joules()) + assertMainMetricValue(t, mf, "package", nodePkgAbs.Joules()) + assertMainMetricValue(t, mf, "dram", nodeDramAbs.Joules()) case "kepler_node_cpu_watts": // Main watts metric - no mode label - assertMainMetricValue(t, mf, "package-0", nodePkgPower.Watts()) - assertMainMetricValue(t, mf, "dram-0", nodeDramPower.Watts()) + assertMainMetricValue(t, mf, "package", nodePkgPower.Watts()) + assertMainMetricValue(t, mf, "dram", nodeDramPower.Watts()) case "kepler_node_cpu_active_watts": // Active watts metric - no mode label - assertMainMetricValue(t, mf, "package-0", (nodePkgPower / 2).Watts()) - assertMainMetricValue(t, mf, "dram-0", (nodeDramPower / 2).Watts()) + assertMainMetricValue(t, mf, "package", (nodePkgPower / 2).Watts()) + assertMainMetricValue(t, mf, "dram", (nodeDramPower / 2).Watts()) case "kepler_node_cpu_idle_watts": // Idle watts metric - no mode label - assertMainMetricValue(t, mf, "package-0", (nodePkgPower / 2).Watts()) - assertMainMetricValue(t, mf, "dram-0", (nodeDramPower / 2).Watts()) + assertMainMetricValue(t, mf, "package", (nodePkgPower / 2).Watts()) + assertMainMetricValue(t, mf, "dram", (nodeDramPower / 2).Watts()) case "kepler_node_cpu_active_joules_total": // Active joules metric - no mode label - assertMainMetricValue(t, mf, "package-0", (nodePkgDelta / 2).Joules()) - assertMainMetricValue(t, mf, "dram-0", (nodeDramDelta / 2).Joules()) + assertMainMetricValue(t, mf, "package", (nodePkgDelta / 2).Joules()) + assertMainMetricValue(t, mf, "dram", (nodeDramDelta / 2).Joules()) case "kepler_node_cpu_idle_joules_total": // Idle joules metric - no mode label - assertMainMetricValue(t, mf, "package-0", (nodePkgDelta / 2).Joules()) - assertMainMetricValue(t, mf, "dram-0", (nodeDramDelta / 2).Joules()) + assertMainMetricValue(t, mf, "package", (nodePkgDelta / 2).Joules()) + assertMainMetricValue(t, mf, "dram", (nodeDramDelta / 2).Joules()) case "kepler_node_cpu_usage_ratio": // Usage ratio metric diff --git a/internal/exporter/prometheus/collector/power_collector_test.go b/internal/exporter/prometheus/collector/power_collector_test.go index 422a9566b8..11f2a5b514 100644 --- a/internal/exporter/prometheus/collector/power_collector_test.go +++ b/internal/exporter/prometheus/collector/power_collector_test.go @@ -427,7 +427,7 @@ func TestPowerCollector(t *testing.T) { zonePaths = append(zonePaths, path) } - assert.ElementsMatch(t, zoneNames, []string{"package-0", "dram-0"}) + assert.ElementsMatch(t, zoneNames, []string{"package", "dram"}) assert.ElementsMatch(t, zonePaths, []string{ "/sys/class/powercap/intel-rapl/intel-rapl:0", "/sys/class/powercap/intel-rapl/intel-rapl:0:1", @@ -441,7 +441,7 @@ func TestPowerCollector(t *testing.T) { "comm": "test-process", "exe": "/usr/bin/123", "type": "regular", - "zone": "package-0", + "zone": "package", } assertMetricLabelValues(t, registry, "kepler_process_cpu_joules_total", expectedLabels, 100.0) assertMetricLabelValues(t, registry, "kepler_process_cpu_watts", expectedLabels, 5.0) @@ -453,7 +453,7 @@ func TestPowerCollector(t *testing.T) { "container_id": "abcd-efgh", "container_name": "test-container", "runtime": "podman", - "zone": "package-0", + "zone": "package", } assertMetricLabelValues(t, registry, "kepler_container_cpu_joules_total", expectedLabels, 100.0) assertMetricLabelValues(t, registry, "kepler_container_cpu_watts", expectedLabels, 5.0) @@ -465,7 +465,7 @@ func TestPowerCollector(t *testing.T) { "vm_id": "abcd-efgh", "vm_name": "test-vm", "hypervisor": "kvm", - "zone": "package-0", + "zone": "package", } assertMetricLabelValues(t, registry, "kepler_vm_cpu_joules_total", expectedLabels, 100.0) assertMetricLabelValues(t, registry, "kepler_vm_cpu_watts", expectedLabels, 5.0) @@ -477,7 +477,7 @@ func TestPowerCollector(t *testing.T) { "pod_id": "test-pod", "pod_name": "test-pod", "pod_namespace": "default", - "zone": "package-0", + "zone": "package", } assertMetricLabelValues(t, registry, "kepler_pod_cpu_joules_total", expectedLabels, 100.0) assertMetricLabelValues(t, registry, "kepler_pod_cpu_watts", expectedLabels, 5.0)