calibre-web/lib/werkzeug/_internal.py

412 lines
13 KiB
Python

# -*- coding: utf-8 -*-
"""
werkzeug._internal
~~~~~~~~~~~~~~~~~~
This module provides internally used helpers and constants.
:copyright: (c) 2013 by the Werkzeug Team, see AUTHORS for more details.
:license: BSD, see LICENSE for more details.
"""
import re
import string
import inspect
from weakref import WeakKeyDictionary
from datetime import datetime, date
from itertools import chain
from werkzeug._compat import iter_bytes, text_type, BytesIO, int_to_byte, \
range_type, to_native
# Logger used by `_log`; it stays ``None`` until the first message is logged.
_logger = None
# Shared empty byte stream.
_empty_stream = BytesIO()
# Maps a function to the `parse` closure built for it by `_parse_signature`.
# Weak keys: an entry disappears together with its function.
_signature_cache = WeakKeyDictionary()
# Proleptic Gregorian ordinal of 1970-01-01.
_epoch_ord = date(1970, 1, 1).toordinal()

# Lower-cased cookie attribute names; `_cookie_parse_impl` skips keys that
# appear in this set.
_cookie_params = set([
    b'expires',
    b'path',
    b'comment',
    b'max-age',
    b'secure',
    b'httponly',
    b'version',
])

# Bytes that `_cookie_quote` emits unchanged.
_legal_cookie_chars = (
    string.ascii_letters + string.digits + u"!#$%&'*+-.^_`|~:"
).encode('ascii')

# Byte -> escaped replacement used by `_cookie_quote`.
_cookie_quoting_map = {
    b',': b'\\054',
    b';': b'\\073',
    b'"': b'\\"',
    b'\\': b'\\\\',
}
# Bytes 0-31 and 127-255 are written as a backslash followed by three
# octal digits.
for _i in chain(range_type(32), range_type(127, 256)):
    _cookie_quoting_map[int_to_byte(_i)] = ('\\%03o' % _i).encode('latin1')
_octal_re = re.compile(b'\\\\[0-3][0-7][0-7]')
_quote_re = re.compile(b'[\\\\].')
_legal_cookie_chars_re = b'[\w\d!#%&\'~_`><@,:/\$\*\+\-\.\^\|\)\(\?\}\{\=]'
_cookie_re = re.compile(b"""(?x)
(?P<key>[^=]+)
\s*=\s*
(?P<val>
"(?:[^\\\\"]|\\\\.)*" |
(?:.*?)
)
\s*;
""")
class _Missing(object):
def __repr__(self):
return 'no value'
def __reduce__(self):
return '_missing'
_missing = _Missing()
def _get_environ(obj):
env = getattr(obj, 'environ', obj)
assert isinstance(env, dict), \
'%r is not a WSGI environment (has to be a dict)' % type(obj).__name__
return env
def _log(type, message, *args, **kwargs):
    """Emit *message* through the ``'werkzeug'`` logger.

    :param type: name of the logger method to call (it is looked up with
                 ``getattr`` on the logger).
    :param message: the log message; trailing whitespace is stripped.
    :param args: forwarded unchanged to the logger method.
    :param kwargs: forwarded unchanged to the logger method.
    """
    global _logger
    if _logger is None:
        import logging
        _logger = logger = logging.getLogger('werkzeug')
        # Install a default stream handler at INFO level only when neither
        # the root logger has handlers nor our logger has an explicit level,
        # i.e. when the application configured nothing itself.
        unconfigured = (not logging.root.handlers
                        and logger.level == logging.NOTSET)
        if unconfigured:
            logger.setLevel(logging.INFO)
            logger.addHandler(logging.StreamHandler())
    emit = getattr(_logger, type)
    emit(message.rstrip(), *args, **kwargs)
