1 """Extensions to unittest for web frameworks.
2
3 Use the WebCase.getPage method to request a page from your HTTP server.
4
5 Framework Integration
6 =====================
7
8 If you have control over your server process, you can handle errors
9 in the server-side of the HTTP conversation a bit better. You must run
10 both the client (your WebCase tests) and the server in the same process
11 (but in separate threads, obviously).
12
13 When an error occurs in the framework, call server_error. It will print
14 the traceback to stdout, and keep any assertions you have from running
15 (the assumption is that, if the server errors, the page output will not
16 be of further significance to your tests).
17 """
18
19 import os, sys, time, re
20 import types
21 import pprint
22 import socket
23 import httplib
24 import traceback
25
26 from unittest import *
27 from unittest import _TextTestResult
28
29
31
33
34 if self.errors or self.failures:
35 if self.dots or self.showAll:
36 self.stream.writeln()
37 self.printErrorList('ERROR', self.errors)
38 self.printErrorList('FAIL', self.failures)
39
40
42 """A test runner class that displays results in textual form."""
43
46
def run(self, test):
    """Run the given test case or test suite and return its result object.

    ``test`` is called with the object returned by ``self._makeResult()``.
    Afterwards ``result.printErrors()`` is invoked and, when
    ``result.wasSuccessful()`` is false, a single summary line of the form
    ``FAILED (failures=N, errors=M)`` is written to ``self.stream`` (each
    count is omitted when it is zero). On success this method itself
    writes nothing further.
    """
    result = self._makeResult()
    test(result)
    result.printErrors()

    if not result.wasSuccessful():
        failed = len(result.failures)
        errored = len(result.errors)

        # Collect only the non-zero counters; the join supplies the ", "
        # separator exactly when both are present.
        counts = []
        if failed:
            counts.append("failures=%d" % failed)
        if errored:
            counts.append("errors=%d" % errored)

        self.stream.write("FAILED (")
        if counts:
            self.stream.write(", ".join(counts))
        self.stream.writeln(")")
    return result
65
66
68
70 """Return a suite of all tests cases given a string specifier.
71
72 The name may resolve either to a module, a test case class, a
73 test method within a test case class, or a callable object which
74 returns a TestCase or TestSuite instance.
75
76 The method optionally resolves the names relative to a given module.
77 """
78 parts = name.split('.')
79 if module is None:
80 if not parts:
81 raise ValueError("incomplete test name: %s" % name)
82 else:
83 parts_copy = parts[:]
84 while parts_copy:
85 target = ".".join(parts_copy)
86 if target in sys.modules:
87 module = reload(sys.modules[target])
88 break
89 else:
90 try:
91 module = __import__(target)
92 break
93 except ImportError:
94 del parts_copy[-1]
95 if not parts_copy:
96 raise
97 parts = parts[1:]
98 obj = module
99 for part in parts:
100 obj = getattr(obj, part)
101
102 if type(obj) == types.ModuleType:
103 return self.loadTestsFromModule(obj)
104 elif (isinstance(obj, (type, types.ClassType)) and
105 issubclass(obj, TestCase)):
106 return self.loadTestsFromTestCase(obj)
107 elif type(obj) == types.UnboundMethodType:
108 return obj.im_class(obj.__name__)
109 elif callable(obj):
110 test = obj()
111 if not isinstance(test, TestCase) and \
112 not isinstance(test, TestSuite):
113 raise ValueError("calling %s returned %s, "
114 "not a test" % (obj,test))
115 return test
116 else:
117 raise ValueError("do not know how to make test from: %s" % obj)
118
119
try:
    # msvcrt only exists on Windows; its getch() reads one keypress
    # without echoing it to the console.
    import msvcrt

    def getchar():
        """Read and return a single character from the console."""
        return msvcrt.getch()
except ImportError:
    # No msvcrt available: read one character from stdin with the
    # terminal temporarily switched to raw mode.
    import tty
    import termios

    def getchar():
        """Read and return a single character from stdin.

        The terminal attributes in effect on entry are restored before
        returning, even if the read raises.
        """
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            tty.setraw(fd)
            ch = sys.stdin.read(1)
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
        return ch
