How authentication with JSON Web Tokens (JWT) works
In this section I will explain how authentication works using JWT.
The diagram above shows the client logging in and the server sending back an access token and a refresh token in the response. The client stores both values and, on future requests, sends the access token in the request headers to prove to the server that it is an authenticated user.
However, access tokens should have an expiry date and time so that the user does not use the same access token for a long period. This is mainly a security measure: if someone manages to get hold of your access token, they can only use it until it expires. Once the access token has expired, it no longer works.
The above diagram shows how we solve that problem. When the user's access token has expired, the client sends the refresh token to a specified endpoint on the server. If the refresh token is valid, the response contains a new access token for future requests. This means the access token can expire quite frequently without the user being “logged out”, which matters because users may get frustrated if they have to log in frequently.
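To make the timeline concrete, here is a minimal sketch of the three states a client can be in. It models only expiry times, not real tokens, and the lifetimes and function names are my own assumptions for illustration rather than values from the project.

```python
from dataclasses import dataclass

# Hypothetical lifetimes, chosen only for illustration.
ACCESS_LIFETIME_SECONDS = 15 * 60          # 15 minutes
REFRESH_LIFETIME_SECONDS = 7 * 24 * 3600   # 7 days


@dataclass
class TokenPair:
    access_expires_at: int
    refresh_expires_at: int


def login(now: int) -> TokenPair:
    """Issue both tokens at login time `now` (in seconds)."""
    return TokenPair(now + ACCESS_LIFETIME_SECONDS, now + REFRESH_LIFETIME_SECONDS)


def next_step(tokens: TokenPair, now: int) -> str:
    """Decide what the client has to do at time `now`."""
    if now < tokens.access_expires_at:
        return "send request with access token"
    if now < tokens.refresh_expires_at:
        return "exchange refresh token for a new access token"
    return "log in again"


def refresh(tokens: TokenPair, now: int) -> TokenPair:
    """Issue a new access token; the refresh token keeps its original expiry."""
    if now >= tokens.refresh_expires_at:
        raise PermissionError("refresh token expired")
    return TokenPair(now + ACCESS_LIFETIME_SECONDS, tokens.refresh_expires_at)
```

With these numbers, a user who comes back after an hour is still inside the refresh window, so the client can obtain a new access token without showing the login form.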
What I started with
When learning about JSON Web Tokens (JWT) and FastAPI I used these two resources.
https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
So when making this TO-DO app and deciding to use JWT for authentication, I naturally used the same implementation as these resources.
However, there was a problem: the examples in these tutorials only show how to generate and verify an ‘access token’, not a refresh token.
This is the code I started with for authentication based on the two tutorials above.
@router.post("/login", response_model=Token)
def login(user_credentials: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    # OAuth2PasswordRequestForm names the field "username"; here it holds the email
    user = db.query(models.User)\
        .filter(models.User.email == user_credentials.username)\
        .first()

    if not user:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Invalid Credentials')

    if not verify(plain_password=user_credentials.password, hashed_password=user.password):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Invalid Credentials')

    # create token and return it
    access_token = oauth2.create_access_token(data=dict(user_id=user.id))
    return {'access_token': access_token, "token_type": "bearer"}
As you can see, the user first sends a request to the login endpoint. If the user exists and the password is correct, I create an access token and return it in the response. In my project I store this token in the browser's localStorage.
from datetime import datetime, timedelta

from jose import jwt

SECRET_KEY = settings.secret_key
ALGORITHM = settings.algorithm
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes


def create_access_token(data: dict):
    # Copy so the caller's dict is not mutated
    to_encode = data.copy()
    expiry_datetime = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update(dict(exp=expiry_datetime))
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
This function is how the access token is created. The JWT is encoded with the jose package, as per the FastAPI documentation. In this example the payload contains the user's database ID and the expiry time of the access token, and the token is signed with the secret key and algorithm.
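To show what the encoding step produces, here is a standard-library sketch of a signed token. It is not how jose is implemented internally. It assumes the configured algorithm is HS256, which the settings above do not confirm, and the helper names and the secret are made up for the example.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url(raw: bytes) -> str:
    """Base64url-encode without padding, as JWT does."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def encode_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature for an HS256-signed token."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url(signature)}"


def read_payload(token: str) -> dict:
    """Decode the payload WITHOUT verifying the signature."""
    payload_b64 = token.split(".")[1]
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


# "demo-secret" is a placeholder; a real key belongs in configuration.
token = encode_hs256({"user_id": 1, "exp": int(time.time()) + 30 * 60}, "demo-secret")
```

Because read_payload works without the secret, anyone holding the token can read the user ID and expiry. The signature only stops them from changing those values, so nothing sensitive should go in the payload.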
When the user tries to access a protected endpoint, i.e. one that returns data only an authenticated user can access, the get_current_user function is run. This is done using dependency injection (https://fastapi.tiangolo.com/tutorial/dependencies/), so that we can check that the request came from an authenticated user before our actual function is executed.
The get_current_user function verifies that the access token sent as part of the request is valid and returns the user that the token was issued to. If the token is not valid, it raises an HTTPException with a status code of 401. Here is the code for get_current_user.
def get_current_user(token: str = Depends(oauth_schema), db: Session = Depends(get_db)):
    credentials_exception = HTTPException(status.HTTP_401_UNAUTHORIZED, detail='Could not validate credentials',
                                          headers={"WWW-AUTHENTICATE": "BEARER"})

    token = verify_access_token(token=token, credentials_exception=credentials_exception)
    user = db.query(models.User).get(token.id)
    return user
How I implemented refresh token functionality
First of all, when the user sends a request to the login endpoint and the user's details are correct, I create an additional refresh token. This is basically the same as the access token, just with a longer expiry time and signed with a different secret key. It is also returned to the user and stored in their browser's localStorage. The updated login endpoint is below.
import logging

from fastapi import APIRouter, status, Depends, HTTPException
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy import exc
from sqlalchemy.orm import Session

from app.database import get_db
from app import models
from app import oauth2
from app.schemas import AccessToken, Token, RefreshToken
from app.utils import verify_password

log = logging.getLogger(__name__)

router = APIRouter(prefix="/api/v1", tags=["Authentication"])


@router.post('/login', response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db_session: Session = Depends(get_db)):
    try:
        # .one() raises NoResultFound (a SQLAlchemyError) when no user matches
        user = db_session.query(models.User)\
            .filter(models.User.email == form_data.username)\
            .one()
    except exc.SQLAlchemyError as e:
        log.error('Problem with getting user from database')
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not verify_password(plain_password=form_data.password, hashed_password=user.password):
        log.error('Incorrect email or password when login')
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # create both tokens and return them
    access_token = oauth2.create_access_token(data=dict(user_id=user.id))
    refresh_token = oauth2.create_refresh_token(data=dict(user_id=user.id))
    return {'access_token': access_token, "token_type": "bearer", "refresh_token": refresh_token}
The code below creates a refresh token. This is called by the updated login function above.
def create_refresh_token(data: dict):
    to_encode = data.copy()
    expiry_datetime = datetime.utcnow() + timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expiry_datetime})
    encoded_jwt = jwt.encode(to_encode, JWT_REFRESH_SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt
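One reason to sign the two token types with different keys is that a token of one type then fails verification when presented as the other. The sketch below shows the idea with plain HMAC signatures, assuming an HMAC-based algorithm such as HS256. The secrets and helper names are placeholders of mine, not values from the project.

```python
import hashlib
import hmac

# Placeholder secrets for illustration only.
ACCESS_SECRET = b"access-demo-secret"
REFRESH_SECRET = b"refresh-demo-secret"


def sign(message: bytes, key: bytes) -> bytes:
    """Return the HMAC-SHA256 signature of message under key."""
    return hmac.new(key, message, hashlib.sha256).digest()


def is_valid(message: bytes, signature: bytes, key: bytes) -> bool:
    """Recompute the signature and compare in constant time."""
    return hmac.compare_digest(sign(message, key), signature)


message = b'{"user_id": 1}'
access_signature = sign(message, ACCESS_SECRET)

# Valid as an access token, rejected when checked with the refresh key.
accepted_as_access = is_valid(message, access_signature, ACCESS_SECRET)
accepted_as_refresh = is_valid(message, access_signature, REFRESH_SECRET)
```

Here accepted_as_access is True and accepted_as_refresh is False, so a leaked access token could not be replayed against the refresh endpoint to mint new tokens.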
I then made changes to the get_current_user dependency. I added another HTTPException, this time for the case where the user's access token has expired. This HTTPException is passed in to the verify_access_token function, so if the user has an expired access token the function raises this exception and it is returned as part of the response.
def get_current_user(token: str = Depends(oauth2_scheme), db_session: Session = Depends(get_db)):
    credentials_exception = HTTPException(status.HTTP_401_UNAUTHORIZED, detail='Could not validate credentials',
                                          headers={"WWW-AUTHENTICATE": "BEARER"})

    expired_exception = HTTPException(status.HTTP_401_UNAUTHORIZED, detail='Expired Access Token',
                                      headers={"WWW-AUTHENTICATE": "BEARER"})

    token = verify_access_token(token=token, credentials_exception=credentials_exception,
                                expired_exception=expired_exception)

    user = db_session.query(models.User)\
        .get(token.user_id)
    return user
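The verify_access_token function itself is not shown in this post, so the following is only a sketch of the logic it needs: reject a bad signature with one exception and an expired token with another. It uses the standard library instead of jose so it runs on its own, and it assumes HS256. TokenData, InvalidToken, ExpiredToken, sign and the extra secret and now parameters are all hypothetical names introduced for the example.

```python
import base64
import hashlib
import hmac
import json
import time
from dataclasses import dataclass


@dataclass
class TokenData:  # hypothetical return type
    user_id: int


class InvalidToken(Exception):
    pass


class ExpiredToken(Exception):
    pass


def _b64url(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign(payload: dict, secret: str) -> str:
    """Helper that creates an HS256 token so the sketch can be tried out."""
    header = _b64url(b'{"alg":"HS256","typ":"JWT"}')
    body = _b64url(json.dumps(payload).encode("utf-8"))
    sig = hmac.new(secret.encode(), f"{header}.{body}".encode(), hashlib.sha256).digest()
    return f"{header}.{body}.{_b64url(sig)}"


def verify_access_token(token, secret, credentials_exception, expired_exception, now=None):
    try:
        header, body, sig = token.split(".")
        expected = hmac.new(secret.encode(), f"{header}.{body}".encode(), hashlib.sha256).digest()
        # Check the signature first: an unsigned payload cannot be trusted at all
        if not hmac.compare_digest(_b64url_decode(sig), expected):
            raise credentials_exception
        payload = json.loads(_b64url_decode(body))
        user_id = payload["user_id"]
        expires_at = int(payload["exp"])
    except (ValueError, KeyError, TypeError):
        # Malformed token, bad base64/JSON, or missing claims
        raise credentials_exception
    current_time = time.time() if now is None else now
    if current_time >= expires_at:
        raise expired_exception
    return TokenData(user_id=user_id)
```

Keeping the two failures separate is what lets the client tell "refresh and retry" apart from "these credentials are simply wrong".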
Next, here is the process I wanted on the client side of my application with regard to refresh token functionality.
If the user sends a request to the server and the response has a status code of 401, I want the client to make a request to a “refresh” endpoint; a successful response from this endpoint contains a new access token the user can use. I then needed to automatically re-fire the original request with the new access token, so the user doesn’t have to make an additional manual request, which might annoy them.
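The process above can be sketched independently of any HTTP library. I use Python here to stay consistent with the server code, although the real client is written in TypeScript. The function and parameter names are hypothetical, and the fake server stands in for real network calls.

```python
from typing import Callable, Tuple

Response = Tuple[int, str]  # (status code, body)


def request_with_refresh(
    send: Callable[[str], Response],
    refresh: Callable[[], str],
    access_token: str,
) -> Tuple[Response, str]:
    """Send a request; on 401, refresh once and re-send with the new token."""
    response = send(access_token)
    if response[0] != 401:
        return response, access_token
    new_token = refresh()              # e.g. POST /refresh with the refresh token
    return send(new_token), new_token  # retried exactly once to avoid a loop


def fake_send(token: str) -> Response:
    """Stand-in for the API: only the token "fresh" is accepted."""
    return (200, "todos") if token == "fresh" else (401, "Expired Access Token")


result, token = request_with_refresh(fake_send, lambda: "fresh", "stale")
```

The caller only ever sees the final response, which is the point: the expired token is handled without the user noticing. Returning the new token lets the caller store it for later requests.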
First of all, this is the /refresh endpoint on my server. As you can see, I first take the refresh token provided and verify it. If it verifies successfully, I create a new access token and return it in the response to the client.
@router.post('/refresh', response_model=AccessToken)
def refresh(token: RefreshToken):
    credentials_exception = HTTPException(status.HTTP_401_UNAUTHORIZED, detail='Could not validate credentials',
                                          headers={"WWW-AUTHENTICATE": "BEARER"})

    user_id = oauth2.verify_refresh_token(token=token.refresh_token, credentials_exception=credentials_exception)
    access_token = oauth2.create_access_token(data=dict(user_id=user_id))
    return {'access_token': access_token, "token_type": "bearer"}
In the client-side React code I was using axios to send HTTP requests to the server. While doing some research I found out about interceptors, which allow you to intercept requests or responses before they are handled, so you can implement custom logic based on the content of the request or response.
I then found a package called axios-auth-refresh to handle the 401 status code I get because of an expired access token.
https://www.npmjs.com/package/axios-auth-refresh
axios-auth-refresh works as follows: when a 401 error is returned in the response, the axios interceptor fires a request to my refresh endpoint to get a new access token. It then takes the original failed request, updates the headers to include the new access token and re-fires the request to the backend, hopefully getting a successful response this time. I included this in my axiosInstance with some custom logic as specified. The implementation is below.
import axios, {
  AxiosError,
  AxiosInstance,
  AxiosRequestConfig,
  AxiosResponse,
} from "axios";
import createAuthRefreshInterceptor from 'axios-auth-refresh';

// Function that will be called to refresh authorization
const refreshAuthLogic = async (failedRequest: any): Promise<void> => {
  const refreshToken = localStorage.getItem("userRefreshToken");

  // Uses the global axios, not the intercepted instance, for the refresh call
  const rs = await axios.post(`/api/v1/refresh`, {
    refresh_token: refreshToken,
  });

  const { access_token } = rs.data;
  localStorage.setItem("userToken", access_token);

  // Update the failed request so the retry carries the new access token
  failedRequest.response.config.headers['Authorization'] = 'Bearer ' + access_token;
};

function getAccessToken() {
  return localStorage.getItem('userToken');
}

// Attach the stored access token to every outgoing request
const onRequest = (config: AxiosRequestConfig): AxiosRequestConfig => {
  config.headers["Authorization"] = `Bearer ${getAccessToken()}`;
  return config;
};

const onRequestError = (error: AxiosError): Promise<AxiosError> => {
  return Promise.reject(error);
};

const onResponse = (response: AxiosResponse): AxiosResponse => {
  return response;
};

const onResponseError = async (error: AxiosError): Promise<AxiosResponse<any>> => {
  return Promise.reject(error);
};

export const setupInterceptorsTo = (
  axiosInstance: AxiosInstance
): AxiosInstance => {
  createAuthRefreshInterceptor(axiosInstance, refreshAuthLogic);
  axiosInstance.interceptors.request.use(onRequest, onRequestError);
  axiosInstance.interceptors.response.use(onResponse, onResponseError);
  return axiosInstance;
};
The code below uses the interceptors defined above to create a global axios instance. You can then import this axios instance where needed to make your HTTP requests to the server.
import axios from "axios";
import { setupInterceptorsTo } from './setUpInteceptors.ts'

const api = setupInterceptorsTo(
  axios.create({
    // baseURL: process.env.NEXT_PUBLIC_ENDPOINT_AUTH,
    headers: {
      "Content-Type": "application/json",
    },
  })
);

export default api;
So this is how I successfully added refresh token functionality to my TO-DO App. To view the whole code repository go to this link.
https://github.com/garethbreeze1993/to-do-app