import _overlapped
import _thread
import _winapi
import math
import struct
import winreg


# Seconds per measurement
SAMPLING_INTERVAL = 1
# Exponential damping factor to compute exponentially weighted moving average
# on 1 minute (60 seconds)
LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
# Initialize the load using the arithmetic mean of the first NVALUE values
# of the Processor Queue Length
NVALUE = 5


class WindowsLoadTracker():
    """
    This class asynchronously reads the performance counters to calculate
    the system load on Windows.  A "raw" thread is used here to prevent
    interference with the test suite's cases for the threading module.

    The tracker starts sampling as soon as it is constructed; call close()
    to stop the sampling thread and release the event handles.
    """

    def __init__(self):
        # Define the event attributes first so that __del__()/close() do
        # not fail with AttributeError if the pre-flight test below raises.
        self._running = None
        self._stopped = None

        # Pre-flight test for access to the performance data;
        # `PermissionError` will be raised if not allowed
        winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA)

        # samples collected before the moving average is initialized
        self._values = []
        # current load estimate; None until NVALUE samples were collected
        self._load = None
        # `_running` is set to ask the update thread to quit,
        # `_stopped` is set by the update thread once it has quit
        self._running = _overlapped.CreateEvent(None, True, False, None)
        self._stopped = _overlapped.CreateEvent(None, True, False, None)

        _thread.start_new_thread(self._update_load, (), {})

    def _update_load(self,
                     # localize module access to prevent shutdown errors
                     _wait=_winapi.WaitForSingleObject,
                     _signal=_overlapped.SetEvent,
                     # WaitForSingleObject() takes milliseconds; keep the
                     # polling period tied to the damping factor's interval
                     _interval_ms=int(SAMPLING_INTERVAL * 1000)):
        """Thread body: take one sample per interval until told to stop."""
        try:
            # run until signaled to stop; a truthy result means the wait
            # timed out without the `_running` event being set
            while _wait(self._running, _interval_ms):
                self._calculate_load()
        finally:
            # notify stopped, even if sampling raised: otherwise
            # __del__() would wait on `_stopped` forever
            _signal(self._stopped)

    def _calculate_load(self,
                        # localize module access to prevent shutdown errors
                        _query=winreg.QueryValueEx,
                        _hkey=winreg.HKEY_PERFORMANCE_DATA,
                        _unpack=struct.unpack_from):
        """Read one 'Processor Queue Length' sample and fold it into the load.

        The first NVALUE samples are only collected.  The next sample
        initializes the load to the arithmetic mean of the collected ones
        (that sample itself is not included).  Every later sample updates
        the load as an exponentially weighted moving average.  Nothing is
        updated if the counter is not found in the performance data.
        """
        # get the 'System' object
        data, _ = _query(_hkey, '2')
        # PERF_DATA_BLOCK {
        #   WCHAR Signature[4]      8 +
        #   DWORD LittleEndian      4 +
        #   DWORD Version           4 +
        #   DWORD Revision          4 +
        #   DWORD TotalByteLength   4 +
        #   DWORD HeaderLength      = 24 byte offset
        #   ...
        # }
        obj_start, = _unpack('L', data, 24)
        # PERF_OBJECT_TYPE {
        #   DWORD TotalByteLength
        #   DWORD DefinitionLength
        #   DWORD HeaderLength
        #   ...
        # }
        data_start, defn_start = _unpack('4xLL', data, obj_start)
        data_base = obj_start + data_start
        defn_base = obj_start + defn_start
        # find the 'Processor Queue Length' counter (index=44)
        while defn_base < data_base:
            # PERF_COUNTER_DEFINITION {
            #   DWORD ByteLength
            #   DWORD CounterNameTitleIndex
            #   ... [7 DWORDs/28 bytes]
            #   DWORD CounterOffset
            # }
            size, idx, offset = _unpack('LL28xL', data, defn_base)
            defn_base += size
            if idx == 44:
                counter_offset = data_base + offset
                # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD)
                processor_queue_length, = _unpack('L', data, counter_offset)
                break
        else:
            # counter definition not present: skip this sample
            return

        # We use an exponentially weighted moving average, imitating the
        # load calculation on Unix systems.
        # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
        # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
        if self._load is not None:
            self._load = (self._load * LOAD_FACTOR_1
                          + processor_queue_length * (1.0 - LOAD_FACTOR_1))
        elif len(self._values) < NVALUE:
            self._values.append(processor_queue_length)
        else:
            self._load = sum(self._values) / len(self._values)

    def close(self, kill=True):
        """Stop the update thread and release the event handles.

        *kill* is accepted but not used by this implementation.
        Calling close() more than once is harmless.
        """
        self.__del__()

    def __del__(self,
                # localize module access to prevent shutdown errors
                _wait=_winapi.WaitForSingleObject,
                _close=_winapi.CloseHandle,
                _signal=_overlapped.SetEvent):
        """Shut down the update thread, if it was started, and clean up."""
        if self._running is not None:
            # tell the update thread to quit
            _signal(self._running)
            # wait for the update thread to signal done
            _wait(self._stopped, -1)
            # cleanup events
            _close(self._running)
            _close(self._stopped)
            self._running = self._stopped = None

    def getloadavg(self):
        """Return the current load estimate, or None if not yet initialized."""
        return self._load