xref: /aosp_15_r20/prebuilts/build-tools/common/py3-stdlib/xml/dom/pulldom.py (revision cda5da8d549138a6648c5ee6d7a49cf8f4a657be)
1import xml.sax
2import xml.sax.handler
3
# Event type tokens.  Every event queued by PullDOM is a (token, node)
# pair whose first item is one of these strings.
START_ELEMENT = "START_ELEMENT"
END_ELEMENT = "END_ELEMENT"
COMMENT = "COMMENT"
START_DOCUMENT = "START_DOCUMENT"
END_DOCUMENT = "END_DOCUMENT"
PROCESSING_INSTRUCTION = "PROCESSING_INSTRUCTION"
IGNORABLE_WHITESPACE = "IGNORABLE_WHITESPACE"
CHARACTERS = "CHARACTERS"
12
class PullDOM(xml.sax.ContentHandler):
    """SAX content handler that turns parse callbacks into DOM events.

    Events are stored in a singly linked list of two-item cells
    ``[event, next_cell]``.  ``firstEvent`` is a sentinel head cell whose
    second slot references the oldest queued cell; ``lastEvent`` is the
    cell that was appended most recently.  Each queued event is a
    ``(token, node)`` tuple where ``token`` is one of the module-level
    event constants.

    Comments and processing instructions seen before the document exists
    are parked in ``pending_events`` and converted to nodes by
    ``buildDocument``.
    """
    _locator = None
    document = None

    def __init__(self, documentFactory=None):
        """Set up empty event queue, element stack and namespace context.

        documentFactory -- object providing ``createDocument(uri, name,
        doctype)``; when None, ``startDocument`` falls back to minidom.
        """
        from xml.dom import XML_NAMESPACE
        self.documentFactory = documentFactory
        self.firstEvent = [None, None]
        self.lastEvent = self.firstEvent
        self.elementStack = []
        # Bind the list methods directly on the instance; the instance
        # attribute ``pop`` shadows the class-level ``pop`` method below.
        self.push = self.elementStack.append
        self.pop = self.elementStack.pop
        self._ns_contexts = [{XML_NAMESPACE: 'xml'}]  # contains uri -> prefix dicts
        self._current_context = self._ns_contexts[-1]
        self.pending_events = []

    def pop(self):
        """Remove and return the top of the element stack."""
        result = self.elementStack[-1]
        del self.elementStack[-1]
        return result

    def _append_event(self, event):
        """Link *event* at the tail of the event queue."""
        cell = [event, None]
        self.lastEvent[1] = cell
        self.lastEvent = cell

    def setDocumentLocator(self, locator):
        """Remember the locator supplied by the parser."""
        self._locator = locator

    def startPrefixMapping(self, prefix, uri):
        """Record a namespace declaration for the element about to start."""
        if not hasattr(self, '_xmlns_attrs'):
            self._xmlns_attrs = []
        # The default namespace has no prefix; it is stored under 'xmlns'.
        self._xmlns_attrs.append((prefix or 'xmlns', uri))
        # Save a snapshot of the context as it was before this mapping so
        # that endPrefixMapping can restore it.
        self._ns_contexts.append(self._current_context.copy())
        self._current_context[uri] = prefix or None

    def endPrefixMapping(self, prefix):
        """Restore the namespace context saved by startPrefixMapping."""
        self._current_context = self._ns_contexts.pop()

    def startElementNS(self, name, tagName, attrs):
        """Queue a START_ELEMENT event for a namespace-aware element.

        name -- ``(uri, localname)`` pair; tagName -- qualified name or
        None; attrs -- attribute collection keyed by ``(uri, localname)``.
        The first element seen also creates the document.
        """
        # Retrieve xml namespace declaration attributes.
        xmlns_uri = 'http://www.w3.org/2000/xmlns/'
        xmlns_attrs = getattr(self, '_xmlns_attrs', None)
        if xmlns_attrs is not None:
            # NOTE(review): this writes to the private ``_attrs`` mapping
            # of *attrs*, so the reader's attribute object must expose it.
            for aname, value in xmlns_attrs:
                attrs._attrs[(xmlns_uri, aname)] = value
            self._xmlns_attrs = []
        uri, localname = name
        if uri:
            # When using namespaces, the reader may or may not
            # provide us with the original name. If not, create
            # *a* valid tagName from the current context.
            if tagName is None:
                prefix = self._current_context[uri]
                if prefix:
                    tagName = prefix + ":" + localname
                else:
                    tagName = localname
            if self.document:
                node = self.document.createElementNS(uri, tagName)
            else:
                node = self.buildDocument(uri, tagName)
        else:
            # When the tagname is not prefixed, it just appears as
            # localname
            if self.document:
                node = self.document.createElement(localname)
            else:
                node = self.buildDocument(None, localname)

        for aname, value in attrs.items():
            a_uri, a_localname = aname
            if a_uri == xmlns_uri:
                # Namespace declaration: 'xmlns' or 'xmlns:<prefix>'.
                if a_localname == 'xmlns':
                    qname = a_localname
                else:
                    qname = 'xmlns:' + a_localname
                attr = self.document.createAttributeNS(a_uri, qname)
                node.setAttributeNodeNS(attr)
            elif a_uri:
                # Namespaced attribute: rebuild a qualified name from the
                # prefix currently mapped to its URI.
                prefix = self._current_context[a_uri]
                if prefix:
                    qname = prefix + ":" + a_localname
                else:
                    qname = a_localname
                attr = self.document.createAttributeNS(a_uri, qname)
                node.setAttributeNodeNS(attr)
            else:
                attr = self.document.createAttribute(a_localname)
                node.setAttributeNode(attr)
            attr.value = value

        self._append_event((START_ELEMENT, node))
        self.push(node)

    def endElementNS(self, name, tagName):
        """Queue END_ELEMENT for the element on top of the stack."""
        self._append_event((END_ELEMENT, self.pop()))

    def startElement(self, name, attrs):
        """Queue a START_ELEMENT event for a non-namespace element."""
        if self.document:
            node = self.document.createElement(name)
        else:
            node = self.buildDocument(None, name)

        for aname, value in attrs.items():
            attr = self.document.createAttribute(aname)
            attr.value = value
            node.setAttributeNode(attr)

        self._append_event((START_ELEMENT, node))
        self.push(node)

    def endElement(self, name):
        """Queue END_ELEMENT for the element on top of the stack."""
        self._append_event((END_ELEMENT, self.pop()))

    def comment(self, s):
        """Queue a COMMENT event, or park it until the document exists."""
        if self.document:
            node = self.document.createComment(s)
            self._append_event((COMMENT, node))
        else:
            event = [(COMMENT, s), None]
            self.pending_events.append(event)

    def processingInstruction(self, target, data):
        """Queue a PROCESSING_INSTRUCTION event, or park it until the
        document exists."""
        if self.document:
            node = self.document.createProcessingInstruction(target, data)
            self._append_event((PROCESSING_INSTRUCTION, node))
        else:
            event = [(PROCESSING_INSTRUCTION, target, data), None]
            self.pending_events.append(event)

    def ignorableWhitespace(self, chars):
        """Queue an IGNORABLE_WHITESPACE event holding a text node."""
        node = self.document.createTextNode(chars)
        self._append_event((IGNORABLE_WHITESPACE, node))

    def characters(self, chars):
        """Queue a CHARACTERS event holding a text node."""
        node = self.document.createTextNode(chars)
        self._append_event((CHARACTERS, node))

    def startDocument(self):
        """Select the minidom implementation when no factory was given."""
        if self.documentFactory is None:
            import xml.dom.minidom
            self.documentFactory = xml.dom.minidom.Document.implementation

    def buildDocument(self, uri, tagname):
        """Create the document, queue START_DOCUMENT and flush parked events.

        Returns the first child of the new document node.
        """
        # Can't do that in startDocument, since we need the tagname
        # XXX: obtain DocumentType
        node = self.documentFactory.createDocument(uri, tagname, None)
        self.document = node
        self._append_event((START_DOCUMENT, node))
        self.push(node)
        # Put everything we have seen so far into the document
        for e in self.pending_events:
            kind = e[0][0]
            if kind == PROCESSING_INSTRUCTION:
                _, target, data = e[0]
                n = self.document.createProcessingInstruction(target, data)
                e[0] = (PROCESSING_INSTRUCTION, n)
            elif kind == COMMENT:
                n = self.document.createComment(e[0][1])
                e[0] = (COMMENT, n)
            else:
                raise AssertionError("Unknown pending event ", kind)
            # The parked cell itself is linked into the queue, so objects
            # holding a reference to it see the converted node.
            self.lastEvent[1] = e
            self.lastEvent = e
        # Reset to an empty list (not None) so that comment() and
        # processingInstruction() keep working if the document is later
        # dropped by clear().
        self.pending_events = []
        return node.firstChild

    def endDocument(self):
        """Queue END_DOCUMENT and pop the document off the stack."""
        self._append_event((END_DOCUMENT, self.document))
        self.pop()

    def clear(self):
        "clear(): Explicitly release parsing structures"
        self.document = None
