xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/email/contentmanager.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1*cda5da8dSAndroid Build Coastguard Workerimport binascii
2*cda5da8dSAndroid Build Coastguard Workerimport email.charset
3*cda5da8dSAndroid Build Coastguard Workerimport email.message
4*cda5da8dSAndroid Build Coastguard Workerimport email.errors
5*cda5da8dSAndroid Build Coastguard Workerfrom email import quoprimime
6*cda5da8dSAndroid Build Coastguard Worker
7*cda5da8dSAndroid Build Coastguard Workerclass ContentManager:
8*cda5da8dSAndroid Build Coastguard Worker
9*cda5da8dSAndroid Build Coastguard Worker    def __init__(self):
10*cda5da8dSAndroid Build Coastguard Worker        self.get_handlers = {}
11*cda5da8dSAndroid Build Coastguard Worker        self.set_handlers = {}
12*cda5da8dSAndroid Build Coastguard Worker
13*cda5da8dSAndroid Build Coastguard Worker    def add_get_handler(self, key, handler):
14*cda5da8dSAndroid Build Coastguard Worker        self.get_handlers[key] = handler
15*cda5da8dSAndroid Build Coastguard Worker
16*cda5da8dSAndroid Build Coastguard Worker    def get_content(self, msg, *args, **kw):
17*cda5da8dSAndroid Build Coastguard Worker        content_type = msg.get_content_type()
18*cda5da8dSAndroid Build Coastguard Worker        if content_type in self.get_handlers:
19*cda5da8dSAndroid Build Coastguard Worker            return self.get_handlers[content_type](msg, *args, **kw)
20*cda5da8dSAndroid Build Coastguard Worker        maintype = msg.get_content_maintype()
21*cda5da8dSAndroid Build Coastguard Worker        if maintype in self.get_handlers:
22*cda5da8dSAndroid Build Coastguard Worker            return self.get_handlers[maintype](msg, *args, **kw)
23*cda5da8dSAndroid Build Coastguard Worker        if '' in self.get_handlers:
24*cda5da8dSAndroid Build Coastguard Worker            return self.get_handlers[''](msg, *args, **kw)
25*cda5da8dSAndroid Build Coastguard Worker        raise KeyError(content_type)
26*cda5da8dSAndroid Build Coastguard Worker
27*cda5da8dSAndroid Build Coastguard Worker    def add_set_handler(self, typekey, handler):
28*cda5da8dSAndroid Build Coastguard Worker        self.set_handlers[typekey] = handler
29*cda5da8dSAndroid Build Coastguard Worker
30*cda5da8dSAndroid Build Coastguard Worker    def set_content(self, msg, obj, *args, **kw):
31*cda5da8dSAndroid Build Coastguard Worker        if msg.get_content_maintype() == 'multipart':
32*cda5da8dSAndroid Build Coastguard Worker            # XXX: is this error a good idea or not?  We can remove it later,
33*cda5da8dSAndroid Build Coastguard Worker            # but we can't add it later, so do it for now.
34*cda5da8dSAndroid Build Coastguard Worker            raise TypeError("set_content not valid on multipart")
35*cda5da8dSAndroid Build Coastguard Worker        handler = self._find_set_handler(msg, obj)
36*cda5da8dSAndroid Build Coastguard Worker        msg.clear_content()
37*cda5da8dSAndroid Build Coastguard Worker        handler(msg, obj, *args, **kw)
38*cda5da8dSAndroid Build Coastguard Worker
39*cda5da8dSAndroid Build Coastguard Worker    def _find_set_handler(self, msg, obj):
40*cda5da8dSAndroid Build Coastguard Worker        full_path_for_error = None
41*cda5da8dSAndroid Build Coastguard Worker        for typ in type(obj).__mro__:
42*cda5da8dSAndroid Build Coastguard Worker            if typ in self.set_handlers:
43*cda5da8dSAndroid Build Coastguard Worker                return self.set_handlers[typ]
44*cda5da8dSAndroid Build Coastguard Worker            qname = typ.__qualname__
45*cda5da8dSAndroid Build Coastguard Worker            modname = getattr(typ, '__module__', '')
46*cda5da8dSAndroid Build Coastguard Worker            full_path = '.'.join((modname, qname)) if modname else qname
47*cda5da8dSAndroid Build Coastguard Worker            if full_path_for_error is None:
48*cda5da8dSAndroid Build Coastguard Worker                full_path_for_error = full_path
49*cda5da8dSAndroid Build Coastguard Worker            if full_path in self.set_handlers:
50*cda5da8dSAndroid Build Coastguard Worker                return self.set_handlers[full_path]
51*cda5da8dSAndroid Build Coastguard Worker            if qname in self.set_handlers:
52*cda5da8dSAndroid Build Coastguard Worker                return self.set_handlers[qname]
53*cda5da8dSAndroid Build Coastguard Worker            name = typ.__name__
54*cda5da8dSAndroid Build Coastguard Worker            if name in self.set_handlers:
55*cda5da8dSAndroid Build Coastguard Worker                return self.set_handlers[name]
56*cda5da8dSAndroid Build Coastguard Worker        if None in self.set_handlers:
57*cda5da8dSAndroid Build Coastguard Worker            return self.set_handlers[None]
58*cda5da8dSAndroid Build Coastguard Worker        raise KeyError(full_path_for_error)
59*cda5da8dSAndroid Build Coastguard Worker
60*cda5da8dSAndroid Build Coastguard Worker
61*cda5da8dSAndroid Build Coastguard Workerraw_data_manager = ContentManager()
62*cda5da8dSAndroid Build Coastguard Worker
63*cda5da8dSAndroid Build Coastguard Worker
64*cda5da8dSAndroid Build Coastguard Workerdef get_text_content(msg, errors='replace'):
65*cda5da8dSAndroid Build Coastguard Worker    content = msg.get_payload(decode=True)
66*cda5da8dSAndroid Build Coastguard Worker    charset = msg.get_param('charset', 'ASCII')
67*cda5da8dSAndroid Build Coastguard Worker    return content.decode(charset, errors=errors)
68*cda5da8dSAndroid Build Coastguard Workerraw_data_manager.add_get_handler('text', get_text_content)
69*cda5da8dSAndroid Build Coastguard Worker
70*cda5da8dSAndroid Build Coastguard Worker
71*cda5da8dSAndroid Build Coastguard Workerdef get_non_text_content(msg):
72*cda5da8dSAndroid Build Coastguard Worker    return msg.get_payload(decode=True)
73*cda5da8dSAndroid Build Coastguard Workerfor maintype in 'audio image video application'.split():
74*cda5da8dSAndroid Build Coastguard Worker    raw_data_manager.add_get_handler(maintype, get_non_text_content)
75*cda5da8dSAndroid Build Coastguard Workerdel maintype
76*cda5da8dSAndroid Build Coastguard Worker
77*cda5da8dSAndroid Build Coastguard Worker
78*cda5da8dSAndroid Build Coastguard Workerdef get_message_content(msg):
79*cda5da8dSAndroid Build Coastguard Worker    return msg.get_payload(0)
80*cda5da8dSAndroid Build Coastguard Workerfor subtype in 'rfc822 external-body'.split():
81*cda5da8dSAndroid Build Coastguard Worker    raw_data_manager.add_get_handler('message/'+subtype, get_message_content)
82*cda5da8dSAndroid Build Coastguard Workerdel subtype
83*cda5da8dSAndroid Build Coastguard Worker
84*cda5da8dSAndroid Build Coastguard Worker
85*cda5da8dSAndroid Build Coastguard Workerdef get_and_fixup_unknown_message_content(msg):
86*cda5da8dSAndroid Build Coastguard Worker    # If we don't understand a message subtype, we are supposed to treat it as
87*cda5da8dSAndroid Build Coastguard Worker    # if it were application/octet-stream, per
88*cda5da8dSAndroid Build Coastguard Worker    # tools.ietf.org/html/rfc2046#section-5.2.4.  Feedparser doesn't do that,
89*cda5da8dSAndroid Build Coastguard Worker    # so do our best to fix things up.  Note that it is *not* appropriate to
90*cda5da8dSAndroid Build Coastguard Worker    # model message/partial content as Message objects, so they are handled
91*cda5da8dSAndroid Build Coastguard Worker    # here as well.  (How to reassemble them is out of scope for this comment :)
92*cda5da8dSAndroid Build Coastguard Worker    return bytes(msg.get_payload(0))
93*cda5da8dSAndroid Build Coastguard Workerraw_data_manager.add_get_handler('message',
94*cda5da8dSAndroid Build Coastguard Worker                                 get_and_fixup_unknown_message_content)
95*cda5da8dSAndroid Build Coastguard Worker
96*cda5da8dSAndroid Build Coastguard Worker
97*cda5da8dSAndroid Build Coastguard Workerdef _prepare_set(msg, maintype, subtype, headers):
98*cda5da8dSAndroid Build Coastguard Worker    msg['Content-Type'] = '/'.join((maintype, subtype))
99*cda5da8dSAndroid Build Coastguard Worker    if headers:
100*cda5da8dSAndroid Build Coastguard Worker        if not hasattr(headers[0], 'name'):
101*cda5da8dSAndroid Build Coastguard Worker            mp = msg.policy
102*cda5da8dSAndroid Build Coastguard Worker            headers = [mp.header_factory(*mp.header_source_parse([header]))
103*cda5da8dSAndroid Build Coastguard Worker                       for header in headers]
104*cda5da8dSAndroid Build Coastguard Worker        try:
105*cda5da8dSAndroid Build Coastguard Worker            for header in headers:
106*cda5da8dSAndroid Build Coastguard Worker                if header.defects:
107*cda5da8dSAndroid Build Coastguard Worker                    raise header.defects[0]
108*cda5da8dSAndroid Build Coastguard Worker                msg[header.name] = header
109*cda5da8dSAndroid Build Coastguard Worker        except email.errors.HeaderDefect as exc:
110*cda5da8dSAndroid Build Coastguard Worker            raise ValueError("Invalid header: {}".format(
111*cda5da8dSAndroid Build Coastguard Worker                                header.fold(policy=msg.policy))) from exc
112*cda5da8dSAndroid Build Coastguard Worker
113*cda5da8dSAndroid Build Coastguard Worker
114*cda5da8dSAndroid Build Coastguard Workerdef _finalize_set(msg, disposition, filename, cid, params):
115*cda5da8dSAndroid Build Coastguard Worker    if disposition is None and filename is not None:
116*cda5da8dSAndroid Build Coastguard Worker        disposition = 'attachment'
117*cda5da8dSAndroid Build Coastguard Worker    if disposition is not None:
118*cda5da8dSAndroid Build Coastguard Worker        msg['Content-Disposition'] = disposition
119*cda5da8dSAndroid Build Coastguard Worker    if filename is not None:
120*cda5da8dSAndroid Build Coastguard Worker        msg.set_param('filename',
121*cda5da8dSAndroid Build Coastguard Worker                      filename,
122*cda5da8dSAndroid Build Coastguard Worker                      header='Content-Disposition',
123*cda5da8dSAndroid Build Coastguard Worker                      replace=True)
124*cda5da8dSAndroid Build Coastguard Worker    if cid is not None:
125*cda5da8dSAndroid Build Coastguard Worker        msg['Content-ID'] = cid
126*cda5da8dSAndroid Build Coastguard Worker    if params is not None:
127*cda5da8dSAndroid Build Coastguard Worker        for key, value in params.items():
128*cda5da8dSAndroid Build Coastguard Worker            msg.set_param(key, value)
129*cda5da8dSAndroid Build Coastguard Worker
130*cda5da8dSAndroid Build Coastguard Worker
131*cda5da8dSAndroid Build Coastguard Worker# XXX: This is a cleaned-up version of base64mime.body_encode (including a bug
132*cda5da8dSAndroid Build Coastguard Worker# fix in the calculation of unencoded_bytes_per_line).  It would be nice to
133*cda5da8dSAndroid Build Coastguard Worker# drop both this and quoprimime.body_encode in favor of enhanced binascii
134*cda5da8dSAndroid Build Coastguard Worker# routines that accepted a max_line_length parameter.
135*cda5da8dSAndroid Build Coastguard Workerdef _encode_base64(data, max_line_length):
136*cda5da8dSAndroid Build Coastguard Worker    encoded_lines = []
137*cda5da8dSAndroid Build Coastguard Worker    unencoded_bytes_per_line = max_line_length // 4 * 3
138*cda5da8dSAndroid Build Coastguard Worker    for i in range(0, len(data), unencoded_bytes_per_line):
139*cda5da8dSAndroid Build Coastguard Worker        thisline = data[i:i+unencoded_bytes_per_line]
140*cda5da8dSAndroid Build Coastguard Worker        encoded_lines.append(binascii.b2a_base64(thisline).decode('ascii'))
141*cda5da8dSAndroid Build Coastguard Worker    return ''.join(encoded_lines)
142*cda5da8dSAndroid Build Coastguard Worker
143*cda5da8dSAndroid Build Coastguard Worker
144*cda5da8dSAndroid Build Coastguard Workerdef _encode_text(string, charset, cte, policy):
145*cda5da8dSAndroid Build Coastguard Worker    lines = string.encode(charset).splitlines()
146*cda5da8dSAndroid Build Coastguard Worker    linesep = policy.linesep.encode('ascii')
147*cda5da8dSAndroid Build Coastguard Worker    def embedded_body(lines): return linesep.join(lines) + linesep
148*cda5da8dSAndroid Build Coastguard Worker    def normal_body(lines): return b'\n'.join(lines) + b'\n'
149*cda5da8dSAndroid Build Coastguard Worker    if cte is None:
150*cda5da8dSAndroid Build Coastguard Worker        # Use heuristics to decide on the "best" encoding.
151*cda5da8dSAndroid Build Coastguard Worker        if max((len(x) for x in lines), default=0) <= policy.max_line_length:
152*cda5da8dSAndroid Build Coastguard Worker            try:
153*cda5da8dSAndroid Build Coastguard Worker                return '7bit', normal_body(lines).decode('ascii')
154*cda5da8dSAndroid Build Coastguard Worker            except UnicodeDecodeError:
155*cda5da8dSAndroid Build Coastguard Worker                pass
156*cda5da8dSAndroid Build Coastguard Worker            if policy.cte_type == '8bit':
157*cda5da8dSAndroid Build Coastguard Worker                return '8bit', normal_body(lines).decode('ascii', 'surrogateescape')
158*cda5da8dSAndroid Build Coastguard Worker        sniff = embedded_body(lines[:10])
159*cda5da8dSAndroid Build Coastguard Worker        sniff_qp = quoprimime.body_encode(sniff.decode('latin-1'),
160*cda5da8dSAndroid Build Coastguard Worker                                          policy.max_line_length)
161*cda5da8dSAndroid Build Coastguard Worker        sniff_base64 = binascii.b2a_base64(sniff)
162*cda5da8dSAndroid Build Coastguard Worker        # This is a little unfair to qp; it includes lineseps, base64 doesn't.
163*cda5da8dSAndroid Build Coastguard Worker        if len(sniff_qp) > len(sniff_base64):
164*cda5da8dSAndroid Build Coastguard Worker            cte = 'base64'
165*cda5da8dSAndroid Build Coastguard Worker        else:
166*cda5da8dSAndroid Build Coastguard Worker            cte = 'quoted-printable'
167*cda5da8dSAndroid Build Coastguard Worker            if len(lines) <= 10:
168*cda5da8dSAndroid Build Coastguard Worker                return cte, sniff_qp
169*cda5da8dSAndroid Build Coastguard Worker    if cte == '7bit':
170*cda5da8dSAndroid Build Coastguard Worker        data = normal_body(lines).decode('ascii')
171*cda5da8dSAndroid Build Coastguard Worker    elif cte == '8bit':
172*cda5da8dSAndroid Build Coastguard Worker        data = normal_body(lines).decode('ascii', 'surrogateescape')
173*cda5da8dSAndroid Build Coastguard Worker    elif cte == 'quoted-printable':
174*cda5da8dSAndroid Build Coastguard Worker        data = quoprimime.body_encode(normal_body(lines).decode('latin-1'),
175*cda5da8dSAndroid Build Coastguard Worker                                      policy.max_line_length)
176*cda5da8dSAndroid Build Coastguard Worker    elif cte == 'base64':
177*cda5da8dSAndroid Build Coastguard Worker        data = _encode_base64(embedded_body(lines), policy.max_line_length)
178*cda5da8dSAndroid Build Coastguard Worker    else:
179*cda5da8dSAndroid Build Coastguard Worker        raise ValueError("Unknown content transfer encoding {}".format(cte))
180*cda5da8dSAndroid Build Coastguard Worker    return cte, data
181*cda5da8dSAndroid Build Coastguard Worker
182*cda5da8dSAndroid Build Coastguard Worker
183*cda5da8dSAndroid Build Coastguard Workerdef set_text_content(msg, string, subtype="plain", charset='utf-8', cte=None,
184*cda5da8dSAndroid Build Coastguard Worker                     disposition=None, filename=None, cid=None,
185*cda5da8dSAndroid Build Coastguard Worker                     params=None, headers=None):
186*cda5da8dSAndroid Build Coastguard Worker    _prepare_set(msg, 'text', subtype, headers)
187*cda5da8dSAndroid Build Coastguard Worker    cte, payload = _encode_text(string, charset, cte, msg.policy)
188*cda5da8dSAndroid Build Coastguard Worker    msg.set_payload(payload)
189*cda5da8dSAndroid Build Coastguard Worker    msg.set_param('charset',
190*cda5da8dSAndroid Build Coastguard Worker                  email.charset.ALIASES.get(charset, charset),
191*cda5da8dSAndroid Build Coastguard Worker                  replace=True)
192*cda5da8dSAndroid Build Coastguard Worker    msg['Content-Transfer-Encoding'] = cte
193*cda5da8dSAndroid Build Coastguard Worker    _finalize_set(msg, disposition, filename, cid, params)
194*cda5da8dSAndroid Build Coastguard Workerraw_data_manager.add_set_handler(str, set_text_content)
195*cda5da8dSAndroid Build Coastguard Worker
196*cda5da8dSAndroid Build Coastguard Worker
197*cda5da8dSAndroid Build Coastguard Workerdef set_message_content(msg, message, subtype="rfc822", cte=None,
198*cda5da8dSAndroid Build Coastguard Worker                       disposition=None, filename=None, cid=None,
199*cda5da8dSAndroid Build Coastguard Worker                       params=None, headers=None):
200*cda5da8dSAndroid Build Coastguard Worker    if subtype == 'partial':
201*cda5da8dSAndroid Build Coastguard Worker        raise ValueError("message/partial is not supported for Message objects")
202*cda5da8dSAndroid Build Coastguard Worker    if subtype == 'rfc822':
203*cda5da8dSAndroid Build Coastguard Worker        if cte not in (None, '7bit', '8bit', 'binary'):
204*cda5da8dSAndroid Build Coastguard Worker            # http://tools.ietf.org/html/rfc2046#section-5.2.1 mandate.
205*cda5da8dSAndroid Build Coastguard Worker            raise ValueError(
206*cda5da8dSAndroid Build Coastguard Worker                "message/rfc822 parts do not support cte={}".format(cte))
207*cda5da8dSAndroid Build Coastguard Worker        # 8bit will get coerced on serialization if policy.cte_type='7bit'.  We
208*cda5da8dSAndroid Build Coastguard Worker        # may end up claiming 8bit when it isn't needed, but the only negative
209*cda5da8dSAndroid Build Coastguard Worker        # result of that should be a gateway that needs to coerce to 7bit
210*cda5da8dSAndroid Build Coastguard Worker        # having to look through the whole embedded message to discover whether
211*cda5da8dSAndroid Build Coastguard Worker        # or not it actually has to do anything.
212*cda5da8dSAndroid Build Coastguard Worker        cte = '8bit' if cte is None else cte
213*cda5da8dSAndroid Build Coastguard Worker    elif subtype == 'external-body':
214*cda5da8dSAndroid Build Coastguard Worker        if cte not in (None, '7bit'):
215*cda5da8dSAndroid Build Coastguard Worker            # http://tools.ietf.org/html/rfc2046#section-5.2.3 mandate.
216*cda5da8dSAndroid Build Coastguard Worker            raise ValueError(
217*cda5da8dSAndroid Build Coastguard Worker                "message/external-body parts do not support cte={}".format(cte))
218*cda5da8dSAndroid Build Coastguard Worker        cte = '7bit'
219*cda5da8dSAndroid Build Coastguard Worker    elif cte is None:
220*cda5da8dSAndroid Build Coastguard Worker        # http://tools.ietf.org/html/rfc2046#section-5.2.4 says all future
221*cda5da8dSAndroid Build Coastguard Worker        # subtypes should be restricted to 7bit, so assume that.
222*cda5da8dSAndroid Build Coastguard Worker        cte = '7bit'
223*cda5da8dSAndroid Build Coastguard Worker    _prepare_set(msg, 'message', subtype, headers)
224*cda5da8dSAndroid Build Coastguard Worker    msg.set_payload([message])
225*cda5da8dSAndroid Build Coastguard Worker    msg['Content-Transfer-Encoding'] = cte
226*cda5da8dSAndroid Build Coastguard Worker    _finalize_set(msg, disposition, filename, cid, params)
227*cda5da8dSAndroid Build Coastguard Workerraw_data_manager.add_set_handler(email.message.Message, set_message_content)
228*cda5da8dSAndroid Build Coastguard Worker
229*cda5da8dSAndroid Build Coastguard Worker
230*cda5da8dSAndroid Build Coastguard Workerdef set_bytes_content(msg, data, maintype, subtype, cte='base64',
231*cda5da8dSAndroid Build Coastguard Worker                     disposition=None, filename=None, cid=None,
232*cda5da8dSAndroid Build Coastguard Worker                     params=None, headers=None):
233*cda5da8dSAndroid Build Coastguard Worker    _prepare_set(msg, maintype, subtype, headers)
234*cda5da8dSAndroid Build Coastguard Worker    if cte == 'base64':
235*cda5da8dSAndroid Build Coastguard Worker        data = _encode_base64(data, max_line_length=msg.policy.max_line_length)
236*cda5da8dSAndroid Build Coastguard Worker    elif cte == 'quoted-printable':
237*cda5da8dSAndroid Build Coastguard Worker        # XXX: quoprimime.body_encode won't encode newline characters in data,
238*cda5da8dSAndroid Build Coastguard Worker        # so we can't use it.  This means max_line_length is ignored.  Another
239*cda5da8dSAndroid Build Coastguard Worker        # bug to fix later.  (Note: encoders.quopri is broken on line ends.)
240*cda5da8dSAndroid Build Coastguard Worker        data = binascii.b2a_qp(data, istext=False, header=False, quotetabs=True)
241*cda5da8dSAndroid Build Coastguard Worker        data = data.decode('ascii')
242*cda5da8dSAndroid Build Coastguard Worker    elif cte == '7bit':
243*cda5da8dSAndroid Build Coastguard Worker        data = data.decode('ascii')
244*cda5da8dSAndroid Build Coastguard Worker    elif cte in ('8bit', 'binary'):
245*cda5da8dSAndroid Build Coastguard Worker        data = data.decode('ascii', 'surrogateescape')
246*cda5da8dSAndroid Build Coastguard Worker    msg.set_payload(data)
247*cda5da8dSAndroid Build Coastguard Worker    msg['Content-Transfer-Encoding'] = cte
248*cda5da8dSAndroid Build Coastguard Worker    _finalize_set(msg, disposition, filename, cid, params)
249*cda5da8dSAndroid Build Coastguard Workerfor typ in (bytes, bytearray, memoryview):
250*cda5da8dSAndroid Build Coastguard Worker    raw_data_manager.add_set_handler(typ, set_bytes_content)
251*cda5da8dSAndroid Build Coastguard Workerdel typ
252