Update code formating

This commit is contained in:
Paul Gonçalves Monnet 2024-06-07 11:40:36 +02:00
parent 9b8588f7eb
commit 44acb1a0be
4 changed files with 94 additions and 107 deletions

View File

@ -1,15 +1,6 @@
from threading import Thread
from flask import (
Flask,
render_template,
send_file,
Response,
request,
redirect,
jsonify,
)
import io import io
from ldapSync import sync_ldap_to_database from threading import Thread
from database import ( from database import (
add_door_to_database, add_door_to_database,
check_access, check_access,
@ -22,6 +13,15 @@ from database import (
log_access_attempt, log_access_attempt,
) )
from env import DBFILE, WebServerPORT from env import DBFILE, WebServerPORT
from flask import (
Flask,
Response,
jsonify,
redirect,
render_template,
request,
)
from ldapSync import sync_ldap_to_database
app = Flask(__name__) app = Flask(__name__)
@ -92,11 +92,9 @@ def add_door():
group_cn = request.form["group_cn"] group_cn = request.form["group_cn"]
# Update with your database file path # Update with your database file path
exec = add_door_to_database(DBFILE, group_cn, Door_id)
if add_door_to_database(DBFILE, group_cn, Door_id): if add_door_to_database(DBFILE, group_cn, Door_id):
return redirect("/") return redirect("/")
else: return "Failed to add door to the database."
return f"Failed to add door to the database."
# Route to handle sync button click # Route to handle sync button click
@ -121,14 +119,12 @@ def door_access():
log_access_attempt(DBFILE, upn, rfid_uid, True, door_id) log_access_attempt(DBFILE, upn, rfid_uid, True, door_id)
return jsonify({"access_granted": True, "upn": upn}), 200 return jsonify({"access_granted": True, "upn": upn}), 200
else: log_access_attempt(DBFILE, upn, rfid_uid, False, door_id)
log_access_attempt(DBFILE, upn, rfid_uid, False, door_id) return jsonify({"access_granted": False}), 403
return jsonify({"access_granted": False}), 403
def run_flask_app(): def run_flask_app():
""" """Run the Flask web application.
Run the Flask web application.
This function starts the Flask web application with debugging enabled, This function starts the Flask web application with debugging enabled,
no reloader, on the specified port and host. It serves as the main entry no reloader, on the specified port and host. It serves as the main entry
@ -138,13 +134,12 @@ def run_flask_app():
def run_webServer_thread(): def run_webServer_thread():
""" """Start the Flask web server in a separate thread.
Start the Flask web server in a separate thread.
This function initializes and starts a new thread to run the Flask web This function initializes and starts a new thread to run the Flask web
application. It allows the web server to run concurrently with other application. It allows the web server to run concurrently with other
tasks in the main program, ensuring the web interface remains responsive. tasks in the main program, ensuring the web interface remains responsive.
""" """
print(f"STARTING WEB SERVER ON PORT {WebServerPORT}") print(f"STARTING WEB SERVER ON PORT {WebServerPORT}")
flask_thread = Thread(target=run_flask_app, daemon=True) flask_thread = Thread(target=run_flask_app, daemon=True)
flask_thread.start() flask_thread.start()

View File

@ -1,12 +1,12 @@
from datetime import datetime
import sqlite3 import sqlite3
from datetime import datetime
from env import DBFILE from env import DBFILE
# Function to check if a table exists in the database # Function to check if a table exists in the database
def table_exists(cursor, table_name): def table_exists(cursor, table_name):
""" """Check if a table exists in the database.
Check if a table exists in the database.
This function checks whether a table with the specified name exists in the database. This function checks whether a table with the specified name exists in the database.
@ -18,15 +18,15 @@ def table_exists(cursor, table_name):
- bool: True if the table exists, False otherwise. - bool: True if the table exists, False otherwise.
""" """
cursor.execute( cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,) "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(table_name,),
) )
return cursor.fetchone() is not None return cursor.fetchone() is not None
# Function to create the Users table # Function to create the Users table
def create_users_table(cursor): def create_users_table(cursor):
""" """Create the Users table in the database.
Create the Users table in the database.
This function creates the Users table with columns for user principal name (upn), RFID UID, and member of groups. This function creates the Users table with columns for user principal name (upn), RFID UID, and member of groups.
@ -43,8 +43,7 @@ def create_users_table(cursor):
# Function to create the Groups table # Function to create the Groups table
def create_groups_table(cursor): def create_groups_table(cursor):
""" """Create the Groups table in the database.
Create the Groups table in the database.
This function creates the Groups table with a single column for common name (cn) of the group. This function creates the Groups table with a single column for common name (cn) of the group.
@ -58,8 +57,7 @@ def create_groups_table(cursor):
# Function to create the Doors table # Function to create the Doors table
def create_doors_table(cursor): def create_doors_table(cursor):
""" """Create the Doors table in the database.
Create the Doors table in the database.
This function creates the Doors table with columns for door ID and associated group common name. This function creates the Doors table with columns for door ID and associated group common name.
@ -75,8 +73,7 @@ def create_doors_table(cursor):
# Function to create the logs table # Function to create the logs table
def create_logs_table(cursor): def create_logs_table(cursor):
""" """Create the logs table in the database.
Create the logs table in the database.
This function creates the logs table with columns for ID (auto-incremented), timestamp, user, RFID UID, door ID, This function creates the logs table with columns for ID (auto-incremented), timestamp, user, RFID UID, door ID,
and access granted status. Foreign key constraints are set on the door ID, user, and RFID UID columns. and access granted status. Foreign key constraints are set on the door ID, user, and RFID UID columns.
@ -101,8 +98,7 @@ def create_logs_table(cursor):
# Function to setup the database # Function to setup the database
def setup_database(db_file): def setup_database(db_file):
""" """Set up the SQLite database by creating necessary tables if they don't already exist.
Set up the SQLite database by creating necessary tables if they don't already exist.
This function checks if the Users, Groups, Doors, and Log tables exist in the database. If any of them don't exist, This function checks if the Users, Groups, Doors, and Log tables exist in the database. If any of them don't exist,
it creates them using their respective creation functions. After creating or verifying the tables, it commits it creates them using their respective creation functions. After creating or verifying the tables, it commits
@ -150,8 +146,7 @@ def setup_database(db_file):
def log_access_attempt(db_file, user, rFIDUID, granted, doorID): def log_access_attempt(db_file, user, rFIDUID, granted, doorID):
""" """Log an access attempt to the database.
Log an access attempt to the database.
This function inserts a new entry into the log table of the SQLite database, recording details about the access attempt, This function inserts a new entry into the log table of the SQLite database, recording details about the access attempt,
such as the timestamp, user, RFID UID, whether access was granted, and the door ID. such as the timestamp, user, RFID UID, whether access was granted, and the door ID.
@ -182,8 +177,7 @@ def log_access_attempt(db_file, user, rFIDUID, granted, doorID):
def print_users_table(cursor): def print_users_table(cursor):
""" """Print the content of the Users table.
Print the content of the Users table.
## Parameters: ## Parameters:
- cursor (sqlite3.Cursor): Cursor object for executing SQLite queries. - cursor (sqlite3.Cursor): Cursor object for executing SQLite queries.
@ -197,12 +191,11 @@ def print_users_table(cursor):
# Function to print the content of the Groups table # Function to print the content of the Groups table
def print_groups_table(cursor): def print_groups_table(cursor):
""" """Print the content of the Groups table.
Print the content of the Groups table.
## Parameters: ## Parameters:
- cursor (sqlite3.Cursor): Cursor object for executing SQLite queries. - cursor (sqlite3.Cursor): Cursor object for executing SQLite queries.
""" """
cursor.execute("SELECT * FROM Groups") cursor.execute("SELECT * FROM Groups")
rows = cursor.fetchall() rows = cursor.fetchall()
print("Groups:") print("Groups:")
@ -212,8 +205,7 @@ def print_groups_table(cursor):
# Function to print the content of the Doors table # Function to print the content of the Doors table
def print_doors_table(cursor): def print_doors_table(cursor):
""" """Print the content of the Doors table.
Print the content of the Doors table.
## Parameters: ## Parameters:
- cursor (sqlite3.Cursor): Cursor object for executing SQLite queries. - cursor (sqlite3.Cursor): Cursor object for executing SQLite queries.
@ -227,8 +219,7 @@ def print_doors_table(cursor):
# Function to print the content of the Log table # Function to print the content of the Log table
def print_log_table(cursor): def print_log_table(cursor):
""" """Print the content of the Log table.
Print the content of the Log table.
## Parameters: ## Parameters:
- cursor (sqlite3.Cursor): Cursor object for executing SQLite queries. - cursor (sqlite3.Cursor): Cursor object for executing SQLite queries.
@ -242,8 +233,7 @@ def print_log_table(cursor):
# Function to print the content of the entire database # Function to print the content of the entire database
def print_database_content(db_file): def print_database_content(db_file):
""" """Print the content of the entire database.
Print the content of the entire database.
## Parameters: ## Parameters:
- db_file (str): The file path to the SQLite database. - db_file (str): The file path to the SQLite database.
@ -260,9 +250,8 @@ def print_database_content(db_file):
def get_logs(): def get_logs():
""" """Fetch all logs from the log table in the database.
Fetch all logs from the log table in the database.
## Returns: ## Returns:
- list: List of log records. - list: List of log records.
""" """
@ -282,8 +271,7 @@ def get_logs():
def get_latest_logs(db_file, limit=10): def get_latest_logs(db_file, limit=10):
""" """Fetch the latest logs from the database.
Fetch the latest logs from the database.
## Parameters: ## Parameters:
- db_file (str): The file path to the SQLite database. - db_file (str): The file path to the SQLite database.
@ -312,10 +300,9 @@ def get_latest_logs(db_file, limit=10):
# Function to fetch list of existing groups from the database # Function to fetch list of existing groups from the database
def get_existing_groups(db_file): def get_existing_groups(db_file):
""" """Fetches a list of existing groups from the database.
Fetches a list of existing groups from the database.
## Parameters: ## Parameters:
- db_file (str): The file path to the SQLite database. - db_file (str): The file path to the SQLite database.
## Returns: ## Returns:
@ -334,8 +321,7 @@ def get_existing_groups(db_file):
def delete_group_from_database(group_cn): def delete_group_from_database(group_cn):
""" """Delete a group from the database.
Delete a group from the database.
This function deletes a group with the specified common name (cn) from both the Groups and Doors tables This function deletes a group with the specified common name (cn) from both the Groups and Doors tables
in the database. in the database.
@ -352,8 +338,7 @@ def delete_group_from_database(group_cn):
def get_doors(): def get_doors():
""" """Retrieve all doors from the database.
Retrieve all doors from the database.
This function fetches all rows from the Doors table in the database and returns them as a list of tuples. This function fetches all rows from the Doors table in the database and returns them as a list of tuples.
@ -369,9 +354,8 @@ def get_doors():
def get_users(): def get_users():
""" """Fetch all users from the Users table in the database.
Fetch all users from the Users table in the database.
## Returns: ## Returns:
- list: List of user records. - list: List of user records.
""" """
@ -387,8 +371,7 @@ def get_users():
# Function to add a door to the database # Function to add a door to the database
def add_door_to_database(db_file, group_cn, Door_id): def add_door_to_database(db_file, group_cn, Door_id):
""" """Add a door to the database.
Add a door to the database.
This function inserts a new door record into the Doors table with the specified group common name (cn) This function inserts a new door record into the Doors table with the specified group common name (cn)
and door ID. and door ID.
@ -425,8 +408,7 @@ def add_door_to_database(db_file, group_cn, Door_id):
# Function to verify if the user is allowed to open the door # Function to verify if the user is allowed to open the door
def check_access(rfid_uid_str, door_id): def check_access(rfid_uid_str, door_id):
""" """Check if the user is allowed to open the door.
Check if the user is allowed to open the door.
This function verifies if the user associated with the given RFID UID is allowed to open the door This function verifies if the user associated with the given RFID UID is allowed to open the door
specified by the door ID. specified by the door ID.
@ -450,7 +432,8 @@ def check_access(rfid_uid_str, door_id):
# Get the user's UPN and group memberships based on the RFID UID # Get the user's UPN and group memberships based on the RFID UID
cursor.execute( cursor.execute(
"SELECT upn, MemberOf FROM Users WHERE rFIDUID = ?", (rfid_uid_bytes,) "SELECT upn, MemberOf FROM Users WHERE rFIDUID = ?",
(rfid_uid_bytes,),
) )
user_data = cursor.fetchone() user_data = cursor.fetchone()
if user_data is None: if user_data is None:
@ -472,8 +455,7 @@ def check_access(rfid_uid_str, door_id):
# Check if the user's group is allowed to open the door # Check if the user's group is allowed to open the door
if door_group in user_groups.split(","): if door_group in user_groups.split(","):
return True, upn # Access granted return True, upn # Access granted
else: return False, None # Access denied
return False, None # Access denied
except sqlite3.Error as e: except sqlite3.Error as e:
print(f"SQLite Error: {e}") print(f"SQLite Error: {e}")

