Google SSO Integration with Django

Hi ,

Nowadays Instead of custom authentication everybody is using the SSO . Instead of maintaining the passwords and user information we are depending on the trusted companies like Google , OKTA... etc. In the same way I migrated project to remove the custom migration and use google SSO (OIDC).

There are multiple libraries present to implement Google SSO with Django framework. Ideally I have tried all of them but failed most of times due to my requirements.

Below of the libraries I have tried:

  1. django-allauth
  2. mozilla-django-oidc
  3. Authlib requests

But ended up using the requests lib . Due to user level validation and workgroup level validation

Below are steps:

  1. GET request to fetch the token
  2. Use the token to fetch the user information
  3. User group validation

urls.py

from django.contrib import admin
from django.urls import path, include
from django.contrib.auth.views import LoginView, LogoutView, PasswordChangeDoneView, \
    PasswordChangeView
from django.conf import settings
from django.conf.urls.static import static

from views.common.login_view import, login_page


urlpatterns = [
                  # Links common to all UIs
                  path('admin', admin.site.urls),
                  path("google_login/", google_login, name="login"),
                  path('', home),
] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)

views.py

# imports
from google.oauth2 import service_account
from django.contrib.auth.models import User
from django.shortcuts import render, redirect
import requests
import shlex
import subprocess
import os
from django.http import HttpResponse
from django.http import HttpResponseRedirect
from django.contrib import messages
import json as simplejson
from session_decorator import  logged_in_user_only


@logged_in_user_only
def home(request, ):
    template_name = 'common/home.html'

    return render(request, template_name, {})



def google_login(request):
    if 'code' in request.GET:
        params = {
            'grant_type': 'authorization_code',
            'code': request.GET.get('code'),
            'redirect_uri': settings.REDIRECT_URI, #you will have it in api & services > credentials page
            'client_id': settings.GOOGLE_CLIENT_ID, # client_id we get from the google api & services
            'client_secret': settings.GOOGLE_SECRETE_KEY #secrete key we get from the google api & services
        }
        url = 'https://accounts.google.com/o/oauth2/token'
        response = requests.post(url, data=params)
        print("response: ", response)
        url = 'https://www.googleapis.com/oauth2/v1/userinfo'
        access_token = response.json().get('access_token')
        url = 'https://www.googleapis.com/oauth2/v1/userinfo'
        access_token = response.json().get('access_token')
        response = requests.get(url, params={'access_token': access_token})
        user_info = simplejson.loads(stdout.decode('utf-8'))
        if user_info:
            email = user_info["email"]
            is_member = user_group_validation(email)
            if is_member:
                if email.split("@")[1] == "company.com":
                    user, _ = User.objects.get_or_create(email=user_info["email"], username=user_name)
                    gender = user_info.get('gender', '').lower()
                    data = {
                        'first_name': user_info.get('name', '').split()[0],
                        'last_name': user_info.get('family_name'),
                        'google_avatar': user_info.get('picture'),
                        'gender': gender,
                        'is_active': True
                    }
                    user.__dict__.update(data)
                    user.save()
                    request.session["user"] = user_info["email"]
                else:
                    messages.warning(request, "Access Denied")
                    return redirect("/login")
            else:
                messages.warning(request, 'Access Denied. Contact Administrator')
                return redirect("/login")
        else:
            print("Not a company user")
            messages.error(
                request,
                'Unable to login with Gmail Please try again'
            )
            return redirect("/login")
        return redirect('/')
    else:
        url = "https://accounts.google.com/o/oauth2/auth?client_id=%s&response_type=code&scope=%s&redirect_uri=%s&state=google"
        scope = [
            "https://www.googleapis.com/auth/userinfo.profile",
            "https://www.googleapis.com/auth/userinfo.email"
        ]
        scope = " ".join(scope)
        url = url % (settings.GOOGLE_CLIENT_ID, scope,
                     settings.REDIRECT_URI)
        return redirect(url)



def user_group_validation(user_email: str):
    scopes = ["https://www.googleapis.com/auth/admin.directory.group.member.readonly"]
    print(f"Base dir: {settings.BASE_DIR}")
    service_acnt_file_path = settings.BASE_DIR + "/" + settings.SERVICE_ACCOUNT_FILE
    credentials = service_account.Credentials.from_service_account_file(
        service_acnt_file_path, scopes=scopes
    )
    # impersonate a user with access to the admin SDK
    # The target user must have accepted the terms of use
    credentials = credentials.with_subject(settings.ADMIN_GROUP_KEY)

    # check to see if the user is in our target group
    try:
        admin = build("admin", "directory_v1", credentials=credentials)
        allow = (
            admin.members()
                .hasMember(groupKey=settings.GROUP_KEY, memberKey=user_email)
                .execute()
        )
        if allow:
            return allow["isMember"]
    except HttpError as httperror:
        if httperror.status_code == 400:
            return False
    except Exception as e:
        print(f"Exception in user validation belongs to group :{e}")
        return False
    return True

session_decorator.py


from functools import wraps
from django.http import HttpResponseRedirect
from rest_framework import permissions


def logged_in_user_only(function):
    @wraps(function)
    def wrap(request, *args, **kwargs):

        session_user = request.session.keys()
        if len(session_user):
            print(f"session user: {session_user}")
            if request.session.get("user", None) is not None:
                return function(request, *args, **kwargs)
            else:
                print("session is not present redirecting to login page")
                return HttpResponseRedirect('/login')
        else:
            print("session is not present redirecting to login page")
            return HttpResponseRedirect('/login')

    return wrap

I have used subprocess for GET request due to it is raising the ssl error if we use request module in our enviroment