def _parse_signature(func):
    """Return a signature object for the function.

    The returned object is a function ``parse(args, kwargs)`` which sorts a
    set of call arguments against the signature of *func* and returns the
    8-tuple ``(new_args, kwargs, missing, extra, extra_positional,
    arguments, vararg_var, kwarg_var)``.  The parser is cached per function
    in ``_signature_cache``.

    Note that ``parse`` pops the entries it consumes from the ``kwargs``
    dict it is given.

    :raises TypeError: if the function unpacks tuples in its signature.
    """
    # Python 2 methods: work on the underlying function.
    if hasattr(func, 'im_func'):
        func = func.im_func

    # if we have a cached validator for this function, return it
    parse = _signature_cache.get(func)
    if parse is not None:
        return parse

    # inspect the function signature and collect all the information.
    # ``inspect.getargspec`` no longer exists on Python 3.11+, so prefer
    # ``getfullargspec``; the first four fields of both results are
    # (args, varargs, keywords, defaults).  Keyword-only parameters are not
    # modelled by the parser below.
    getspec = getattr(inspect, 'getfullargspec', None)
    if getspec is None:
        getspec = inspect.getargspec
    positional, vararg_var, kwarg_var, defaults = getspec(func)[:4]
    defaults = defaults or ()
    arg_count = len(positional)
    # defaults always belong to the *last* positional parameters
    first_default = arg_count - len(defaults)
    arguments = []
    for idx, name in enumerate(positional):
        if isinstance(name, list):
            raise TypeError('cannot parse functions that unpack tuples '
                            'in the function signature')
        if idx >= first_default:
            param = (name, True, defaults[idx - first_default])
        else:
            param = (name, False, None)
        arguments.append(param)
    arguments = tuple(arguments)

    def parse(args, kwargs):
        new_args = []
        missing = []
        extra = {}

        # consume as many arguments as positional as possible
        for idx, (name, has_default, default) in enumerate(arguments):
            if idx < len(args):
                new_args.append(args[idx])
                # also passed by keyword: report the duplicate as extra
                if name in kwargs:
                    extra[name] = kwargs.pop(name)
            elif name in kwargs:
                new_args.append(kwargs.pop(name))
            elif has_default:
                new_args.append(default)
            else:
                missing.append(name)

        # handle extra arguments
        extra_positional = args[arg_count:]
        if vararg_var is not None:
            new_args.extend(extra_positional)
            extra_positional = ()
        # without a ``**kwargs`` parameter the leftovers cannot be passed on
        if kwargs and kwarg_var is None:
            extra.update(kwargs)
            kwargs = {}

        return new_args, kwargs, missing, extra, extra_positional, \
            arguments, vararg_var, kwarg_var

    _signature_cache[func] = parse
    return parse
def _date_to_unix(arg):
"""Converts a timetuple, integer or datetime object into the seconds from
epoch in utc.
"""
if isinstance(arg, datetime):
arg = arg.utctimetuple()
elif isinstance(arg, (int, long, float)):
return int(arg)
year, month, day, hour, minute, second = arg[:6]
days = date(year, month, 1).toordinal() - _epoch_ord + day - 1
hours = days * 24 + hour
minutes = hours * 60 + minute
seconds = minutes * 60 + second
return seconds
class _DictAccessorProperty(object):
"""Baseclass for `environ_property` and `header_property`."""
read_only = False
def __init__(self, name, default=None, load_func=None, dump_func=None,
read_only=None, doc=None):
self.name = name
self.default = default
self.load_func = load_func
self.dump_func = dump_func
if read_only is not None:
self.read_only = read_only
self.__doc__ = doc
def __get__(self, obj, type=None):
if obj is None:
return self
storage = self.lookup(obj)
if self.name not in storage:
return self.default
rv = storage[self.name]
if self.load_func is not None:
try:
rv = self.load_func(rv)
except (ValueError, TypeError):
rv = self.default
return rv
def __set__(self, obj, value):
if self.read_only:
raise AttributeError('read only property')
if self.dump_func is not None:
value = self.dump_func(value)
self.lookup(obj)[self.name] = value
def __delete__(self, obj):
if self.read_only:
raise AttributeError('read only property')
self.lookup(obj).pop(self.name, None)
def __repr__(self):
return '<%s %s>' % (
self.__class__.__name__,
self.name
)
def _cookie_quote(b):
    """Quote the bytes *b* for use as a cookie value.

    Bytes listed in ``_legal_cookie_chars`` are copied as they are.  Any
    other byte is replaced by its entry in ``_cookie_quoting_map`` (or kept
    if it has none).  As soon as one such byte was seen the whole result is
    wrapped in double quotes.
    """
    quoted = bytearray()
    needs_quotes = False
    for char in iter_bytes(b):
        if char in _legal_cookie_chars:
            quoted.extend(char)
        else:
            needs_quotes = True
            quoted.extend(_cookie_quoting_map.get(char, char))
    if needs_quotes:
        return bytes(b'"' + quoted + b'"')
    return bytes(quoted)
