1 /* Copyright 2022 The TensorFlow Authors. All Rights Reserved.
2
3 Licensed under the Apache License, Version 2.0 (the "License");
4 you may not use this file except in compliance with the License.
5 You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9 Unless required by applicable law or agreed to in writing, software
10 distributed under the License is distributed on an "AS IS" BASIS,
11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 See the License for the specific language governing permissions and
13 limitations under the License.
14 ==============================================================================*/
15 #include "tensorflow/core/data/metric_utils.h"
16
17 #include <algorithm>
18 #include <cstdint>
19 #include <string>
20 #include <vector>
21
22 #include "absl/time/time.h"
23 #include "tensorflow/core/data/utils.h"
24 #include "tensorflow/core/framework/dataset.h"
25 #include "tensorflow/core/framework/metrics.h"
26 #include "tensorflow/core/framework/tensor.h"
27 #include "tensorflow/core/framework/types.h"
28 #include "tensorflow/core/platform/env.h"
29 #include "tensorflow/core/platform/mutex.h"
30
31 namespace tensorflow {
32 namespace data {
33 namespace {
34
35 // Safely subtracts `x` from `y` avoiding underflow.
safe_sub(uint64_t x,uint64_t y)36 uint64_t safe_sub(uint64_t x, uint64_t y) { return x >= y ? x - y : 0; }
37
38 } // namespace
39
IteratorMetricsCollector(const std::string & device_type,const Env & env)40 IteratorMetricsCollector::IteratorMetricsCollector(
41 const std::string& device_type, const Env& env)
42 : device_type_(device_type), env_(env) {}
43
RecordStart()44 absl::Time IteratorMetricsCollector::RecordStart() {
45 const uint64_t start_time_us = env_.NowMicros();
46 if (!ShouldCollectMetrics()) {
47 return absl::FromUnixMicros(start_time_us);
48 }
49
50 mutex_lock l(mu_);
51 if (end_time_us_ == 0) {
52 // We initialize `end_time_us_` to the start time of the first request to
53 // make it possible to use the delta between `end_time_us_` and subsequent
54 // `GetNext()` end time to incrementally collect the duration of the
55 // iterator's lifetime.
56 end_time_us_ = start_time_us;
57 }
58 uint64_t gap_time_us = 0;
59 if (num_active_calls_ == 0) {
60 first_start_time_us_ = start_time_us;
61 gap_time_us = safe_sub(start_time_us, end_time_us_);
62 }
63 metrics::RecordTFDataIteratorGap(gap_time_us);
64 num_active_calls_++;
65 return absl::FromUnixMicros(start_time_us);
66 }
67
RecordStop(absl::Time start_time,const std::vector<Tensor> & output)68 void IteratorMetricsCollector::RecordStop(absl::Time start_time,
69 const std::vector<Tensor>& output) {
70 if (!ShouldCollectMetrics()) {
71 return;
72 }
73
74 const uint64_t end_time_us = env_.NowMicros();
75 AddLatencySample(safe_sub(end_time_us, absl::ToUnixMicros(start_time)));
76 IncrementThroughput(GetTotalBytes(output));
77 mutex_lock l(mu_);
78 metrics::RecordTFDataIteratorLifetime(safe_sub(end_time_us, end_time_us_));
79 end_time_us_ = std::max(end_time_us_, end_time_us);
80 num_active_calls_--;
81 if (num_active_calls_ == 0) {
82 metrics::RecordTFDataIteratorBusy(
83 safe_sub(end_time_us_, first_start_time_us_));
84 }
85 }
86
ShouldCollectMetrics() const87 bool IteratorMetricsCollector::ShouldCollectMetrics() const {
88 return device_type_ == DEVICE_CPU;
89 }
90
91 } // namespace data
92 } // namespace tensorflow
93