Source code for flask_jwt_router._routing

"""
    Main class for routing
"""
from abc import ABC, abstractmethod
# pylint:disable=invalid-name
import logging
from flask import request, abort, g
from werkzeug.routing import RequestRedirect
from werkzeug.exceptions import MethodNotAllowed, NotFound
from jwt.exceptions import InvalidTokenError
import jwt


from ._entity import BaseEntity
from .oauth2.google import Google
from ._config import Config

logger = logging.getLogger()


[docs]class BaseRouting(ABC): # pylint:disable=missing-class-docstring @abstractmethod def before_middleware(self) -> None: # pylint:disable=missing-function-docstring pass
[docs]class Routing(BaseRouting): """ :param app: Flask application instance :param config: :class:`~flask_jwt_router._config` :param entity: :class:`~flask_jwt_router._entity` """ def __init__(self, app, config: Config, entity: BaseEntity, google: Google = None): self.app = app self.config = config self.logger = logger self.entity = entity self.google = google def _prefix_api_name(self, w_routes=None): """ If the config has JWT_ROUTER_API_NAME defined then update each white listed route with an api name :example: "/user" -> "/api/v1/user" :param w_routes: :return List[str]: """ api_name = self.config.api_name if not api_name: return w_routes # Prepend the api name to the white listed route named_white_routes = [] for route_name in w_routes: verb, path = route_name named_white_routes.append((verb, f"{api_name}{path}")) return named_white_routes def _add_static_routes(self, path: str) -> bool: """ Always allow /static/ in path and handle static_url_path from Flask **kwargs :param path: :return: """ paths = path.split("/") defined_static = self.app.static_url_path[1:] if path == "favicon.ico" or\ paths[1] == "static" or\ paths[1] == defined_static: return True return False # pylint:disable=no-self-use def _handle_pre_flight(self, method: str) -> bool: """ Handle pre-flight requests with any verb :param method :return: {bool} """ if method == "OPTIONS": return True return False def _handle_query_params(self, white_route: str, path: str): """ Handles dynamic query params All we care about that a path segment has no url conversion. We compare it's the same as the whitelist segment & let Flask / Werkzeug handle the url matching :param white_route: :param path: :return bool: """ if "<" not in white_route: return False route_segments = white_route.split("/") path_segments = path.split("/") for r, p in zip(route_segments, path_segments): if len(r) > 0: if r[0] != "<": if r != p: return False return True def _allow_public_routes(self, white_routes): """ Create a list of tuples ie [("POST", "/users")] as public routes. Returns False if current route and verb are white listed. :param flask_request: :param white_routes: List[Tuple]: :returns bool: """ method = request.method path = request.path for white_route in white_routes: if self._handle_pre_flight(method): return False if method == white_route[0] and path == white_route[1]: return False if method == white_route[0] and self._handle_query_params(white_route[1], path): return False return True def _does_route_exist(self, url: str, method: str) -> bool: adapter = self.app.url_map.bind('') try: adapter.match(url, method=method) except RequestRedirect as e: # recursively match redirects return self._does_route_exist(e.new_url, method) except (MethodNotAllowed, NotFound): # no match return False return True
[docs] def before_middleware(self) -> None: """ Handles ignored & whitelisted & static routes with api name If it's not static, ignored whitelisted then authorize :return: Callable or None """ #pylint:disable=inconsistent-return-statements path = request.path method = request.method is_static = self._add_static_routes(path) method = request.method if not is_static: # Handle ignored routes if self._does_route_exist(path, method): is_ignored = False ignored_routes = self.config.ignored_routes if len(ignored_routes) > 0: is_ignored = not self._allow_public_routes(ignored_routes) if not is_ignored: white_routes = self._prefix_api_name(self.config.whitelist_routes) not_whitelist = self._allow_public_routes(white_routes) if not_whitelist: self._handle_token()
def _handle_token(self): """ Checks the headers contain a Bearer string OR params. Checks to see that the route is white listed. :return None: """ self.entity.clean_up() entity = None try: if request.args.get("auth"): token = request.args.get("auth") elif request.headers.get("X-Auth-Token") is not None and self.google: bearer = request.headers.get("X-Auth-Token") token = bearer.split("Bearer ")[1] try: if self.google.test_metadata: email, entity = self.google._update_test_metadata(token) else: # Currently token refreshing is not supported, so pass the current token through auth_results = self.google.authorize(token) email = auth_results["email"] self.entity.oauth_entity_key = self.config.oauth_entity if not entity: entity = self.entity.get_entity_from_token_or_tablename( tablename=self.google.tablename, email_value=email, ) setattr(g, self.entity.get_entity_from_ext().__tablename__, entity) else: setattr(g, entity.__tablename__, entity) setattr(g, "access_token", token) # Clean up google test util self.google.tear_down() return None except InvalidTokenError: return abort(401) except AttributeError: return abort(401) except TypeError: # This is raised from auth_results["email"] not present abort(401) else: # Sometimes a developer may define the auth field name as Bearer or Basic auth_header = request.headers.get("Authorization") if not auth_header: abort(401) if "Bearer " in auth_header: token = auth_header.split("Bearer ")[1] elif "Basic " in auth_header: token = auth_header.split("Basic ")[1] except AttributeError: return abort(401) try: decoded_token = jwt.decode( token, self.config.secret_key, algorithms="HS256" ) except InvalidTokenError: return abort(401) try: self.entity_key = self.config.entity_key entity = self.entity.get_entity_from_token_or_tablename(decoded_token) setattr(g, self.entity.get_entity_from_ext().__tablename__, entity) return None except ValueError: return abort(401)