195
class ErrorHandler:
    """Minimal SAX-style error handler: print warnings, raise the rest."""

    def warning(self, exception):
        """Report a warning by printing it; parsing is not interrupted."""
        print(exception)

    def error(self, exception):
        """Re-raise a recoverable error passed in by the parser."""
        raise exception

    def fatalError(self, exception):
        """Re-raise a fatal error passed in by the parser."""
        raise exception
203
class DOMEventStream:
    """Iterator over the ``(token, node)`` events of an XML stream.

    The stream is read in chunks of *bufsize* and fed to *parser* only
    when the event queue of the attached PullDOM handler runs dry.  If
    the parser has no ``feed`` method, the whole stream is parsed on the
    first request instead and the queued events are then handed out one
    by one.
    """

    def __init__(self, stream, parser, bufsize):
        """Attach *parser* to *stream*.

        stream  -- object with a ``read(size)`` method (or whatever the
                   parser's ``parse`` accepts when ``feed`` is missing)
        parser  -- SAX parser; must support ``setFeature`` and
                   ``setContentHandler``
        bufsize -- number of characters/bytes requested per ``read``
        """
        self.stream = stream
        self.parser = parser
        self.bufsize = bufsize
        if not hasattr(self.parser, 'feed'):
            # No incremental interface: shadow getEvent on the instance.
            self.getEvent = self._slurp
        self.reset()

    def reset(self):
        """Install a fresh PullDOM handler on the parser."""
        self.pulldom = PullDOM()
        # This content handler relies on namespace support
        self.parser.setFeature(xml.sax.handler.feature_namespaces, 1)
        self.parser.setContentHandler(self.pulldom)

    def __next__(self):
        """Return the next event or raise StopIteration when exhausted."""
        rc = self.getEvent()
        if rc:
            return rc
        raise StopIteration

    def __iter__(self):
        return self

    def expandNode(self, node):
        """Consume events until *node* ends, attaching all descendants.

        Every node reported between the current position and the
        END_ELEMENT event of *node* is appended to its parent, so that
        *node* ends up holding its complete subtree.
        """
        event = self.getEvent()
        parents = [node]
        while event:
            token, cur_node = event
            if cur_node is node:
                # END_ELEMENT of the node being expanded: done.
                return
            if token != END_ELEMENT:
                parents[-1].appendChild(cur_node)
            if token == START_ELEMENT:
                parents.append(cur_node)
            elif token == END_ELEMENT:
                del parents[-1]
            event = self.getEvent()

    def getEvent(self):
        """Return the next queued event, feeding the parser as needed.

        Returns None once the stream yields no more data.
        """
        # use IncrementalParser interface, so we get the desired
        # pull effect
        if not self.pulldom.firstEvent[1]:
            # Queue is empty: let new events be linked right after the
            # sentinel head cell.
            self.pulldom.lastEvent = self.pulldom.firstEvent
        while not self.pulldom.firstEvent[1]:
            buf = self.stream.read(self.bufsize)
            if not buf:
                self.parser.close()
                return None
            self.parser.feed(buf)
        rc = self.pulldom.firstEvent[1][0]
        self.pulldom.firstEvent[1] = self.pulldom.firstEvent[1][1]
        return rc

    def _slurp(self):
        """ Fallback replacement for getEvent() using the
            standard SAX2 interface, which means we slurp the
            SAX events into memory (no performance gain, but
            we are compatible to all SAX parsers).
        """
        self.parser.parse(self.stream)
        self.getEvent = self._emit
        return self._emit()

    def _emit(self):
        """ Fallback replacement for getEvent() that emits
            the events that _slurp() read previously.

            Returns None when every queued event has been handed out,
            which lets __next__ raise StopIteration.
        """
        pending = self.pulldom.firstEvent[1]
        if not pending:
            # Queue drained: signal end of stream instead of indexing None.
            return None
        self.pulldom.firstEvent[1] = pending[1]
        return pending[0]

    def clear(self):
        """clear(): Explicitly release parsing objects"""
        self.pulldom.clear()
        del self.pulldom
        self.parser = None
        self.stream = None