137
138
140 HOST = "127.0.0.1"
141 PORT = 8000
142 HTTP_CONN = httplib.HTTPConnection
143 PROTOCOL = "HTTP/1.1"
144
146 """Make our HTTP_CONN persistent (or not).
147
148 If the 'on' argument is True (the default), then self.HTTP_CONN
149 will be set to an instance of httplib.HTTPConnection (or HTTPS
150 if self.scheme is "https"). This will then persist across requests.
151
152 We only allow for a single open connection, so if you call this
153 and we currently have an open connection, it will be closed.
154 """
155 try:
156 self.HTTP_CONN.close()
157 except (TypeError, AttributeError):
158 pass
159
160 if self.scheme == "https":
161 cls = httplib.HTTPSConnection
162 else:
163 cls = httplib.HTTPConnection
164
165 if on:
166 host = self.HOST
167 if not host:
168
169
170 host = "127.0.0.1"
171 self.HTTP_CONN = cls(host, self.PORT)
172
173 self.HTTP_CONN.auto_open = auto_open
174 self.HTTP_CONN.connect()
175 else:
176 self.HTTP_CONN = cls
177
179 return hasattr(self.HTTP_CONN, "__class__")
182 persistent = property(_get_persistent, _set_persistent)
183
def getPage(self, url, headers=None, method="GET", body=None, protocol=None):
    """Request ``url`` via openURL and record the response on this test case.

    The response triple is stored as ``self.status``, ``self.headers`` and
    ``self.body`` and is also returned. ``self.url`` is set to the requested
    url, and ``self.cookies`` is rebuilt from the response's Set-Cookie
    headers as a list of ``('Cookie', value)`` pairs.

    ``protocol`` defaults to ``self.PROTOCOL`` when not given.

    Raises ServerError if the ``ServerError.on`` flag is set by the time the
    request has completed (the flag is cleared before the request is sent).
    """
    ServerError.on = False
    self.url = url

    # An empty HOST setting falls back to the loopback address.
    target_host = self.HOST or "127.0.0.1"

    result = openURL(url, headers, method, body, target_host, self.PORT,
                     self.HTTP_CONN, protocol or self.PROTOCOL)
    self.status, self.headers, self.body = result

    # Turn every Set-Cookie response header into a Cookie request header.
    cookie_headers = []
    for name, val in self.headers:
        if name.lower() == 'set-cookie':
            cookie_headers.append(('Cookie', val))
    self.cookies = cookie_headers

    if ServerError.on:
        raise ServerError()
    return result
205
206 interactive = True
207 console_height = 30
208
210 print
211 print " ERROR:", msg
212
213 if not self.interactive:
214 raise self.failureException(msg)
215
216 p = " Show: [B]ody [H]eaders [S]tatus [U]RL; [I]gnore, [R]aise, or sys.e[X]it >> "
217 print p,
218 while True:
219 i = getchar().upper()
220 if i not in "BHSUIRX":
221 continue
222 print i.upper()
223 if i == "B":
224 for x, line in enumerate(self.body.splitlines()):
225 if (x + 1) % self.console_height == 0:
226
227 print "<-- More -->\r",
228 m = getchar().lower()
229
230 print " \r",
231 if m == "q":
232 break
233 print line
234 elif i == "H":
235 pprint.pprint(self.headers)
236 elif i == "S":
237 print self.status
238 elif i == "U":
239 print self.url
240 elif i == "I":
241
242 return
243 elif i == "R":
244 raise self.failureException(msg)
245 elif i == "X":
246 self.exit()
247 print p,
248
251
252 if sys.version_info >= (2, 5):
254 if result is None:
255 result = self.defaultTestResult()
256 result.startTest(self)
257 testMethod = getattr(self, self._testMethodName)
258 try:
259 try:
260 self.setUp()
261 except (KeyboardInterrupt, SystemExit):
262 raise
263 except:
264 result.addError(self, self._exc_info())
265 return
266
267 ok = 0
268 try:
269 testMethod()
270 ok = 1
271 except self.failureException:
272 result.addFailure(self, self._exc_info())
273 except (KeyboardInterrupt, SystemExit):
274 raise
275 except:
276 result.addError(self, self._exc_info())
277
278 try:
279 self.tearDown()
280 except (KeyboardInterrupt, SystemExit):
281 raise
282 except:
283 result.addError(self, self._exc_info())
284 ok = 0
285 if ok:
286 result.addSuccess(self)
287 finally:
288 result.stopTest(self)
289 else:
291 if result is None:
292 result = self.defaultTestResult()
293 result.startTest(self)
294 testMethod = getattr(self, self._TestCase__testMethodName)
295 try:
296 try:
297 self.setUp()
298 except (KeyboardInterrupt, SystemExit):
299 raise
300 except:
301 result.addError(self, self._TestCase__exc_info())
302 return
303
304 ok = 0
305 try:
306 testMethod()
307 ok = 1
308 except self.failureException:
309 result.addFailure(self, self._TestCase__exc_info())
310 except (KeyboardInterrupt, SystemExit):
311 raise
312 except:
313 result.addError(self, self._TestCase__exc_info())
314
315 try:
316 self.tearDown()
317 except (KeyboardInterrupt, SystemExit):
318 raise
319 except:
320 result.addError(self, self._TestCase__exc_info())
321 ok = 0
322 if ok:
323 result.addSuccess(self)
324 finally:
325 result.stopTest(self)
326
328 """Fail if self.status != status."""
329 if isinstance(status, basestring):
330 if not self.status == status:
331 if msg is None:
332 msg = 'Status (%s) != %s' % (`self.status`, `status`)
333 self._handlewebError(msg)
334 elif isinstance(status, int):
335 code = int(self.status[:3])
336 if code != status:
337 if msg is None:
338 msg = 'Status (%s) != %s' % (`self.status`, `status`)
339 self._handlewebError(msg)
340 else:
341
342 match = False
343 for s in status:
344 if isinstance(s, basestring):
345 if self.status == s:
346 match = True
347 break
348 elif int(self.status[:3]) == s:
349 match = True
350 break
351 if not match:
352 if msg is None:
353 msg = 'Status (%s) not in %s' % (`self.status`, `status`)
354 self._handlewebError(msg)
355
357 """Fail if (key, [value]) not in self.headers."""
358 lowkey = key.lower()
359 for k, v in self.headers:
360 if k.lower() == lowkey:
361 if value is None or str(value) == v:
362 return v
363
364 if msg is None:
365 if value is None:
366 msg = '%s not in headers' % `key`
367 else:
368 msg = '%s:%s not in headers' % (`key`, `value`)
369 self._handlewebError(msg)
370
372 """Fail if key in self.headers."""
373 lowkey = key.lower()
374 matches = [k for k, v in self.headers if k.lower() == lowkey]
375 if matches:
376 if msg is None:
377 msg = '%s in headers' % `key`
378 self._handlewebError(msg)
379
def assertBody(self, value, msg=None):
    """Report a web error unless ``self.body`` equals ``value``.

    When ``msg`` is None a message showing the repr of the expected and
    actual bodies is generated. The failure is routed through
    ``self._handlewebError``.
    """
    actual = self.body
    if value != actual:
        report = msg
        if report is None:
            report = ('expected body:\n%s\n\nactual body:\n%s'
                      % (repr(value), repr(actual)))
        self._handlewebError(report)
