xref: /aosp_15_r20/external/rappor/pipeline/combine_status_test.py (revision 2abb31345f6c95944768b5222a9a5ed3fc68cc00)
1#!/usr/bin/python -S
2"""
3combine_status_test.py: Tests for combine_status.py
4"""
5
6import csv
7import cStringIO
8import unittest
9
10import combine_status  # module under test
11
12
13# TODO: Make these test more the header row.  They rely heavily on the file
14# system!
15
16class CombineStatusTest(unittest.TestCase):
17
18  def testCombineDistTaskStatus(self):
19    stdin = cStringIO.StringIO('')
20    out = cStringIO.StringIO()
21    c_out = csv.writer(out)
22
23    combine_status.CombineDistTaskStatus(stdin, c_out, {})
24    actual = out.getvalue()
25    self.assert_(actual.startswith('job_id,params_file,'), actual)
26
27  def testCombineAssocTaskStatus(self):
28    stdin = cStringIO.StringIO('')
29    out = cStringIO.StringIO()
30    c_out = csv.writer(out)
31
32    combine_status.CombineAssocTaskStatus(stdin, c_out)
33    actual = out.getvalue()
34    self.assert_(actual.startswith('job_id,metric,'), actual)
35
36
37if __name__ == '__main__':
38  unittest.main()
39