# Copyright © 2020 Arm Ltd. All rights reserved.
# SPDX-License-Identifier: MIT
import os

import pytest
import warnings
import numpy as np

import pyarmnn as ann


@pytest.fixture(scope="function")
def random_runtime(shared_data_folder):
    """Yield a fresh runtime with random input data for ``mock_model.tflite``.

    The network is parsed but neither optimized nor loaded; each test does
    that itself. The last subgraph reported by the parser is used.

    Yields:
        A 5-tuple ``(preferred_backends, network, runtime, input_tensors,
        output_tensors)`` where ``input_tensors`` holds one
        ``(id, ConstTensor)`` pair filled with random uint8 values and
        ``output_tensors`` holds one ``(id, Tensor)`` pair per output name.
    """
    parser = ann.ITfLiteParser()
    network = parser.CreateNetworkFromBinaryFile(os.path.join(shared_data_folder, 'mock_model.tflite'))
    preferred_backends = [ann.BackendId('CpuRef')]
    options = ann.CreationOptions()

    runtime = ann.IRuntime(options)

    graphs_count = parser.GetSubgraphCount()

    graph_id = graphs_count - 1
    input_names = parser.GetSubgraphInputTensorNames(graph_id)

    input_binding_info = parser.GetNetworkInputBindingInfo(graph_id, input_names[0])
    input_tensor_id = input_binding_info[0]

    input_tensor_info = input_binding_info[1]
    # The tensor info is flagged constant before it is wrapped in a ConstTensor.
    input_tensor_info.SetConstant()

    output_names = parser.GetSubgraphOutputTensorNames(graph_id)

    input_data = np.random.randint(255, size=input_tensor_info.GetNumElements(), dtype=np.uint8)

    input_tensors = [(input_tensor_id, ann.ConstTensor(input_tensor_info, input_data))]

    output_tensors = []

    for output_name in output_names:
        out_bind_info = parser.GetNetworkOutputBindingInfo(graph_id, output_name)

        out_tensor_info = out_bind_info[1]
        out_tensor_id = out_bind_info[0]

        output_tensors.append((out_tensor_id,
                               ann.Tensor(out_tensor_info)))

    yield preferred_backends, network, runtime, input_tensors, output_tensors


@pytest.fixture(scope='function')
def mock_model_runtime(shared_data_folder):
    """Yield a runtime with ``mock_model.tflite`` optimized and loaded.

    Input data comes from ``tflite_parser/input_lite.npy`` and is bound to the
    ``input_1`` tensor of subgraph 0. The messages returned by ``Optimize`` and
    ``LoadNetwork`` are printed.

    Yields:
        A 4-tuple ``(runtime, net_id, input_tensors, output_tensors)``.
    """
    parser = ann.ITfLiteParser()
    network = parser.CreateNetworkFromBinaryFile(os.path.join(shared_data_folder, 'mock_model.tflite'))
    graph_id = 0

    input_binding_info = parser.GetNetworkInputBindingInfo(graph_id, "input_1")

    input_tensor_data = np.load(os.path.join(shared_data_folder, 'tflite_parser/input_lite.npy'))

    preferred_backends = [ann.BackendId('CpuRef')]

    options = ann.CreationOptions()
    runtime = ann.IRuntime(options)

    opt_network, messages = _optimize(network, preferred_backends, runtime)

    print(messages)

    net_id, messages = runtime.LoadNetwork(opt_network)

    print(messages)

    input_tensors = ann.make_input_tensors([input_binding_info], [input_tensor_data])

    output_names = parser.GetSubgraphOutputTensorNames(graph_id)
    outputs_binding_info = [parser.GetNetworkOutputBindingInfo(graph_id, output_name)
                            for output_name in output_names]

    output_tensors = ann.make_output_tensors(outputs_binding_info)

    yield runtime, net_id, input_tensors, output_tensors


def _optimize(network, preferred_backends, runtime):
    """Optimize ``network`` for ``runtime``'s device spec with default optimizer options.

    Returns whatever ``ann.Optimize`` returns; callers in this module unpack it
    as ``(opt_network, messages)``.
    """
    return ann.Optimize(network, preferred_backends,
                        runtime.GetDeviceSpec(), ann.OptimizerOptions())