282
class SAX2DOM(PullDOM):
    """PullDOM variant that also links every created node into the tree.

    After each event the new node is appended to the node on top of the
    element stack, so ``self.document`` holds the complete DOM once
    parsing has finished.
    """

    def buildDocument(self, uri, tagname):
        """Build the document and attach processing instructions that
        were seen before the root element.

        Returns the first child of the new document, as the base class
        does.
        """
        # Keep a reference to the parked cells: the base class converts
        # them to nodes in place and then drops its own reference.
        pending = self.pending_events
        root = PullDOM.buildDocument(self, uri, tagname)
        for event in pending or ():
            token, node = event[0]
            if token == PROCESSING_INSTRUCTION:
                # These preceded the root element in the input, so they
                # go in front of it.
                self.document.insertBefore(node, root)
        return root

    def startElementNS(self, name, tagName , attrs):
        """Create the element and append it to its parent."""
        PullDOM.startElementNS(self, name, tagName, attrs)
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def startElement(self, name, attrs):
        """Create the element and append it to its parent."""
        PullDOM.startElement(self, name, attrs)
        curNode = self.elementStack[-1]
        parentNode = self.elementStack[-2]
        parentNode.appendChild(curNode)

    def processingInstruction(self, target, data):
        """Create the processing instruction and append it to the
        current parent.

        Before the document exists the base class only parks the
        instruction; there is no node to attach yet, so attachment is
        left to buildDocument.
        """
        PullDOM.processingInstruction(self, target, data)
        if not self.document:
            return
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def ignorableWhitespace(self, chars):
        """Create the whitespace text node and append it to the current
        parent."""
        PullDOM.ignorableWhitespace(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)

    def characters(self, chars):
        """Create the text node and append it to the current parent."""
        PullDOM.characters(self, chars)
        node = self.lastEvent[0][1]
        parentNode = self.elementStack[-1]
        parentNode.appendChild(node)
