1*67e74705SXin Li#!/usr/bin/env python 2*67e74705SXin Li 3*67e74705SXin Li""" 4*67e74705SXin LiThis is a generic fuzz testing tool, see --help for more information. 5*67e74705SXin Li""" 6*67e74705SXin Li 7*67e74705SXin Liimport os 8*67e74705SXin Liimport sys 9*67e74705SXin Liimport random 10*67e74705SXin Liimport subprocess 11*67e74705SXin Liimport itertools 12*67e74705SXin Li 13*67e74705SXin Liclass TestGenerator: 14*67e74705SXin Li def __init__(self, inputs, delete, insert, replace, 15*67e74705SXin Li insert_strings, pick_input): 16*67e74705SXin Li self.inputs = [(s, open(s).read()) for s in inputs] 17*67e74705SXin Li 18*67e74705SXin Li self.delete = bool(delete) 19*67e74705SXin Li self.insert = bool(insert) 20*67e74705SXin Li self.replace = bool(replace) 21*67e74705SXin Li self.pick_input = bool(pick_input) 22*67e74705SXin Li self.insert_strings = list(insert_strings) 23*67e74705SXin Li 24*67e74705SXin Li self.num_positions = sum([len(d) for _,d in self.inputs]) 25*67e74705SXin Li self.num_insert_strings = len(insert_strings) 26*67e74705SXin Li self.num_tests = ((delete + (insert + replace)*self.num_insert_strings) 27*67e74705SXin Li * self.num_positions) 28*67e74705SXin Li self.num_tests += 1 29*67e74705SXin Li 30*67e74705SXin Li if self.pick_input: 31*67e74705SXin Li self.num_tests *= self.num_positions 32*67e74705SXin Li 33*67e74705SXin Li def position_to_source_index(self, position): 34*67e74705SXin Li for i,(s,d) in enumerate(self.inputs): 35*67e74705SXin Li n = len(d) 36*67e74705SXin Li if position < n: 37*67e74705SXin Li return (i,position) 38*67e74705SXin Li position -= n 39*67e74705SXin Li raise ValueError,'Invalid position.' 40*67e74705SXin Li 41*67e74705SXin Li def get_test(self, index): 42*67e74705SXin Li assert 0 <= index < self.num_tests 43*67e74705SXin Li 44*67e74705SXin Li picked_position = None 45*67e74705SXin Li if self.pick_input: 46*67e74705SXin Li index,picked_position = divmod(index, self.num_positions) 47*67e74705SXin Li picked_position = self.position_to_source_index(picked_position) 48*67e74705SXin Li 49*67e74705SXin Li if index == 0: 50*67e74705SXin Li return ('nothing', None, None, picked_position) 51*67e74705SXin Li 52*67e74705SXin Li index -= 1 53*67e74705SXin Li index,position = divmod(index, self.num_positions) 54*67e74705SXin Li position = self.position_to_source_index(position) 55*67e74705SXin Li if self.delete: 56*67e74705SXin Li if index == 0: 57*67e74705SXin Li return ('delete', position, None, picked_position) 58*67e74705SXin Li index -= 1 59*67e74705SXin Li 60*67e74705SXin Li index,insert_index = divmod(index, self.num_insert_strings) 61*67e74705SXin Li insert_str = self.insert_strings[insert_index] 62*67e74705SXin Li if self.insert: 63*67e74705SXin Li if index == 0: 64*67e74705SXin Li return ('insert', position, insert_str, picked_position) 65*67e74705SXin Li index -= 1 66*67e74705SXin Li 67*67e74705SXin Li assert self.replace 68*67e74705SXin Li assert index == 0 69*67e74705SXin Li return ('replace', position, insert_str, picked_position) 70*67e74705SXin Li 71*67e74705SXin Liclass TestApplication: 72*67e74705SXin Li def __init__(self, tg, test): 73*67e74705SXin Li self.tg = tg 74*67e74705SXin Li self.test = test 75*67e74705SXin Li 76*67e74705SXin Li def apply(self): 77*67e74705SXin Li if self.test[0] == 'nothing': 78*67e74705SXin Li pass 79*67e74705SXin Li else: 80*67e74705SXin Li i,j = self.test[1] 81*67e74705SXin Li name,data = self.tg.inputs[i] 82*67e74705SXin Li if self.test[0] == 'delete': 83*67e74705SXin Li data = data[:j] + data[j+1:] 84*67e74705SXin Li elif self.test[0] == 'insert': 85*67e74705SXin Li data = data[:j] + self.test[2] + data[j:] 86*67e74705SXin Li elif self.test[0] == 'replace': 87*67e74705SXin Li data = data[:j] + self.test[2] + data[j+1:] 88*67e74705SXin Li else: 89*67e74705SXin Li raise ValueError,'Invalid test %r' % self.test 90*67e74705SXin Li open(name,'wb').write(data) 91*67e74705SXin Li 92*67e74705SXin Li def revert(self): 93*67e74705SXin Li if self.test[0] != 'nothing': 94*67e74705SXin Li i,j = self.test[1] 95*67e74705SXin Li name,data = self.tg.inputs[i] 96*67e74705SXin Li open(name,'wb').write(data) 97*67e74705SXin Li 98*67e74705SXin Lidef quote(str): 99*67e74705SXin Li return '"' + str + '"' 100*67e74705SXin Li 101*67e74705SXin Lidef run_one_test(test_application, index, input_files, args): 102*67e74705SXin Li test = test_application.test 103*67e74705SXin Li 104*67e74705SXin Li # Interpolate arguments. 105*67e74705SXin Li options = { 'index' : index, 106*67e74705SXin Li 'inputs' : ' '.join(quote(f) for f in input_files) } 107*67e74705SXin Li 108*67e74705SXin Li # Add picked input interpolation arguments, if used. 109*67e74705SXin Li if test[3] is not None: 110*67e74705SXin Li pos = test[3][1] 111*67e74705SXin Li options['picked_input'] = input_files[test[3][0]] 112*67e74705SXin Li options['picked_input_pos'] = pos 113*67e74705SXin Li # Compute the line and column. 114*67e74705SXin Li file_data = test_application.tg.inputs[test[3][0]][1] 115*67e74705SXin Li line = column = 1 116*67e74705SXin Li for i in range(pos): 117*67e74705SXin Li c = file_data[i] 118*67e74705SXin Li if c == '\n': 119*67e74705SXin Li line += 1 120*67e74705SXin Li column = 1 121*67e74705SXin Li else: 122*67e74705SXin Li column += 1 123*67e74705SXin Li options['picked_input_line'] = line 124*67e74705SXin Li options['picked_input_col'] = column 125*67e74705SXin Li 126*67e74705SXin Li test_args = [a % options for a in args] 127*67e74705SXin Li if opts.verbose: 128*67e74705SXin Li print '%s: note: executing %r' % (sys.argv[0], test_args) 129*67e74705SXin Li 130*67e74705SXin Li stdout = None 131*67e74705SXin Li stderr = None 132*67e74705SXin Li if opts.log_dir: 133*67e74705SXin Li stdout_log_path = os.path.join(opts.log_dir, '%s.out' % index) 134*67e74705SXin Li stderr_log_path = os.path.join(opts.log_dir, '%s.err' % index) 135*67e74705SXin Li stdout = open(stdout_log_path, 'wb') 136*67e74705SXin Li stderr = open(stderr_log_path, 'wb') 137*67e74705SXin Li else: 138*67e74705SXin Li sys.stdout.flush() 139*67e74705SXin Li p = subprocess.Popen(test_args, stdout=stdout, stderr=stderr) 140*67e74705SXin Li p.communicate() 141*67e74705SXin Li exit_code = p.wait() 142*67e74705SXin Li 143*67e74705SXin Li test_result = (exit_code == opts.expected_exit_code or 144*67e74705SXin Li exit_code in opts.extra_exit_codes) 145*67e74705SXin Li 146*67e74705SXin Li if stdout is not None: 147*67e74705SXin Li stdout.close() 148*67e74705SXin Li stderr.close() 149*67e74705SXin Li 150*67e74705SXin Li # Remove the logs for passes, unless logging all results. 151*67e74705SXin Li if not opts.log_all and test_result: 152*67e74705SXin Li os.remove(stdout_log_path) 153*67e74705SXin Li os.remove(stderr_log_path) 154*67e74705SXin Li 155*67e74705SXin Li if not test_result: 156*67e74705SXin Li print 'FAIL: %d' % index 157*67e74705SXin Li elif not opts.succinct: 158*67e74705SXin Li print 'PASS: %d' % index 159*67e74705SXin Li return test_result 160*67e74705SXin Li 161*67e74705SXin Lidef main(): 162*67e74705SXin Li global opts 163*67e74705SXin Li from optparse import OptionParser, OptionGroup 164*67e74705SXin Li parser = OptionParser("""%prog [options] ... test command args ... 165*67e74705SXin Li 166*67e74705SXin Li%prog is a tool for fuzzing inputs and testing them. 167*67e74705SXin Li 168*67e74705SXin LiThe most basic usage is something like: 169*67e74705SXin Li 170*67e74705SXin Li $ %prog --file foo.txt ./test.sh 171*67e74705SXin Li 172*67e74705SXin Liwhich will run a default list of fuzzing strategies on the input. For each 173*67e74705SXin Lifuzzed input, it will overwrite the input files (in place), run the test script, 174*67e74705SXin Lithen restore the files back to their original contents. 175*67e74705SXin Li 176*67e74705SXin LiNOTE: You should make sure you have a backup copy of your inputs, in case 177*67e74705SXin Lisomething goes wrong!!! 178*67e74705SXin Li 179*67e74705SXin LiYou can cause the fuzzing to not restore the original files with 180*67e74705SXin Li'--no-revert'. Generally this is used with '--test <index>' to run one failing 181*67e74705SXin Litest and then leave the fuzzed inputs in place to examine the failure. 182*67e74705SXin Li 183*67e74705SXin LiFor each fuzzed input, %prog will run the test command given on the command 184*67e74705SXin Liline. Each argument in the command is subject to string interpolation before 185*67e74705SXin Libeing executed. The syntax is "%(VARIABLE)FORMAT" where FORMAT is a standard 186*67e74705SXin Liprintf format, and VARIABLE is one of: 187*67e74705SXin Li 188*67e74705SXin Li 'index' - the test index being run 189*67e74705SXin Li 'inputs' - the full list of test inputs 190*67e74705SXin Li 'picked_input' - (with --pick-input) the selected input file 191*67e74705SXin Li 'picked_input_pos' - (with --pick-input) the selected input position 192*67e74705SXin Li 'picked_input_line' - (with --pick-input) the selected input line 193*67e74705SXin Li 'picked_input_col' - (with --pick-input) the selected input column 194*67e74705SXin Li 195*67e74705SXin LiBy default, the script will run forever continually picking new tests to 196*67e74705SXin Lirun. You can limit the number of tests that are run with '--max-tests <number>', 197*67e74705SXin Liand you can run a particular test with '--test <index>'. 198*67e74705SXin Li 199*67e74705SXin LiYou can specify '--stop-on-fail' to stop the script on the first failure 200*67e74705SXin Liwithout reverting the changes. 201*67e74705SXin Li 202*67e74705SXin Li""") 203*67e74705SXin Li parser.add_option("-v", "--verbose", help="Show more output", 204*67e74705SXin Li action='store_true', dest="verbose", default=False) 205*67e74705SXin Li parser.add_option("-s", "--succinct", help="Reduce amount of output", 206*67e74705SXin Li action="store_true", dest="succinct", default=False) 207*67e74705SXin Li 208*67e74705SXin Li group = OptionGroup(parser, "Test Execution") 209*67e74705SXin Li group.add_option("", "--expected-exit-code", help="Set expected exit code", 210*67e74705SXin Li type=int, dest="expected_exit_code", 211*67e74705SXin Li default=0) 212*67e74705SXin Li group.add_option("", "--extra-exit-code", 213*67e74705SXin Li help="Set additional expected exit code", 214*67e74705SXin Li type=int, action="append", dest="extra_exit_codes", 215*67e74705SXin Li default=[]) 216*67e74705SXin Li group.add_option("", "--log-dir", 217*67e74705SXin Li help="Capture test logs to an output directory", 218*67e74705SXin Li type=str, dest="log_dir", 219*67e74705SXin Li default=None) 220*67e74705SXin Li group.add_option("", "--log-all", 221*67e74705SXin Li help="Log all outputs (not just failures)", 222*67e74705SXin Li action="store_true", dest="log_all", default=False) 223*67e74705SXin Li parser.add_option_group(group) 224*67e74705SXin Li 225*67e74705SXin Li group = OptionGroup(parser, "Input Files") 226*67e74705SXin Li group.add_option("", "--file", metavar="PATH", 227*67e74705SXin Li help="Add an input file to fuzz", 228*67e74705SXin Li type=str, action="append", dest="input_files", default=[]) 229*67e74705SXin Li group.add_option("", "--filelist", metavar="LIST", 230*67e74705SXin Li help="Add a list of inputs files to fuzz (one per line)", 231*67e74705SXin Li type=str, action="append", dest="filelists", default=[]) 232*67e74705SXin Li parser.add_option_group(group) 233*67e74705SXin Li 234*67e74705SXin Li group = OptionGroup(parser, "Fuzz Options") 235*67e74705SXin Li group.add_option("", "--replacement-chars", dest="replacement_chars", 236*67e74705SXin Li help="Characters to insert/replace", 237*67e74705SXin Li default="0{}[]<>\;@#$^%& ") 238*67e74705SXin Li group.add_option("", "--replacement-string", dest="replacement_strings", 239*67e74705SXin Li action="append", help="Add a replacement string to use", 240*67e74705SXin Li default=[]) 241*67e74705SXin Li group.add_option("", "--replacement-list", dest="replacement_lists", 242*67e74705SXin Li help="Add a list of replacement strings (one per line)", 243*67e74705SXin Li action="append", default=[]) 244*67e74705SXin Li group.add_option("", "--no-delete", help="Don't delete characters", 245*67e74705SXin Li action='store_false', dest="enable_delete", default=True) 246*67e74705SXin Li group.add_option("", "--no-insert", help="Don't insert strings", 247*67e74705SXin Li action='store_false', dest="enable_insert", default=True) 248*67e74705SXin Li group.add_option("", "--no-replace", help="Don't replace strings", 249*67e74705SXin Li action='store_false', dest="enable_replace", default=True) 250*67e74705SXin Li group.add_option("", "--no-revert", help="Don't revert changes", 251*67e74705SXin Li action='store_false', dest="revert", default=True) 252*67e74705SXin Li group.add_option("", "--stop-on-fail", help="Stop on first failure", 253*67e74705SXin Li action='store_true', dest="stop_on_fail", default=False) 254*67e74705SXin Li parser.add_option_group(group) 255*67e74705SXin Li 256*67e74705SXin Li group = OptionGroup(parser, "Test Selection") 257*67e74705SXin Li group.add_option("", "--test", help="Run a particular test", 258*67e74705SXin Li type=int, dest="test", default=None, metavar="INDEX") 259*67e74705SXin Li group.add_option("", "--max-tests", help="Maximum number of tests", 260*67e74705SXin Li type=int, dest="max_tests", default=None, metavar="COUNT") 261*67e74705SXin Li group.add_option("", "--pick-input", 262*67e74705SXin Li help="Randomly select an input byte as well as fuzzing", 263*67e74705SXin Li action='store_true', dest="pick_input", default=False) 264*67e74705SXin Li parser.add_option_group(group) 265*67e74705SXin Li 266*67e74705SXin Li parser.disable_interspersed_args() 267*67e74705SXin Li 268*67e74705SXin Li (opts, args) = parser.parse_args() 269*67e74705SXin Li 270*67e74705SXin Li if not args: 271*67e74705SXin Li parser.error("Invalid number of arguments") 272*67e74705SXin Li 273*67e74705SXin Li # Collect the list of inputs. 274*67e74705SXin Li input_files = list(opts.input_files) 275*67e74705SXin Li for filelist in opts.filelists: 276*67e74705SXin Li f = open(filelist) 277*67e74705SXin Li try: 278*67e74705SXin Li for ln in f: 279*67e74705SXin Li ln = ln.strip() 280*67e74705SXin Li if ln: 281*67e74705SXin Li input_files.append(ln) 282*67e74705SXin Li finally: 283*67e74705SXin Li f.close() 284*67e74705SXin Li input_files.sort() 285*67e74705SXin Li 286*67e74705SXin Li if not input_files: 287*67e74705SXin Li parser.error("No input files!") 288*67e74705SXin Li 289*67e74705SXin Li print '%s: note: fuzzing %d files.' % (sys.argv[0], len(input_files)) 290*67e74705SXin Li 291*67e74705SXin Li # Make sure the log directory exists if used. 292*67e74705SXin Li if opts.log_dir: 293*67e74705SXin Li if not os.path.exists(opts.log_dir): 294*67e74705SXin Li try: 295*67e74705SXin Li os.mkdir(opts.log_dir) 296*67e74705SXin Li except OSError: 297*67e74705SXin Li print "%s: error: log directory couldn't be created!" % ( 298*67e74705SXin Li sys.argv[0],) 299*67e74705SXin Li raise SystemExit,1 300*67e74705SXin Li 301*67e74705SXin Li # Get the list if insert/replacement strings. 302*67e74705SXin Li replacements = list(opts.replacement_chars) 303*67e74705SXin Li replacements.extend(opts.replacement_strings) 304*67e74705SXin Li for replacement_list in opts.replacement_lists: 305*67e74705SXin Li f = open(replacement_list) 306*67e74705SXin Li try: 307*67e74705SXin Li for ln in f: 308*67e74705SXin Li ln = ln[:-1] 309*67e74705SXin Li if ln: 310*67e74705SXin Li replacements.append(ln) 311*67e74705SXin Li finally: 312*67e74705SXin Li f.close() 313*67e74705SXin Li 314*67e74705SXin Li # Unique and order the replacement list. 315*67e74705SXin Li replacements = list(set(replacements)) 316*67e74705SXin Li replacements.sort() 317*67e74705SXin Li 318*67e74705SXin Li # Create the test generator. 319*67e74705SXin Li tg = TestGenerator(input_files, opts.enable_delete, opts.enable_insert, 320*67e74705SXin Li opts.enable_replace, replacements, opts.pick_input) 321*67e74705SXin Li 322*67e74705SXin Li print '%s: note: %d input bytes.' % (sys.argv[0], tg.num_positions) 323*67e74705SXin Li print '%s: note: %d total tests.' % (sys.argv[0], tg.num_tests) 324*67e74705SXin Li if opts.test is not None: 325*67e74705SXin Li it = [opts.test] 326*67e74705SXin Li elif opts.max_tests is not None: 327*67e74705SXin Li it = itertools.imap(random.randrange, 328*67e74705SXin Li itertools.repeat(tg.num_tests, opts.max_tests)) 329*67e74705SXin Li else: 330*67e74705SXin Li it = itertools.imap(random.randrange, itertools.repeat(tg.num_tests)) 331*67e74705SXin Li for test in it: 332*67e74705SXin Li t = tg.get_test(test) 333*67e74705SXin Li 334*67e74705SXin Li if opts.verbose: 335*67e74705SXin Li print '%s: note: running test %d: %r' % (sys.argv[0], test, t) 336*67e74705SXin Li ta = TestApplication(tg, t) 337*67e74705SXin Li try: 338*67e74705SXin Li ta.apply() 339*67e74705SXin Li test_result = run_one_test(ta, test, input_files, args) 340*67e74705SXin Li if not test_result and opts.stop_on_fail: 341*67e74705SXin Li opts.revert = False 342*67e74705SXin Li sys.exit(1) 343*67e74705SXin Li finally: 344*67e74705SXin Li if opts.revert: 345*67e74705SXin Li ta.revert() 346*67e74705SXin Li 347*67e74705SXin Li sys.stdout.flush() 348*67e74705SXin Li 349*67e74705SXin Liif __name__ == '__main__': 350*67e74705SXin Li main() 351