# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import json
import hashlib
import os
import requests

# ==================== Document digests

def _read_lines_with_prefix(document, position, prefix):
    """
    Starting from the given position, it parses complete lines from the
    document (with a '\n' character at the end) that start with the given
    prefix. The parser stops at the first line that does not start with the
    given prefix or when there are no more '\n' characters in the document.

    @param document: a document to parse
    @param position: an offset in the document to start from
    @param prefix: the required prefix of parsed lines

    @returns a pair (lines, position), where the first element is a list of
        parsed lines (with the '\n' character at the end) and the second
        element is a new offset in the document, pointing at the first
        character after the last parsed line

    """
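    # Illustrative example (hypothetical input, not taken from real test data):
    #   _read_lines_with_prefix(b'@PJL SET A\n@PJL SET B\nrest', 0, b'@PJL')
    # returns ([b'@PJL SET A\n', b'@PJL SET B\n'], 22).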
    lines = []
    while document.startswith(prefix, position):
        position_next_line = document.find(b'\n', position + len(prefix))
        if position_next_line < 0:
            break
        position_next_line += 1  # to eat '\n' character
        lines.append(document[position:position_next_line])
        position = position_next_line
    return lines, position


def _process_PJL_headers(doc, position, out):
    """
    The function tries to find PJL headers in the given document and processes
    them as described in the _normalize_document(doc) function.

    @param doc: see the description of _normalize_document(doc)
    @param position: offset in the document; defines the part of the document
            that is already processed; searching for headers starts from this
            position
    @param out: already processed part of the document (from the beginning to
            the given position)

    @returns new position and output; the position is set at the end of the
            last processed PJL header or it is a copy of the input position,
            if no PJL headers have been found; the output is adjusted
            accordingly.

    """
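    # Illustrative example (hypothetical document fragment): given
    #   b'\x1B%-12345X@PJL JOB NAME="x"\n@PJL SET COPIES=1\n@PJL ENTER LANGUAGE=PCL\n'
    # the marker and the '@PJL ENTER LANGUAGE=PCL' line are copied to the
    # output, while the '@PJL JOB NAME' and '@PJL SET' lines are dropped.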
    PJL_MARKER = b'\x1B%-12345X'
    MARGIN = 2048  # max distance to the header
    position_pjl = doc.find(PJL_MARKER, position, position + MARGIN)
    while position_pjl >= 0:
        out += doc[position:(position_pjl+len(PJL_MARKER))]
        position = position_pjl + len(PJL_MARKER)
        # parse header and filter problematic lines
        lines, position = _read_lines_with_prefix(doc, position, b'@PJL')
        for line in lines:
            if not (line.startswith(b'@PJL SET ')
                    or line.startswith(b'@PJL COMMENT')
                    or line.startswith(b'@PJL DMINFO')
                    or line.startswith(b'@PJL JOB NAME')
                    or line.startswith(b'@PJL JOBNAME')):
                out += line
        # try to find next PJL header
        position_pjl = doc.find(PJL_MARKER, position, position + MARGIN)
    return position, out


def _process_PS_Adobe_headers(doc, position, out):
    """
    The function tries to find PS-Adobe headers in the given document and
    processes them as described in the _normalize_document(doc) function.

    @param doc: see the description of _normalize_document(doc)
    @param position: offset in the document; defines the part of the document
            that is already processed; searching for headers starts from this
            position
    @param out: already processed part of the document (from the beginning to
            the given position)

    @returns new position and output; the position is set at the end of the
            last processed PS-Adobe header or it is a copy of the input
            position, if no PS-Adobe headers have been found; the output is
            adjusted accordingly.

    """
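    # Illustrative example (hypothetical header fragment): given
    #   b'%!PS-Adobe-3.0\n%%Title: doc.pdf\n%%Creator: cups\n%%Pages: 1\n'
    # the '%!PS-Adobe-3.0', '%%Creator' and '%%Pages' lines are copied to the
    # output, while the '%%Title:' (and any '%%For:') lines are dropped.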
    PS_MARKER = b'%!PS-Adobe'
    MARGIN = 2048  # max distance to the header
    position_ps = doc.find(PS_MARKER, position, position + MARGIN)
    while position_ps >= 0:
        # add everything till the end of the first line in the header
        position_next_line = doc.find(b'\n', position_ps + len(PS_MARKER))
        if position_next_line < 0:
            break  # no more '\n', we finish the parsing here
        position_next_line += 1  # to eat \n character
        out += doc[position:position_next_line]
        # parse the rest of the header and filter problematic lines
        lines, position = _read_lines_with_prefix(doc, position_next_line,
                                                  b'%')
        for line in lines:
            if not (line.startswith(b'%%Title:')
                    or line.startswith(b'%%For:')):
                out += line
        # search for lines with '{setuserinfo}' or '/JobInfo <<'
        position_ps = doc.find(PS_MARKER, position, position + MARGIN)
        position_ui = doc.find(b'{setuserinfo}', position, position + MARGIN)
        position_ji = doc.find(b'/JobInfo <<', position, position + MARGIN)
        # if '/JobInfo <<' was found, move the offset to the end of the section
        if position_ji >= 0:
            position_ji = doc.find(b'>>', position_ji)
        # if the beginning of the next header was found, make sure that
        # detected sections do not belong to the next header
        if position_ps >= 0:
            if position_ji > position_ps:
                position_ji = -1
            if position_ui > position_ps:
                position_ui = -1
        # choose the farthest section
        position_end = max(position_ji, position_ui)
        if position_end >= 0:
            # find the first '\n' after the farthest section
            position_end = doc.find(b'\n', position_end)
            if position_end < 0:
                break  # no more '\n', we finish the parsing here
            # split into lines everything from here to the end of the section
            lines = doc[position:position_end].split(b'\n')
            position = position_end + 1  # +1 is needed to eat the last \n
            # filter problematic lines
            for line in lines:
                if not (line.find(b'{setuserinfo}') >= 0 or
                        line.find(b'/UserID') >= 0 or line.find(b'/Time') >= 0
                        or line.find(b'/HostLoginName') >= 0
                        or line.find(b'/HostName') >= 0):
                    out += line + b'\n'
            # go to the next iteration, position_ps is already set
    return position, out


def _normalize_LIDIL(doc):
    """
    The function tries to process the given document as described in the
    _normalize_document(doc) function, but assuming that the document is in
    the LIDIL format. This format is used by some HP printers.

    @param doc: see the description of _normalize_document(doc)

    @returns None if the given document is not in the LIDIL format. Otherwise,
        it returns a result of the _normalize_document(doc) function.

    """
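    # The two job ids sit at fixed offsets: the first one starts
    # LIDIL_JOBID_1_OFF bytes from the beginning of the document, the second
    # one starts LIDIL_JOBID_2_OFF bytes from its end; both are JOBID_SIZE
    # bytes long and are simply cut out of the returned copy.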
    LIDIL_MARKER = b'\x24\x01\x00\x00\x07\x00\x00\x00'
    LIDIL_JOBID_1_OFF = 2348 # first job id, offset from the beginning
    LIDIL_JOBID_2_OFF = 2339 # second job id, offset from the end
    JOBID_SIZE = 4 # number of bytes used to store job id
    # the document is in LIDIL format <=> it starts with the marker
    if not doc.startswith(LIDIL_MARKER):
        return None
    # remove both JOB IDs and exit
    nd = len(doc)
    if nd > LIDIL_JOBID_1_OFF + LIDIL_JOBID_2_OFF + 2*JOBID_SIZE:
        doc = b''.join([
                doc[:(LIDIL_JOBID_1_OFF)],
                doc[(LIDIL_JOBID_1_OFF + JOBID_SIZE):(nd - LIDIL_JOBID_2_OFF)],
                doc[(nd - LIDIL_JOBID_2_OFF + JOBID_SIZE):]
        ])
    return doc


def _normalize_EJL(doc):
    """
    The function tries to process the given document as described in the
    _normalize_document(doc) function, but assuming that the document is in
    the EJL format.

    @param doc: see the description of _normalize_document(doc)

    @returns None if the given document is not in the EJL format. Otherwise,
        it returns a result of the _normalize_document(doc) function.

    """
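    # Illustrative example (hypothetical header fragment): given
    #   b'\x1B\x01@EJL \n@EJL JI ID="42"\n@EJL JI USER="someone"\n@EJL EN LA=ESC/PAGE\n'
    # the marker and the '@EJL EN LA=' line are copied to the output, while
    # the '@EJL JI ID=' and '@EJL JI USER=' lines are dropped.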
    # EJL - some Epson printers (like eplaser)
    EJL_MARKER = b'\x1B\x01@EJL \n'
    # the document is in EJL format <=> it starts with the marker
    if not doc.startswith(EJL_MARKER):
        return None
    # copy the document to output; filter lines parsed from the EJL header
    out = EJL_MARKER
    lines, position = _read_lines_with_prefix(doc, len(EJL_MARKER), b'@EJL')
    for line in lines:
        if not (line.startswith(b'@EJL JI ID=')
                or line.startswith(b'@EJL JI USER=')):
            out += line
    # add the rest of the document and exit
    out += doc[position:]
    return out


def _normalize_document(doc):
    """
    The input document is a raw package sent to the printer. This function
    removes from it all variables that can change when the same content is
    printed. That includes, but is not limited to: user name, host name,
    job id, date and time.

    @param doc: a raw document sent directly to the printer to be printed
            (bytes)

    @returns a copy of doc (bytes) with removed fragments that can vary
        between printing jobs. The returned output is supposed to be identical
        for the same input content sent to the pipeline for the same PPD file.

    """
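    # Sketch of the intended property: two raw print jobs produced from the
    # same content with the same PPD should normalize to equal byte strings,
    # i.e. _normalize_document(job_1) == _normalize_document(job_2).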
    # Try to parse the document as LIDIL or EJL and exit if successful.
    out = _normalize_LIDIL(doc)
    if out is not None:
        return out
    out = _normalize_EJL(doc)
    if out is not None:
        return out

    # Try to parse and process PJL and PS headers.
    position = 0
    out = b''
    position, out = _process_PJL_headers(doc, position, out)
    position, out = _process_PS_Adobe_headers(doc, position, out)

    # Go to the tail of the document, add the skipped content to the output.
    if position + 2048 < len(doc):
        position_tail = len(doc) - 2048
        out += doc[position:position_tail]
        position = position_tail

    # Try to find 'trailer << '.
    position_trailer = doc.find(b'trailer << ', position)
    if position_trailer >= 0:
        # If found, prune the line with it.
        position_end = doc.find(b'\n', position_trailer)
        if position_end >= 0:
            out += doc[position:position_trailer]
            position = position_end + 1  # +1 to omit '\n' from the trailer

    # Add the rest of the document to the output.
    out += doc[position:]

    return out


def calculate_digest(doc):
    """
    Calculates a digest of the given document.

    @param doc: document's content (bytes)

    @returns the calculated digest as a string of hexadecimal digits

    """
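    # Example (illustrative): for an input that normalization leaves
    # unchanged, this is just its MD5 hex digest, e.g.
    #   calculate_digest(b'abc') == '900150983cd24fb0d6963f7d28e17f72'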
    # Prune the variable parts of the document
    out = _normalize_document(doc)

    # Calculate the hash
    return hashlib.md5(out).hexdigest()


def parse_digests_file(path_digests, denylist):
    """
    Parses digests and output sizes from a file.

    @param path_digests: a path to a file with digests
    @param denylist: list of keys to omit

    @returns two dictionaries, both indexed by ppd filenames: the first one
            contains digests, the second one contains output sizes; returns
            empty dictionaries if the given file does not exist

    """
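    # Expected file format (one entry per line, whitespace-separated columns):
    #   <ppd filename> <digest> [<output size>]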
    digests = dict()
    sizes = dict()
    denylist = set(denylist)
    if os.path.isfile(path_digests):
        with open(path_digests, 'r') as file_digests:
            lines = file_digests.read().splitlines()
            for line in lines:
                cols = line.split()
                if len(cols) >= 2 and cols[0] not in denylist:
                    digests[cols[0]] = cols[1]
                    if len(cols) > 2 and len(cols[2]) > 0:
                        sizes[cols[0]] = int(cols[2])
    return digests, sizes


def save_digests_file(path_digests, digests, sizes, denylist):
    """
    Saves a list of digests and output sizes to a file.

    @param path_digests: a path to the output file
    @param digests: dictionary with digests (keys are names)
    @param sizes: dictionary with output sizes (keys are names)
    @param denylist: list of keys to ignore

    """
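    # Illustrative usage (hypothetical values):
    #   save_digests_file('digests.txt', {'a.ppd': 'abc123'}, {'a.ppd': 7}, [])
    # writes the single line 'a.ppd\tabc123\t7\n' to digests.txt.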
    digests_content = ''
    names = sorted(set(digests.keys()).difference(denylist))
    for name in names:
        digest = digests[name]
        assert name.find('\t') < 0 and name.find('\n') < 0
        assert digest.find('\t') < 0 and digest.find('\n') < 0
        digests_content += name + '\t' + digest
        if name in sizes:
            assert isinstance(sizes[name], int)
            digests_content += '\t' + str(sizes[name])
        digests_content += '\n'

    with open(path_digests, 'wb') as file_digests:
        file_digests.write(digests_content.encode("utf-8"))


def load_lines_from_file(path):
    """
    Loads strings stored in the given file as separate lines.

    This routine returns lines read from the given file. All leading and
    trailing whitespace characters in each line are removed. Lines consisting
    of whitespace characters only are skipped.

    @param path: a path to the input file

    @returns a list of non-empty strings

    """
    with open(path) as input_file:
        lines = input_file.readlines()

    output_list = []
    for entry in lines:
        entry = entry.strip()
        if entry != '':
            output_list.append(entry)

    return output_list


# ===================== PPD files on the SCS server

def get_filenames_from_PPD_index(task_id):
    """
    It downloads an index file from the SCS server and extracts names
    of PPD files from it.

    @param task_id: an ordinal number of the index file to process; this is
            an integer from the interval [0..20)

    @returns a list of PPD filenames (may contain duplicates)

    """
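    # Each index entry is expected to be a JSON array whose second element is
    # the PPD filename, e.g. (hypothetical entry):
    #   ["some-printer-key", "some-printer.ppd.gz"]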
    # calculates a URL of the index file
    url_metadata = 'https://www.gstatic.com/chromeos_printing/metadata_v2/'
    url_ppd_index = url_metadata + ('index-%02d.json' % task_id)
    # downloads and parses the index file
    request = requests.get(url_ppd_index)
    entries = json.loads(request.content)
    # extracts PPD filenames (the second element in each index entry)
    output = []
    for entry in entries:
        output.append(entry[1])
    # returns a list of extracted filenames
    return output


def download_PPD_file(ppd_file):
    """
    It downloads a PPD file from the SCS server.

    @param ppd_file: a filename of the PPD file (neither a path nor a URL)

    @returns content of the PPD file

    """
    url_ppds = 'https://www.gstatic.com/chromeos_printing/ppds/'
    request = requests.get(url_ppds + ppd_file)
    return request.content


# ==================== Local filesystem

def list_entries_from_directory(
        path,
        with_suffixes=None, nonempty_results=False,
        include_files=True, include_directories=True):
    """
    It returns all filenames from the given directory. Results may be filtered
    by filename suffixes or entry types.

    @param path: a path to the directory to list entries from
    @param with_suffixes: if set, only entries with the given suffixes are
            returned; it must be a tuple
    @param nonempty_results: if True, an Exception is raised when there are
            no results
    @param include_files: if False, regular files and links are omitted
    @param include_directories: if False, directories are omitted

    @returns a list of entries meeting the given criteria

    @raises Exception if no matching entries were found and nonempty_results
            is set to True

    """
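    # Illustrative usage (hypothetical directory layout):
    #   list_entries_from_directory('/tmp/ppds', with_suffixes=('.ppd', '.ppd.gz'))
    # returns the names of regular files, links and subdirectories in
    # /tmp/ppds whose names end with '.ppd' or '.ppd.gz'.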
    # lists all files from the directory and filters them by given criteria
    list_of_files = []
    for filename in os.listdir(path):
        path_entry = os.path.join(path, filename)
        # check type
        if os.path.isfile(path_entry):
            if not include_files:
                continue
        elif os.path.isdir(path_entry):
            if not include_directories:
                continue
        else:
            continue
        # check suffix
        if with_suffixes is not None:
            if not filename.endswith(with_suffixes):
                continue
        list_of_files.append(filename)
    # throws exception if no files were found
    if nonempty_results and len(list_of_files) == 0:
        message = 'Directory %s does not contain any ' % path
        message += 'entries meeting the criteria'
        raise Exception(message)
    # returns the list of matching entries
    return list_of_files