314
315
# Chunk size used by parse() when the caller does not pass bufsize; it is
# the amount requested from the stream on each read.
default_bufsize = (2 ** 14) - 20
317
def parse(stream_or_string, parser=None, bufsize=None):
    """Return a DOMEventStream for a file name or an open stream.

    stream_or_string -- a str is treated as a path and opened in binary
                        mode; anything else is used as the stream itself
    parser           -- SAX parser to use; a default one is created when
                        omitted
    bufsize          -- read size per chunk; default_bufsize when None
    """
    if isinstance(stream_or_string, str):
        source = open(stream_or_string, 'rb')
    else:
        source = stream_or_string
    return DOMEventStream(
        source,
        parser or xml.sax.make_parser(),
        default_bufsize if bufsize is None else bufsize,
    )
328
def parseString(string, parser=None):
    """Return a DOMEventStream over an in-memory XML document.

    string -- the document, as str or as a bytes-like object (bytes or
              bytearray); bytes are wrapped in a binary buffer so the
              parser receives them undecoded
    parser -- SAX parser to use; a default one is created when omitted

    The whole document is requested in a single read, since the buffer
    size is set to the length of *string*.
    """
    from io import BytesIO, StringIO

    bufsize = len(string)
    if isinstance(string, (bytes, bytearray)):
        # StringIO rejects bytes; use the binary counterpart instead.
        buf = BytesIO(string)
    else:
        buf = StringIO(string)
    if not parser:
        parser = xml.sax.make_parser()
    return DOMEventStream(buf, parser, bufsize)
337