def _assert_outputs_match(output_vectors, expected_results):
    """Assert that each output array equals the matching row of ``expected_results``.

    Values are compared element by element. Both sides are flattened first so
    that a leading batch dimension on the output (NOTE(review): presumably
    shape (1, N) -- confirm against the model) does not cause a spurious
    mismatch.
    """
    assert len(output_vectors) >= len(expected_results)
    for output_vector, expected in zip(output_vectors, expected_results):
        assert np.array_equal(np.ravel(output_vector), np.ravel(expected)), \
            "expected {}, got {}".format(np.ravel(expected), np.ravel(output_vector))


def test_python_disowns_network(random_runtime):
    """After ``LoadNetwork`` the Python wrapper must no longer own the optimized network."""
    preferred_backends, network, runtime = random_runtime[:3]
    opt_network, _ = _optimize(network, preferred_backends, runtime)

    runtime.LoadNetwork(opt_network)

    assert not opt_network.thisown


def test_load_network(random_runtime):
    """Loading an optimized network returns id 0 and an empty message string."""
    preferred_backends, network, runtime = random_runtime[:3]

    opt_network, _ = _optimize(network, preferred_backends, runtime)

    net_id, messages = runtime.LoadNetwork(opt_network)
    assert "" == messages
    assert net_id == 0


def test_create_runtime_with_external_profiling_enabled():
    """A runtime can be created with file-only external profiling options set."""

    options = ann.CreationOptions()

    options.m_ProfilingOptions.m_FileOnly = True
    options.m_ProfilingOptions.m_EnableProfiling = True
    options.m_ProfilingOptions.m_OutgoingCaptureFile = "/tmp/outgoing.txt"
    options.m_ProfilingOptions.m_IncomingCaptureFile = "/tmp/incoming.txt"
    options.m_ProfilingOptions.m_TimelineEnabled = True
    options.m_ProfilingOptions.m_CapturePeriod = 1000
    options.m_ProfilingOptions.m_FileFormat = "JSON"

    runtime = ann.IRuntime(options)

    assert runtime is not None


def test_create_runtime_with_external_profiling_enabled_invalid_options():
    """Enabling the timeline while profiling is disabled must raise ``RuntimeError``."""

    options = ann.CreationOptions()

    options.m_ProfilingOptions.m_FileOnly = True
    options.m_ProfilingOptions.m_EnableProfiling = False
    options.m_ProfilingOptions.m_OutgoingCaptureFile = "/tmp/outgoing.txt"
    options.m_ProfilingOptions.m_IncomingCaptureFile = "/tmp/incoming.txt"
    options.m_ProfilingOptions.m_TimelineEnabled = True
    options.m_ProfilingOptions.m_CapturePeriod = 1000
    options.m_ProfilingOptions.m_FileFormat = "JSON"

    with pytest.raises(RuntimeError) as err:
        ann.IRuntime(options)

    expected_error_message = "It is not possible to enable timeline reporting without profiling being enabled"
    assert expected_error_message in str(err.value)


def test_load_network_properties_provided(random_runtime):
    """``LoadNetwork`` accepts an explicit ``INetworkProperties`` argument."""
    preferred_backends, network, runtime = random_runtime[:3]

    opt_network, _ = _optimize(network, preferred_backends, runtime)

    inputSource = ann.MemorySource_Undefined
    outputSource = ann.MemorySource_Undefined
    properties = ann.INetworkProperties(False, inputSource, outputSource)
    net_id, messages = runtime.LoadNetwork(opt_network, properties)
    assert "" == messages
    assert net_id == 0


def test_network_properties_constructor(random_runtime):
    """``INetworkProperties`` exposes its constructor arguments and the remaining defaults."""
    preferred_backends, network, runtime = random_runtime[:3]

    opt_network, _ = _optimize(network, preferred_backends, runtime)

    inputSource = ann.MemorySource_Undefined
    outputSource = ann.MemorySource_Undefined
    properties = ann.INetworkProperties(True, inputSource, outputSource)
    assert properties.m_AsyncEnabled == True
    assert properties.m_ProfilingEnabled == False
    assert properties.m_OutputNetworkDetailsMethod == ann.ProfilingDetailsMethod_Undefined
    assert properties.m_InputSource == ann.MemorySource_Undefined
    assert properties.m_OutputSource == ann.MemorySource_Undefined

    net_id, messages = runtime.LoadNetwork(opt_network, properties)
    assert "" == messages
    assert net_id == 0


