# -*- Mode: Python; tab-width: 4 -*-
# Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <[email protected]>

# ======================================================================
# Copyright 1996 by Sam Rushing
#
# All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================

r"""A class supporting chat-style (command/response) protocols.

This class adds support for 'chat' style protocols - where one side
sends a 'command', and the other sends a response (examples would be
the common internet protocols - smtp, nntp, ftp, etc.).

The handle_read() method looks at the input stream for the current
'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
for multi-line output), calling self.found_terminator() on its
receipt.

For example:
Say you build an async nntp client using this class. At the start
of the connection, you'll have self.terminator set to '\r\n', in
order to process the single-line greeting. Just before issuing a
'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST
command will be accumulated (using your own 'collect_incoming_data'
method) up to the terminator, and then control will be returned to
you - by calling your self.found_terminator() method.
"""
import asyncore
from collections import deque

from warnings import _deprecated

_DEPRECATION_MSG = ('The {name} module is deprecated and will be removed in '
                    'Python {remove}. The recommended replacement is asyncio')
_deprecated(__name__, _DEPRECATION_MSG, remove=(3, 12))



class async_chat(asyncore.dispatcher):
    """This is an abstract class. You must derive from this class, and add
    the two methods collect_incoming_data() and found_terminator()

    Incoming bytes are split on the current terminator (see
    set_terminator()). Data preceding the terminator is handed to
    collect_incoming_data(), and found_terminator() is called each time
    the terminator is reached.

    Outgoing data queued with push() or push_with_producer() is sent by
    initiate_send(). That method runs right away when data is queued and
    again from handle_write().

    get_terminator() reads self.terminator, which __init__ does not set,
    so subclasses must call set_terminator() before any data arrives.
    """

    # these are overridable defaults

    # maximum number of bytes requested per recv() in handle_read()
    ac_in_buffer_size = 65536
    # maximum chunk size queued by push() and sliced by initiate_send()
    ac_out_buffer_size = 65536

    # we don't want to enable the use of encoding by default, because that is a
    # sign of an application bug that we don't want to pass silently

    use_encoding = 0
    encoding = 'latin-1'

    def __init__(self, sock=None, map=None):
        """Set up empty input/output buffers.

        *sock* and *map* are passed unchanged to asyncore.dispatcher.
        """
        # for string terminator matching: received bytes not yet dispatched
        self.ac_in_buffer = b''

        # we use a list here rather than io.BytesIO for a few reasons...
        # del lst[:] is faster than bio.truncate(0)
        # lst = [] is faster than bio.truncate(0)
        self.incoming = []

        # we toss the use of the "simple producer" and replace it with
        # a pure deque, which the original fifo was a wrapping of.
        # Entries are byte chunks, producer objects, or None (close marker).
        self.producer_fifo = deque()
        asyncore.dispatcher.__init__(self, sock, map)

    def collect_incoming_data(self, data):
        """Handle a piece of received data; must be overridden."""
        raise NotImplementedError("must be implemented in subclass")

    def _collect_incoming_data(self, data):
        """Append *data* to self.incoming (a ready-made implementation that
        subclasses may use as their collect_incoming_data)."""
        self.incoming.append(data)

    def _get_data(self):
        """Return everything gathered by _collect_incoming_data() as one
        bytes object and clear the buffer."""
        d = b''.join(self.incoming)
        del self.incoming[:]
        return d

    def found_terminator(self):
        """Handle arrival of the terminator; must be overridden."""
        raise NotImplementedError("must be implemented in subclass")

    def set_terminator(self, term):
        """Set the input delimiter.

        Can be a fixed string of any length, an integer, or None.
        A str is encoded with self.encoding only when use_encoding is set.
        An integer means "this many more bytes"; 0 is accepted.

        Raises ValueError for a negative integer.
        """
        if isinstance(term, str) and self.use_encoding:
            term = bytes(term, self.encoding)
        elif isinstance(term, int) and term < 0:
            raise ValueError('the number of received bytes must be positive')
        self.terminator = term

    def get_terminator(self):
        """Return the current terminator as stored by set_terminator()."""
        return self.terminator

    # grab some more data from the socket,
    # throw it to the collector method,
    # check for the terminator,
    # if found, transition to the next state.

    def handle_read(self):
        """Read available data and dispatch it according to the terminator.

        The terminator is re-read on every pass through the loop, so
        found_terminator() may switch it while data remains in the buffer.
        Terminator kinds:
          * falsy (None, 0, empty): everything is collected.
          * int: counts down bytes still expected; when satisfied it is set
            to 0 and found_terminator() is called.
          * bytes: data up to each occurrence is collected. A possible
            partial terminator at the end of the buffer is kept for the
            next read.
        """
        try:
            data = self.recv(self.ac_in_buffer_size)
        except BlockingIOError:
            return
        except OSError:
            self.handle_error()
            return

        if isinstance(data, str) and self.use_encoding:
            # Encode the received text itself (previously passed the `str`
            # builtin here, which always raised TypeError).
            data = bytes(data, self.encoding)
        self.ac_in_buffer = self.ac_in_buffer + data

        # Continue to search for self.terminator in self.ac_in_buffer,
        # while calling self.collect_incoming_data. The while loop
        # is necessary because we might read several data+terminator
        # combos with a single recv(self.ac_in_buffer_size).

        while self.ac_in_buffer:
            lb = len(self.ac_in_buffer)
            terminator = self.get_terminator()
            if not terminator:
                # no terminator, collect it all
                self.collect_incoming_data(self.ac_in_buffer)
                self.ac_in_buffer = b''
            elif isinstance(terminator, int):
                # numeric terminator
                n = terminator
                if lb < n:
                    # not enough yet: consume all and remember what is left
                    self.collect_incoming_data(self.ac_in_buffer)
                    self.ac_in_buffer = b''
                    self.terminator = self.terminator - lb
                else:
                    self.collect_incoming_data(self.ac_in_buffer[:n])
                    self.ac_in_buffer = self.ac_in_buffer[n:]
                    self.terminator = 0
                    self.found_terminator()
            else:
                # 3 cases:
                # 1) the terminator occurs in the buffer:
                #    collect data before it, transition
                # 2) end of buffer matches some prefix of the terminator:
                #    collect data up to the prefix, keep the prefix
                # 3) end of buffer does not match any prefix:
                #    collect data
                terminator_len = len(terminator)
                index = self.ac_in_buffer.find(terminator)
                if index != -1:
                    # we found the terminator
                    if index > 0:
                        # don't bother reporting the empty string
                        # (source of subtle bugs)
                        self.collect_incoming_data(self.ac_in_buffer[:index])
                    self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:]
                    # This does the Right Thing if the terminator
                    # is changed here.
                    self.found_terminator()
                else:
                    # check for a prefix of the terminator
                    index = find_prefix_at_end(self.ac_in_buffer, terminator)
                    if index:
                        if index != lb:
                            # we found a prefix, collect up to the prefix
                            self.collect_incoming_data(self.ac_in_buffer[:-index])
                            self.ac_in_buffer = self.ac_in_buffer[-index:]
                        # wait for more data to complete (or rule out) the
                        # terminator
                        break
                    else:
                        # no prefix, collect it all
                        self.collect_incoming_data(self.ac_in_buffer)
                        self.ac_in_buffer = b''

    def handle_write(self):
        """Continue sending queued output."""
        self.initiate_send()

    def handle_close(self):
        """Close the channel (also reached via close_when_done())."""
        self.close()

    def push(self, data):
        """Queue *data* for sending and try to send it immediately.

        *data* must be bytes, bytearray or memoryview. Payloads longer than
        ac_out_buffer_size are queued as slices of at most that size.

        Raises TypeError for any other type.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            # Format the message; passing type(data) as a separate exception
            # argument left the '%r' placeholder unfilled.
            raise TypeError('data argument must be byte-ish (%r)'
                            % (type(data),))
        sabs = self.ac_out_buffer_size
        if len(data) > sabs:
            for i in range(0, len(data), sabs):
                self.producer_fifo.append(data[i:i+sabs])
        else:
            self.producer_fifo.append(data)
        self.initiate_send()

    def push_with_producer(self, producer):
        """Queue a producer and try to send.

        initiate_send() calls producer.more() until it returns an empty
        value.
        """
        self.producer_fifo.append(producer)
        self.initiate_send()

    def readable(self):
        "predicate for inclusion in the readable for select()"
        # cannot use the old predicate, it violates the claim of the
        # set_terminator method.

        # return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)
        return 1

    def writable(self):
        "predicate for inclusion in the writable for select()"
        # truthy while output is queued, and also while not yet connected
        return self.producer_fifo or (not self.connected)

    def close_when_done(self):
        "automatically close this channel once the outgoing queue is empty"
        # None is the close marker recognised by initiate_send()
        self.producer_fifo.append(None)

    def initiate_send(self):
        """Send from the head of producer_fifo.

        Stops after one send() call, when the queue empties, or when the
        channel is no longer connected.
        """
        while self.producer_fifo and self.connected:
            first = self.producer_fifo[0]
            # handle empty string/buffer or None entry
            if not first:
                del self.producer_fifo[0]
                if first is None:
                    # close marker queued by close_when_done()
                    self.handle_close()
                    return

            # handle classic producer behavior: anything that cannot be
            # sliced is treated as a producer with a more() method
            obs = self.ac_out_buffer_size
            try:
                data = first[:obs]
            except TypeError:
                data = first.more()
                if data:
                    # put the produced chunk in front of its producer
                    self.producer_fifo.appendleft(data)
                else:
                    # producer exhausted
                    del self.producer_fifo[0]
                continue

            if isinstance(data, str) and self.use_encoding:
                data = bytes(data, self.encoding)

            # send the data
            try:
                num_sent = self.send(data)
            except OSError:
                self.handle_error()
                return

            if num_sent:
                if num_sent < len(data) or obs < len(first):
                    # keep whatever part of the head entry was not sent
                    self.producer_fifo[0] = first[num_sent:]
                else:
                    del self.producer_fifo[0]
            # we tried to send some actual data
            return

    def discard_buffers(self):
        """Drop all pending input and output. Emergencies only!"""
        self.ac_in_buffer = b''
        del self.incoming[:]
        self.producer_fifo.clear()


class simple_producer:
    """Producer that hands out *data* in chunks of at most *buffer_size*."""

    def __init__(self, data, buffer_size=512):
        self.data = data
        self.buffer_size = buffer_size

    def more(self):
        """Return the next chunk; returns b'' once the data is exhausted."""
        if len(self.data) > self.buffer_size:
            result = self.data[:self.buffer_size]
            self.data = self.data[self.buffer_size:]
            return result
        else:
            result = self.data
            self.data = b''
            return result


# Given 'haystack', see if any prefix of 'needle' is at its end. This
# assumes an exact match has already been checked.
# Return the number of characters matched.
# for example:
# f_p_a_e("qwerty\r", "\r\n") => 1
# f_p_a_e("qwertydkjf", "\r\n") => 0
# f_p_a_e("qwerty\r\n", "\r\n") => <undefined>

# this could maybe be made faster with a computed regex?
# [answer: no; circa Python-2.0, Jan 2001]
# new python: 28961/s
# old python: 18307/s
# re: 12820/s
# regex: 14035/s

def find_prefix_at_end(haystack, needle):
    """Return how many trailing characters of *haystack* match a prefix of *needle*.

    Only proper prefixes (shorter than *needle*) are tried, longest first.
    The result is therefore between 0 and len(needle) - 1. It is 0 when no
    prefix matches, including when *needle* is empty (previously -1).

    A full match of *needle* is not reported, so callers should check for
    it first. *haystack* and *needle* must both be str or both be bytes.
    """
    # Longest candidate first, so the first hit is the longest match.
    for length in range(len(needle) - 1, 0, -1):
        if haystack.endswith(needle[:length]):
            return length
    return 0