def _cookie_unquote(b):
    """Undo the quoting of a cookie value given as bytes.

    Values that are not wrapped in double quotes are returned unchanged.
    Otherwise the quotes are removed, ``\\NNN`` octal escapes are replaced
    by the byte they encode and ``\\x`` escapes by the byte ``x``.
    """
    if len(b) < 2:
        return b
    if b[:1] != b'"' or b[-1:] != b'"':
        return b

    b = b[1:-1]

    i = 0
    n = len(b)
    rv = bytearray()
    _push = rv.extend

    while 0 <= i < n:
        o_match = _octal_re.search(b, i)
        q_match = _quote_re.search(b, i)
        # no escapes left: copy the remainder and stop
        if not o_match and not q_match:
            rv.extend(b[i:])
            break
        j = k = -1
        if o_match:
            j = o_match.start(0)
        if q_match:
            k = q_match.start(0)
        if q_match and (not o_match or k < j):
            # a plain backslash escape comes first
            _push(b[i:k])
            # Take the escaped byte as a one-byte slice.  Indexing bytes
            # yields an int on Python 3, which ``bytearray.extend`` rejects
            # with a TypeError.
            _push(b[k + 1:k + 2])
            i = k + 2
        else:
            # an octal escape comes first (or starts at the same position)
            _push(b[i:j])
            rv.append(int(b[j + 1:j + 4], 8))
            i = j + 4

    return bytes(rv)
def _cookie_parse_impl(b):
    """Lowlevel cookie parsing facility that operates on bytes.

    Generator yielding ``(key, value)`` pairs, both passed through
    ``_cookie_unquote``.  Pairs whose lower-cased key is listed in
    ``_cookie_params`` are skipped.
    """
    i = 0
    n = len(b)
    # ``_cookie_re`` needs a ``;`` after every pair.  Append the terminator
    # once here rather than building a new copy on every loop iteration.
    terminated = b + b';'

    while i < n:
        match = _cookie_re.search(terminated, i)
        if not match:
            break

        key = match.group('key').strip()
        value = match.group('val')
        i = match.end(0)

        # Ignore parameters.  We have no interest in them.
        if key.lower() not in _cookie_params:
            yield _cookie_unquote(key), _cookie_unquote(value)
def _encode_idna(domain):
    """Return *domain* as bytes, IDNA-encoding it where necessary.

    Non-text input is returned as it is after checking that it decodes as
    ASCII (the decode raises otherwise).  Text that is plain ASCII is simply
    encoded; any other text is split on ``'.'`` and every part is encoded
    with the ``idna`` codec.
    """
    if not isinstance(domain, text_type):
        # the decoded result is discarded: this call is only a validation
        domain.decode('ascii')
        return domain

    try:
        return domain.encode('ascii')
    except UnicodeError:
        pass

    # not pure ASCII: encode each part separately
    return b'.'.join([part.encode('idna') for part in domain.split('.')])
def _decode_idna(domain):
    """Return *domain* as text with every IDNA-encoded part decoded.

    Text that cannot be encoded as ASCII is returned unchanged, because it
    is then treated as already decoded.
    """
    if isinstance(domain, text_type):
        try:
            domain = domain.encode('ascii')
        except UnicodeError:
            return domain

    def _decode_label(label):
        # The idna codec offers no error handling of its own, so a part it
        # rejects is decoded as ASCII with undecodable bytes dropped.
        try:
            return label.decode('idna')
        except UnicodeError:
            return label.decode('ascii', 'ignore')

    return '.'.join([_decode_label(label) for label in domain.split(b'.')])