View File

@ -1,15 +1,15 @@
from datetime import datetime
import ldap
import sqlite3 import sqlite3
import threading import threading
from datetime import datetime
import ldap
import schedule import schedule
from env import DOOR_ACCESS_GROUPS_DN, LDAPPASS, LDAPUSER, LDAP_SERVER, USERS_DN from env import DOOR_ACCESS_GROUPS_DN, LDAP_SERVER, LDAPPASS, LDAPUSER, USERS_DN
# Function to initialize LDAP connection # Function to initialize LDAP connection
def initialize_ldap_connection(): def initialize_ldap_connection():
""" """Initialize the LDAP connection.
Initialize the LDAP connection.
This function attempts to establish a connection to the LDAP server using the provided server address, This function attempts to establish a connection to the LDAP server using the provided server address,
user credentials, and settings. If the connection is successful, it returns the connection object. user credentials, and settings. If the connection is successful, it returns the connection object.
@ -31,8 +31,7 @@ def initialize_ldap_connection():
# Function to retrieve users from LDAP # Function to retrieve users from LDAP
def retrieve_users_from_ldap(ldap_connection): def retrieve_users_from_ldap(ldap_connection):
""" """Retrieve users from LDAP.
Retrieve users from LDAP.
This function searches the LDAP directory for users within the specified base DN and returns the search result. This function searches the LDAP directory for users within the specified base DN and returns the search result.
It searches for objects with the 'user' object class within the subtree of the specified base DN. It searches for objects with the 'user' object class within the subtree of the specified base DN.
@ -47,7 +46,9 @@ def retrieve_users_from_ldap(ldap_connection):
""" """
try: try:
result = ldap_connection.search_s( result = ldap_connection.search_s(
USERS_DN, ldap.SCOPE_SUBTREE, "(objectClass=user)" USERS_DN,
ldap.SCOPE_SUBTREE,
"(objectClass=user)",
) )
return result return result
except ldap.LDAPError as e: except ldap.LDAPError as e:
@ -57,8 +58,7 @@ def retrieve_users_from_ldap(ldap_connection):
# Function to retrieve groups from LDAP # Function to retrieve groups from LDAP
def retrieve_groups_from_ldap(ldap_connection): def retrieve_groups_from_ldap(ldap_connection):
""" """Retrieve groups from LDAP.
Retrieve groups from LDAP.
This function searches the LDAP directory for groups within the specified base DN and returns the search result. This function searches the LDAP directory for groups within the specified base DN and returns the search result.
It searches for objects with the 'group' object class within the subtree of the specified base DN. It searches for objects with the 'group' object class within the subtree of the specified base DN.
@ -73,7 +73,9 @@ def retrieve_groups_from_ldap(ldap_connection):
""" """
try: try:
result = ldap_connection.search_s( result = ldap_connection.search_s(
DOOR_ACCESS_GROUPS_DN, ldap.SCOPE_SUBTREE, "(objectClass=group)" DOOR_ACCESS_GROUPS_DN,
ldap.SCOPE_SUBTREE,
"(objectClass=group)",
) )
return result return result
except ldap.LDAPError as e: except ldap.LDAPError as e:
@ -83,8 +85,7 @@ def retrieve_groups_from_ldap(ldap_connection):
# Function to add user to the database or update if already exists # Function to add user to the database or update if already exists
def add_user_to_database(conn, cursor, upn, rfid_uid, member_of): def add_user_to_database(conn, cursor, upn, rfid_uid, member_of):
""" """Add a user to the database or update the user's information if they already exist.
Add a user to the database or update the user's information if they already exist.
This function checks if a user with the given UPN (User Principal Name) already exists in the database. This function checks if a user with the given UPN (User Principal Name) already exists in the database.
If the user exists and their RFID UID or group membership has changed, the function updates the user's If the user exists and their RFID UID or group membership has changed, the function updates the user's
@ -117,7 +118,7 @@ def add_user_to_database(conn, cursor, upn, rfid_uid, member_of):
print(f"[{datetime.now()}] User '{upn}' updated in the database.") print(f"[{datetime.now()}] User '{upn}' updated in the database.")
else: else:
print( print(
f"[{datetime.now()}] User '{upn}' already exists in the database with the same data." f"[{datetime.now()}] User '{upn}' already exists in the database with the same data.",
) )
else: else:
# User doesn't exist, insert new user # User doesn't exist, insert new user
@ -133,23 +134,26 @@ def add_user_to_database(conn, cursor, upn, rfid_uid, member_of):
# Function to add group to the database or update if already exists # Function to add group to the database or update if already exists
def add_group_to_database(conn, cursor, cn): def add_group_to_database(conn, cursor, cn):
""" """Add a group to the database if it does not already exist.
Add a group to the database if it does not already exist.
This function checks if a group with the given CN (Common Name) already exists in the database. This function checks if a group with the given CN (Common Name) already exists in the database.
If the group exists, it prints a message indicating that the group already exists. If the group If the group exists, it prints a message indicating that the group already exists. If the group
does not exist, it inserts a new record for the group. does not exist, it inserts a new record for the group.
Parameters: Parameters
----------
conn (sqlite3.Connection): The SQLite database connection. conn (sqlite3.Connection): The SQLite database connection.
cursor (sqlite3.Cursor): The cursor object for executing SQL queries. cursor (sqlite3.Cursor): The cursor object for executing SQL queries.
cn (str): The Common Name of the group. cn (str): The Common Name of the group.
Returns: Returns
-------
None None
Raises: Raises
------
sqlite3.Error: If an error occurs while accessing the SQLite database. sqlite3.Error: If an error occurs while accessing the SQLite database.
""" """
try: try:
cursor.execute("SELECT * FROM Groups WHERE cn=?", (cn,)) cursor.execute("SELECT * FROM Groups WHERE cn=?", (cn,))
@ -168,13 +172,14 @@ def add_group_to_database(conn, cursor, cn):
# Function to sync LDAP users and groups to the database # Function to sync LDAP users and groups to the database
def sync_ldap_to_database(db_file): def sync_ldap_to_database(db_file):
""" """Syncs LDAP users and groups to the SQLite database.
Syncs LDAP users and groups to the SQLite database.
Args: Args:
----
db_file (str): The path to the SQLite database file. db_file (str): The path to the SQLite database file.
Returns: Returns:
-------
None None
This function connects to the LDAP server, retrieves user and group information, This function connects to the LDAP server, retrieves user and group information,
@ -183,8 +188,10 @@ def sync_ldap_to_database(db_file):
and groups are added or updated in the database according to the LDAP information. and groups are added or updated in the database according to the LDAP information.
Note: Note:
----
The LDAP connection must be properly configured and the LDAP server accessible The LDAP connection must be properly configured and the LDAP server accessible
from the machine running this script. from the machine running this script.
""" """
ldap_conn = initialize_ldap_connection() ldap_conn = initialize_ldap_connection()
if ldap_conn: if ldap_conn:
@ -213,11 +220,11 @@ def sync_ldap_to_database(db_file):
cursor.execute("DELETE FROM Users WHERE upn=?", (upn,)) cursor.execute("DELETE FROM Users WHERE upn=?", (upn,))
conn.commit() conn.commit()
print( print(
f"[{datetime.now()}] User '{upn}' disabled in LDAP and removed from the database." f"[{datetime.now()}] User '{upn}' disabled in LDAP and removed from the database.",
) )
else: else:
print( print(
f"[{datetime.now()}] User '{upn}' disabled in LDAP but not present in the database." f"[{datetime.now()}] User '{upn}' disabled in LDAP but not present in the database.",
) )
continue # Skip adding the disabled user to the database continue # Skip adding the disabled user to the database
@ -236,35 +243,39 @@ def sync_ldap_to_database(db_file):
def run_sync_ldap_to_database_thread(db_file): def run_sync_ldap_to_database_thread(db_file):
""" """Run the LDAP synchronization process in a separate thread.
Run the LDAP synchronization process in a separate thread.
This function initiates the synchronization of LDAP data to the database in a background thread. This function initiates the synchronization of LDAP data to the database in a background thread.
It ensures that the LDAP synchronization runs asynchronously, allowing the main program to continue It ensures that the LDAP synchronization runs asynchronously, allowing the main program to continue
running without being blocked. running without being blocked.
Parameters: Parameters
----------
db_file (str): The path to the SQLite database file. db_file (str): The path to the SQLite database file.
Returns: Returns
-------
None None
""" """
print(f"[{datetime.now()}] Running LDAP sync") print(f"[{datetime.now()}] Running LDAP sync")
threading.Thread(target=sync_ldap_to_database, args=(db_file,), daemon=True).start() threading.Thread(target=sync_ldap_to_database, args=(db_file,), daemon=True).start()
def schedule_sync_ldap_to_database(db_file): def schedule_sync_ldap_to_database(db_file):
""" """Schedule the LDAP synchronization process to run immediately and then every 5 minutes.
Schedule the LDAP synchronization process to run immediately and then every 5 minutes.
This function runs the LDAP synchronization process in a background thread immediately upon invocation This function runs the LDAP synchronization process in a background thread immediately upon invocation
and sets up a recurring schedule to run the synchronization every 5 minutes. and sets up a recurring schedule to run the synchronization every 5 minutes.
Parameters: Parameters
----------
db_file (str): The path to the SQLite database file. db_file (str): The path to the SQLite database file.
Returns: Returns
-------
None None
""" """
run_sync_ldap_to_database_thread(db_file) # Run immediately run_sync_ldap_to_database_thread(db_file) # Run immediately
schedule.every(5).minutes.do(run_sync_ldap_to_database_thread, db_file) schedule.every(5).minutes.do(run_sync_ldap_to_database_thread, db_file)

View File

@ -1,9 +1,8 @@
from ldapSync import schedule_sync_ldap_to_database
from database import setup_database
from Webserver import run_webServer_thread
from env import DBFILE
import schedule import schedule
from database import setup_database
from env import DBFILE
from ldapSync import schedule_sync_ldap_to_database
from Webserver import run_webServer_thread
setup_database(DBFILE) setup_database(DBFILE)
run_webServer_thread() run_webServer_thread()