Package cherrypy :: Package test :: Module test_tools
[hide private]
[frames] | no frames]

Source Code for Module cherrypy.test.test_tools

  1  """Test the various means of instantiating and invoking tools.""" 
  2   
  3  import gzip, StringIO 
  4  import time 
  5  timeout = 0.2 
  6   
  7  import types 
  8  from cherrypy.test import test 
  9  test.prefer_parent_path() 
 10   
 11  import cherrypy 
 12  from cherrypy import tools 
 13   
 14   
 15  europoundUnicode = u'\x80\xa3' 
 16   
17 -def setup_server():
18 19 # Put check_access in a custom toolbox with its own namespace 20 myauthtools = cherrypy._cptools.Toolbox("myauth") 21 22 def check_access(default=False): 23 if not getattr(cherrypy.request, "userid", default): 24 raise cherrypy.HTTPError(401)
25 myauthtools.check_access = cherrypy.Tool('before_request_body', check_access) 26 27 def numerify(): 28 def number_it(body): 29 for chunk in body: 30 for k, v in cherrypy.request.numerify_map: 31 chunk = chunk.replace(k, v) 32 yield chunk 33 cherrypy.response.body = number_it(cherrypy.response.body) 34 35 class NumTool(cherrypy.Tool): 36 def _setup(self): 37 def makemap(): 38 m = self._merged_args().get("map", {}) 39 cherrypy.request.numerify_map = m.items() 40 cherrypy.request.hooks.attach('on_start_resource', makemap) 41 42 def critical(): 43 cherrypy.request.error_response = cherrypy.HTTPError(502).set_response 44 critical.failsafe = True 45 46 cherrypy.request.hooks.attach('on_start_resource', critical) 47 cherrypy.request.hooks.attach(self._point, self.callable) 48 49 tools.numerify = NumTool('before_finalize', numerify) 50 51 # It's not mandatory to inherit from cherrypy.Tool. 52 class NadsatTool: 53 54 def __init__(self): 55 self.ended = {} 56 self._name = "nadsat" 57 58 def nadsat(self): 59 def nadsat_it_up(body): 60 for chunk in body: 61 chunk = chunk.replace("good", "horrorshow") 62 chunk = chunk.replace("piece", "lomtick") 63 yield chunk 64 cherrypy.response.body = nadsat_it_up(cherrypy.response.body) 65 nadsat.priority = 0 66 67 def cleanup(self): 68 # This runs after the request has been completely written out. 69 cherrypy.response.body = "razdrez" 70 id = cherrypy.request.params.get("id") 71 if id: 72 self.ended[id] = True 73 cleanup.failsafe = True 74 75 def _setup(self): 76 cherrypy.request.hooks.attach('before_finalize', self.nadsat) 77 cherrypy.request.hooks.attach('on_end_request', self.cleanup) 78 tools.nadsat = NadsatTool() 79 80 def pipe_body(): 81 cherrypy.request.process_request_body = False 82 clen = int(cherrypy.request.headers['Content-Length']) 83 cherrypy.request.body = cherrypy.request.rfile.read(clen) 84 85 # Assert that we can use a callable object instead of a function. 86 class Rotator(object): 87 def __call__(self, scale): 88 r = cherrypy.response 89 r.collapse_body() 90 r.body = [chr(ord(x) + scale) for x in r.body] 91 cherrypy.tools.rotator = cherrypy.Tool('before_finalize', Rotator()) 92 93 class Root: 94 def index(self): 95 return "Howdy earth!" 96 index.exposed = True 97 98 def euro(self): 99 hooks = list(cherrypy.request.hooks['before_finalize']) 100 hooks.sort() 101 assert [x.callback.__name__ for x in hooks] == ['encode', 'gzip'] 102 assert [x.priority for x in hooks] == [70, 80] 103 yield u"Hello," 104 yield u"world" 105 yield europoundUnicode 106 euro.exposed = True 107 108 # Bare hooks 109 def pipe(self): 110 return cherrypy.request.body 111 pipe.exposed = True 112 pipe._cp_config = {'hooks.before_request_body': pipe_body} 113 114 # Multiple decorators; include kwargs just for fun. 115 # Note that encode must run before gzip. 116 def decorated_euro(self, *vpath): 117 yield u"Hello," 118 yield u"world" 119 yield europoundUnicode 120 decorated_euro.exposed = True 121 decorated_euro = tools.gzip(compress_level=6)(decorated_euro) 122 decorated_euro = tools.encode(errors='ignore')(decorated_euro) 123 124 root = Root() 125 126 127 class TestType(type): 128 """Metaclass which automatically exposes all functions in each subclass, 129 and adds an instance of the subclass as an attribute of root. 130 """ 131 def __init__(cls, name, bases, dct): 132 type.__init__(name, bases, dct) 133 for value in dct.itervalues(): 134 if isinstance(value, types.FunctionType): 135 value.exposed = True 136 setattr(root, name.lower(), cls()) 137 class Test(object): 138 __metaclass__ = TestType 139 140 141 # METHOD ONE: 142 # Declare Tools in _cp_config 143 class Demo(Test): 144 145 _cp_config = {"tools.nadsat.on": True} 146 147 def index(self, id=None): 148 return "A good piece of cherry pie" 149 150 def ended(self, id): 151 return repr(tools.nadsat.ended[id]) 152 153 def err(self, id=None): 154 raise ValueError() 155 156 def errinstream(self, id=None): 157 raise ValueError() 158 yield "confidential" 159 160 # METHOD TWO: decorator using Tool() 161 # We support Python 2.3, but the @-deco syntax would look like this: 162 # @tools.check_access() 163 def restricted(self): 164 return "Welcome!" 165 restricted = myauthtools.check_access()(restricted) 166 userid = restricted 167 168 def err_in_onstart(self): 169 return "success!" 170 171 def stream(self, id=None): 172 for x in xrange(100000000): 173 yield str(x) 174 stream._cp_config = {'response.stream': True} 175 176 177 cherrypy.config.update({'environment': 'test_suite'}) 178 179 conf = { 180 # METHOD THREE: 181 # Declare Tools in detached config 182 '/demo': { 183 'tools.numerify.on': True, 184 'tools.numerify.map': {"pie": "3.14159"}, 185 }, 186 '/demo/restricted': { 187 'request.show_tracebacks': False, 188 }, 189 '/demo/userid': { 190 'request.show_tracebacks': False, 191 'myauth.check_access.default': True, 192 }, 193 '/demo/errinstream': { 194 'response.stream': True, 195 }, 196 '/demo/err_in_onstart': { 197 # Because this isn't a dict, on_start_resource will error. 198 'tools.numerify.map': "pie->3.14159" 199 }, 200 # Combined tools 201 '/euro': { 202 'tools.gzip.on': True, 203 'tools.encode.on': True, 204 }, 205 # Priority specified in config 206 '/decorated_euro/subpath': { 207 'tools.gzip.priority': 10, 208 }, 209 } 210 cherrypy.tree.mount(root, config=conf) 211 212 213 # Client-side code # 214 215 from cherrypy.test import helper 216 217
218 -class ToolTests(helper.CPWebCase):
219
220 - def testHookErrors(self):
221 self.getPage("/demo/?id=1") 222 # If body is "razdrez", then on_end_request is being called too early. 223 self.assertBody("A horrorshow lomtick of cherry 3.14159") 224 # If this fails, then on_end_request isn't being called at all. 225 time.sleep(0.1) 226 self.getPage("/demo/ended/1") 227 self.assertBody("True") 228 229 valerr = '\n raise ValueError()\nValueError' 230 self.getPage("/demo/err?id=3") 231 # If body is "razdrez", then on_end_request is being called too early. 232 self.assertErrorPage(502, pattern=valerr) 233 # If this fails, then on_end_request isn't being called at all. 234 time.sleep(0.1) 235 self.getPage("/demo/ended/3") 236 self.assertBody("True") 237 238 # If body is "razdrez", then on_end_request is being called too early. 239 self.getPage("/demo/errinstream?id=5") 240 # Because this error is raised after the response body has 241 # started, the status should not change to an error status. 242 self.assertStatus("200 OK") 243 self.assertBody("Unrecoverable error in the server.") 244 # If this fails, then on_end_request isn't being called at all. 245 time.sleep(0.1) 246 self.getPage("/demo/ended/5") 247 self.assertBody("True") 248 249 # Test the "__call__" technique (compile-time decorator). 250 self.getPage("/demo/restricted") 251 self.assertErrorPage(401) 252 253 # Test compile-time decorator with kwargs from config. 254 self.getPage("/demo/userid") 255 self.assertBody("Welcome!")
256
257 - def testEndRequestOnDrop(self):
258 old_timeout = None 259 try: 260 httpserver = cherrypy.server.httpservers.keys()[0] 261 old_timeout = httpserver.timeout 262 except (AttributeError, IndexError): 263 print "skipped ", 264 return 265 266 try: 267 httpserver.timeout = timeout 268 269 # Test that on_end_request is called even if the client drops. 270 self.persistent = True 271 try: 272 conn = self.HTTP_CONN 273 conn.putrequest("GET", "/demo/stream?id=9", skip_host=True) 274 conn.putheader("Host", self.HOST) 275 conn.endheaders() 276 # Skip the rest of the request and close the conn. This will 277 # cause the server's active socket to error, which *should* 278 # result in the request being aborted, and request.close being 279 # called all the way up the stack (including WSGI middleware), 280 # eventually calling our on_end_request hook. 281 finally: 282 self.persistent = False 283 time.sleep(timeout * 2) 284 # Test that the on_end_request hook was called. 285 self.getPage("/demo/ended/9") 286 self.assertBody("True") 287 finally: 288 if old_timeout is not None: 289 httpserver.timeout = old_timeout
290
291 - def testGuaranteedHooks(self):
292 # The 'critical' on_start_resource hook is 'failsafe' (guaranteed 293 # to run even if there are failures in other on_start methods). 294 # This is NOT true of the other hooks. 295 # Here, we have set up a failure in NumerifyTool.numerify_map, 296 # but our 'critical' hook should run and set the error to 502. 297 self.getPage("/demo/err_in_onstart") 298 self.assertErrorPage(502) 299 self.assertInBody("AttributeError: 'str' object has no attribute 'items'")
300
301 - def testCombinedTools(self):
302 expectedResult = (u"Hello,world" + europoundUnicode).encode('utf-8') 303 zbuf = StringIO.StringIO() 304 zfile = gzip.GzipFile(mode='wb', fileobj=zbuf, compresslevel=9) 305 zfile.write(expectedResult) 306 zfile.close() 307 308 self.getPage("/euro", headers=[("Accept-Encoding", "gzip"), 309 ("Accept-Charset", "ISO-8859-1,utf-8;q=0.7,*;q=0.7")]) 310 self.assertInBody(zbuf.getvalue()[:3]) 311 312 zbuf = StringIO.StringIO() 313 zfile = gzip.GzipFile(mode='wb', fileobj=zbuf, compresslevel=6) 314 zfile.write(expectedResult) 315 zfile.close() 316 317 self.getPage("/decorated_euro", headers=[("Accept-Encoding", "gzip")]) 318 self.assertInBody(zbuf.getvalue()[:3]) 319 320 # This should break because gzip's priority was lowered in conf. 321 # Of course, we don't want breakage in production apps, 322 # but it proves the priority was changed. 323 self.getPage("/decorated_euro/subpath", 324 headers=[("Accept-Encoding", "gzip")]) 325 self.assertErrorPage(500, pattern='UnicodeEncodeError')
326
327 - def testBareHooks(self):
328 content = "bit of a pain in me gulliver" 329 self.getPage("/pipe", 330 headers=[("Content-Length", len(content)), 331 ("Content-Type", "text/plain")], 332 method="POST", body=content) 333 self.assertBody(content)
334 335 336 if __name__ == '__main__': 337 setup_server() 338 helper.testmain() 339