def _make_cookie_domain(domain):
    """Return the bytes to use as the ``domain`` of a cookie.

    ``None`` is passed through.  Anything else is run through
    ``_encode_idna`` and cut off at the first ``:``.

    :raises ValueError: if the resulting name contains no dot.
    """
    if domain is None:
        return None

    # keep only what precedes the first colon, if there is one
    host = _encode_idna(domain).partition(b':')[0]
    if b'.' not in host:
        raise ValueError(
            'Setting \'domain\' for a cookie on a server running localy (ex: '
            'localhost) is not supportted by complying browsers. You should '
            'have something like: \'127.0.0.1 localhost dev.localhost\' on '
            'your hosts file and then point your server to run on '
            '\'dev.localhost\' and also set \'domain\' for \'dev.localhost\''
        )
    return host
def _easteregg(app=None):
"""Like the name says. But who knows how it works?"""
def bzzzzzzz(gyver):
import base64
import zlib
return zlib.decompress(base64.b64decode(gyver)).decode('ascii')
gyver = u'\n'.join([x + (77 - len(x)) * u' ' for x in bzzzzzzz(b'''
eJyFlzuOJDkMRP06xRjymKgDJCDQStBYT8BCgK4gTwfQ2fcFs2a2FzvZk+hvlcRvRJD148efHt9m
9Xz94dRY5hGt1nrYcXx7us9qlcP9HHNh28rz8dZj+q4rynVFFPdlY4zH873NKCexrDM6zxxRymzz
4QIxzK4bth1PV7+uHn6WXZ5C4ka/+prFzx3zWLMHAVZb8RRUxtFXI5DTQ2n3Hi2sNI+HK43AOWSY
jmEzE4naFp58PdzhPMdslLVWHTGUVpSxImw+pS/D+JhzLfdS1j7PzUMxij+mc2U0I9zcbZ/HcZxc
q1QjvvcThMYFnp93agEx392ZdLJWXbi/Ca4Oivl4h/Y1ErEqP+lrg7Xa4qnUKu5UE9UUA4xeqLJ5
jWlPKJvR2yhRI7xFPdzPuc6adXu6ovwXwRPXXnZHxlPtkSkqWHilsOrGrvcVWXgGP3daXomCj317
8P2UOw/NnA0OOikZyFf3zZ76eN9QXNwYdD8f8/LdBRFg0BO3bB+Pe/+G8er8tDJv83XTkj7WeMBJ
v/rnAfdO51d6sFglfi8U7zbnr0u9tyJHhFZNXYfH8Iafv2Oa+DT6l8u9UYlajV/hcEgk1x8E8L/r
XJXl2SK+GJCxtnyhVKv6GFCEB1OO3f9YWAIEbwcRWv/6RPpsEzOkXURMN37J0PoCSYeBnJQd9Giu
LxYQJNlYPSo/iTQwgaihbART7Fcyem2tTSCcwNCs85MOOpJtXhXDe0E7zgZJkcxWTar/zEjdIVCk
iXy87FW6j5aGZhttDBoAZ3vnmlkx4q4mMmCdLtnHkBXFMCReqthSGkQ+MDXLLCpXwBs0t+sIhsDI
tjBB8MwqYQpLygZ56rRHHpw+OAVyGgaGRHWy2QfXez+ZQQTTBkmRXdV/A9LwH6XGZpEAZU8rs4pE
1R4FQ3Uwt8RKEtRc0/CrANUoes3EzM6WYcFyskGZ6UTHJWenBDS7h163Eo2bpzqxNE9aVgEM2CqI
GAJe9Yra4P5qKmta27VjzYdR04Vc7KHeY4vs61C0nbywFmcSXYjzBHdiEjraS7PGG2jHHTpJUMxN
Jlxr3pUuFvlBWLJGE3GcA1/1xxLcHmlO+LAXbhrXah1tD6Ze+uqFGdZa5FM+3eHcKNaEarutAQ0A
QMAZHV+ve6LxAwWnXbbSXEG2DmCX5ijeLCKj5lhVFBrMm+ryOttCAeFpUdZyQLAQkA06RLs56rzG
8MID55vqr/g64Qr/wqwlE0TVxgoiZhHrbY2h1iuuyUVg1nlkpDrQ7Vm1xIkI5XRKLedN9EjzVchu
jQhXcVkjVdgP2O99QShpdvXWoSwkp5uMwyjt3jiWCqWGSiaaPAzohjPanXVLbM3x0dNskJsaCEyz
DTKIs+7WKJD4ZcJGfMhLFBf6hlbnNkLEePF8Cx2o2kwmYF4+MzAxa6i+6xIQkswOqGO+3x9NaZX8
MrZRaFZpLeVTYI9F/djY6DDVVs340nZGmwrDqTCiiqD5luj3OzwpmQCiQhdRYowUYEA3i1WWGwL4
GCtSoO4XbIPFeKGU13XPkDf5IdimLpAvi2kVDVQbzOOa4KAXMFlpi/hV8F6IDe0Y2reg3PuNKT3i
RYhZqtkQZqSB2Qm0SGtjAw7RDwaM1roESC8HWiPxkoOy0lLTRFG39kvbLZbU9gFKFRvixDZBJmpi
Xyq3RE5lW00EJjaqwp/v3EByMSpVZYsEIJ4APaHmVtpGSieV5CALOtNUAzTBiw81GLgC0quyzf6c
NlWknzJeCsJ5fup2R4d8CYGN77mu5vnO1UqbfElZ9E6cR6zbHjgsr9ly18fXjZoPeDjPuzlWbFwS
pdvPkhntFvkc13qb9094LL5NrA3NIq3r9eNnop9DizWOqCEbyRBFJTHn6Tt3CG1o8a4HevYh0XiJ
sR0AVVHuGuMOIfbuQ/OKBkGRC6NJ4u7sbPX8bG/n5sNIOQ6/Y/BX3IwRlTSabtZpYLB85lYtkkgm
p1qXK3Du2mnr5INXmT/78KI12n11EFBkJHHp0wJyLe9MvPNUGYsf+170maayRoy2lURGHAIapSpQ
krEDuNoJCHNlZYhKpvw4mspVWxqo415n8cD62N9+EfHrAvqQnINStetek7RY2Urv8nxsnGaZfRr/
nhXbJ6m/yl1LzYqscDZA9QHLNbdaSTTr+kFg3bC0iYbX/eQy0Bv3h4B50/SGYzKAXkCeOLI3bcAt
mj2Z/FM1vQWgDynsRwNvrWnJHlespkrp8+vO1jNaibm+PhqXPPv30YwDZ6jApe3wUjFQobghvW9p
7f2zLkGNv8b191cD/3vs9Q833z8t''').splitlines()])
def easteregged(environ, start_response):
def injecting_start_response(status, headers, exc_info=None):
headers.append(('X-Powered-By', 'Werkzeug'))
return start_response(status, headers, exc_info)
if app is not None and environ.get('QUERY_STRING') != 'macgybarchakku':
return app(environ, injecting_start_response)
injecting_start_response('200 OK', [('Content-Type', 'text/html')])
return [(u'''
<!DOCTYPE html>
<html>
<head>
<title>About Werkzeug</title>
<style type="text/css">
body { font: 15px Georgia, serif; text-align: center; }
a { color: #333; text-decoration: none; }
h1 { font-size: 30px; margin: 20px 0 10px 0; }
p { margin: 0 0 30px 0; }
pre { font: 11px 'Consolas', 'Monaco', monospace; line-height: 0.95; }
</style>
</head>
<body>
<h1><a href="http://werkzeug.pocoo.org/">Werkzeug</a></h1>
<p>the Swiss Army knife of Python web development.</p>
<pre>%s\n\n\n</pre>
</body>
</html>''' % gyver).encode('latin1')]
return easteregged