386
def assertInBody(self, value, msg=None):
    """Report a web error unless ``value`` occurs in ``self.body``.

    When ``msg`` is None a message naming the repr of ``value`` is
    generated. The failure is routed through ``self._handlewebError``.
    """
    if value in self.body:
        return
    report = msg
    if report is None:
        report = '%s not in body' % repr(value)
    self._handlewebError(report)
393
def assertNotInBody(self, value, msg=None):
    """Report a web error if ``value`` occurs in ``self.body``.

    When ``msg`` is None a message naming the repr of ``value`` is
    generated. The failure is routed through ``self._handlewebError``.
    """
    if value not in self.body:
        return
    report = msg
    if report is None:
        report = '%s found in body' % repr(value)
    self._handlewebError(report)
400
def assertMatchesBody(self, pattern, msg=None, flags=0):
    """Report a web error unless the regex ``pattern`` is found in ``self.body``.

    The search uses ``re.search`` with the given ``flags``. When ``msg`` is
    None a message naming the repr of ``pattern`` is generated. The failure
    is routed through ``self._handlewebError``.
    """
    found = re.search(pattern, self.body, flags)
    if found is None:
        report = msg
        if report is None:
            report = 'No match for %s in body' % repr(pattern)
        self._handlewebError(report)
407
408
methods_with_bodies = ("POST", "PUT")


def cleanHeaders(headers, method, body, host, port):
    """Return request headers, with required headers added (if missing).

    headers -- iterable of (name, value) pairs, or None for no headers.
               The caller's object is not modified; a new list is returned.
    method  -- HTTP method name; compared against methods_with_bodies.
    body    -- request body or None; only its length is used.
    host, port -- used to build a Host header when none is supplied.
                  The port is left out of the Host value when it is 80.

    Header names are matched case-insensitively.
    """
    # Work on a copy so that a header list reused by the caller does not
    # accumulate the defaults added below.
    if headers is None:
        headers = []
    else:
        headers = list(headers)

    names = [k.lower() for k, v in headers]

    # Add the required Host request header if not present.
    if 'host' not in names:
        if port == 80:
            headers.append(("Host", host))
        else:
            headers.append(("Host", "%s:%s" % (host, port)))

    # For requests that carry a body, supply a default type and length
    # together when no Content-Type was given.
    # NOTE(review): SOURCE's indentation was lost; this keeps the
    # Content-Length default tied to the Content-Type default -- confirm.
    if method in methods_with_bodies and 'content-type' not in names:
        headers.append(("Content-Type", "application/x-www-form-urlencoded"))
        headers.append(("Content-Length", str(len(body or ""))))

    return headers