def test_unload_network_fails_for_invalid_net_id(random_runtime):
    """Unloading a network id that was never loaded must raise ``RuntimeError``."""
    preferred_backends, network, runtime = random_runtime[:3]

    # The network is optimized but deliberately never loaded.
    _optimize(network, preferred_backends, runtime)

    with pytest.raises(RuntimeError) as err:
        runtime.UnloadNetwork(9)

    expected_error_message = "Failed to unload network."
    assert expected_error_message in str(err.value)


def test_enqueue_workload(random_runtime):
    """``EnqueueWorkload`` runs without raising on a loaded network."""
    preferred_backends, network, runtime, input_tensors, output_tensors = random_runtime

    opt_network, _ = _optimize(network, preferred_backends, runtime)

    net_id, _ = runtime.LoadNetwork(opt_network)
    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)


def test_enqueue_workload_fails_with_empty_input_tensors(random_runtime):
    """``EnqueueWorkload`` with no input tensors must raise ``RuntimeError``."""
    preferred_backends, network, runtime, _, output_tensors = random_runtime
    input_tensors = []

    opt_network, _ = _optimize(network, preferred_backends, runtime)

    net_id, _ = runtime.LoadNetwork(opt_network)
    with pytest.raises(RuntimeError) as err:
        runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

    expected_error_message = "Number of inputs provided does not match network."
    assert expected_error_message in str(err.value)


@pytest.mark.x86_64
@pytest.mark.parametrize('count', [5])
def test_multiple_inference_runs_yield_same_result(count, mock_model_runtime):
    """
    Test that results remain consistent among multiple runs of the same inference.
    """
    runtime, net_id, input_tensors, output_tensors = mock_model_runtime

    expected_results = np.array([[4, 85, 108, 29, 8, 16, 0, 2, 5, 0]])

    for _ in range(count):
        runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

        output_vectors = ann.workload_tensors_to_ndarray(output_tensors)

        # Compare actual values; comparing ``.all()`` of each side would only
        # compare two booleans and pass for almost any output.
        _assert_outputs_match(output_vectors, expected_results)


@pytest.mark.aarch64
def test_aarch64_inference_results(mock_model_runtime):
    """A single inference on the mock model produces the expected output values."""

    runtime, net_id, input_tensors, output_tensors = mock_model_runtime

    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

    output_vectors = ann.workload_tensors_to_ndarray(output_tensors)

    expected_results = np.array([[4, 85, 108, 29, 8, 16, 0, 2, 5, 0]])

    # Compare actual values, not the truthiness of each array.
    _assert_outputs_match(output_vectors, expected_results)


def test_enqueue_workload_with_profiler(random_runtime):
    """
    Tests ArmNN's profiling extension
    """
    preferred_backends, network, runtime, input_tensors, output_tensors = random_runtime

    opt_network, _ = _optimize(network, preferred_backends, runtime)
    net_id, _ = runtime.LoadNetwork(opt_network)

    profiler = runtime.GetProfiler(net_id)
    # By default profiling should be turned off:
    assert profiler.IsProfilingEnabled() is False

    # Enable profiling:
    profiler.EnableProfiling(True)
    assert profiler.IsProfilingEnabled() is True

    # Run the inference:
    runtime.EnqueueWorkload(net_id, input_tensors, output_tensors)

    # Get profile output as a string:
    str_profile = profiler.as_json()

    # Verify that certain markers are present:
    assert len(str_profile) != 0
    assert str_profile.find('\"ArmNN\": {') > 0

    # Get events analysis output as a string:
    str_events_analysis = profiler.event_log()

    assert "Event Sequence - Name | Duration (ms) | Start (ms) | Stop (ms) | Device" in str_events_analysis

    assert profiler.thisown == 0


def test_check_runtime_swig_ownership(random_runtime):
    # Check to see that SWIG has ownership for runtime. This instructs SWIG to take
    # ownership of the return value. This allows the value to be automatically
    # garbage-collected when it is no longer in use
    runtime = random_runtime[2]
    assert runtime.thisown