441
442
def shb(response):
    """Return status, headers, body the way we like from a response.

    ``response`` must provide ``status``, ``reason``, ``read()`` and
    ``msg.headers`` (a sequence of raw header lines).

    Returns a 3-tuple:
      * the status line as "<status> <reason>",
      * a list of (name, value) header pairs in order of appearance, with
        continuation lines (those starting with a space or tab) appended
        to the preceding header's value; headers whose name or value is
        empty are omitted,
      * the result of ``response.read()``.
    """
    h = []
    key, value = None, None
    for line in response.msg.headers:
        if not line:
            continue
        if line[0] in " \t":
            # Continuation line. One that appears before any header has
            # nothing to attach to and is skipped instead of failing on
            # ``None += str``.
            if value is not None:
                value += line.strip()
            continue
        # A new header starts: flush the one collected so far.
        if key and value:
            h.append((key, value))
        key, value = line.split(":", 1)
        key = key.strip()
        value = value.strip()
    if key and value:
        h.append((key, value))

    return "%s %s" % (response.status, response.reason), h, response.read()
461
462
def openURL(url, headers=None, method="GET", body=None,
            host="127.0.0.1", port=8000, http_conn=httplib.HTTPConnection,
            protocol="HTTP/1.1"):
    """Open the given HTTP resource and return status, headers, and body.

    headers   -- list of (name, value) pairs or None; passed through
                 cleanHeaders before use.
    http_conn -- either an object with a ``host`` attribute, which is used
                 as an existing connection and left open, or a callable
                 that is invoked as ``http_conn(host, port)`` to create a
                 connection for each attempt; such a connection is closed
                 again whether or not the attempt succeeds.
    protocol  -- HTTP version string set on the connection.

    The request is attempted up to 10 times; after a socket.error the
    function waits half a second and tries again. The socket.error from
    the final attempt is re-raised.
    """
    headers = cleanHeaders(headers, method, body, host, port)

    persistent = hasattr(http_conn, "host")
    attempts = 10

    # Retry to give a server that is still starting up time to accept
    # connections.
    for trial in range(attempts):
        conn = None
        try:
            try:
                if persistent:
                    conn = http_conn
                else:
                    conn = http_conn(host, port)

                # Override the protocol version the connection announces.
                conn._http_vsn_str = protocol
                conn._http_vsn = int("".join([x for x in protocol if x.isdigit()]))

                if sys.version_info < (2, 4):
                    # putrequest has no skip_accept_encoding here, so filter
                    # the automatic "Accept-Encoding: identity" header by
                    # wrapping putheader on this one instance.
                    def putheader(self, header, value):
                        if header == 'Accept-Encoding' and value == 'identity':
                            return
                        self.__class__.putheader(self, header, value)
                    import new
                    conn.putheader = new.instancemethod(putheader, conn, conn.__class__)
                    conn.putrequest(method.upper(), url, skip_host=True)
                else:
                    conn.putrequest(method.upper(), url, skip_host=True,
                                    skip_accept_encoding=True)

                for key, value in headers:
                    conn.putheader(key, value)
                conn.endheaders()

                if body is not None:
                    conn.send(body)

                # The body is read inside shb, before the connection is
                # closed by the finally clause below.
                response = conn.getresponse()
                return shb(response)
            finally:
                # Connections created for this attempt are always closed,
                # including when the request failed part-way through.
                if conn is not None and not persistent:
                    conn.close()
        except socket.error:
            if trial == attempts - 1:
                raise
            time.sleep(0.5)
516
517
518
519
520 ignored_exceptions = []
521
522
523
524
525 ignore_all = False
526
529
530
532 """Server debug hook. Return True if exception handled, False if ignored.
533
534 You probably want to wrap this, so you can still handle an error using
535 your framework when it's ignored.
536 """
537 if exc is None:
538 exc = sys.exc_info()
539
540 if ignore_all or exc[0] in ignored_exceptions:
541 return False
542 else:
543 ServerError.on = True
544 print
545 print "".join(traceback.format_exception(*exc))
546 return True
547