This commit is contained in:
jeanGaston 2024-06-09 17:17:31 +02:00
commit d6a3c1bcd6
6 changed files with 420 additions and 83 deletions

View File

@ -35,6 +35,15 @@ inactivity_timer = Timer(-1)
def init_oled(): def init_oled():
"""
Initialize the OLED display.
This function initializes the OLED display with a width of 128 pixels, height of 64 pixels,
and communicates over the specified I2C interface.
## Raises:
- Exception: If there's an error initializing the OLED display.
"""
global oled global oled
try: try:
oled = SSD1306_I2C(128, 64, i2c) oled = SSD1306_I2C(128, 64, i2c)
@ -47,6 +56,18 @@ def init_oled():
def display_message(message, ip_address): def display_message(message, ip_address):
"""
Display a message on the OLED screen.
This function displays a message on the OLED screen along with the IP address.
## Parameters:
- message (str): The message to be displayed.
- ip_address (str): The IP address to be displayed.
## Raises:
- Exception: If there's an error displaying the message on the OLED screen.
"""
global last_activity_time, screensaver_active, screensaver_thread_running global last_activity_time, screensaver_active, screensaver_thread_running
last_activity_time = time.time() last_activity_time = time.time()
screensaver_active = False screensaver_active = False
@ -70,6 +91,17 @@ def display_message(message, ip_address):
def screensaver(): def screensaver():
"""
Activate the screensaver with RF-AD animation.
This function activates the screensaver by displaying an RF-AD animation moving across the screen.
## Global Variables:
- screensaver_active (bool): Flag indicating if the screensaver is active.
- screensaver_thread_running (bool): Flag indicating if the screensaver thread is running.
- last_activity_time (float): Timestamp of the last activity.
"""
global screensaver_active, screensaver_thread_running global screensaver_active, screensaver_thread_running
x, y = 0, 0 x, y = 0, 0
direction_x, direction_y = 1, 1 direction_x, direction_y = 1, 1
@ -95,6 +127,16 @@ def screensaver():
def start_screensaver_thread(): def start_screensaver_thread():
"""
Start the screensaver thread if it's not already running.
This function starts the screensaver thread if it's not already running. It sets flags to indicate
the screensaver is active and the thread is running.
## Global Variables:
- screensaver_active (bool): Flag indicating if the screensaver is active.
- screensaver_thread_running (bool): Flag indicating if the screensaver thread is running.
"""
global screensaver_active, screensaver_thread_running global screensaver_active, screensaver_thread_running
if not screensaver_thread_running: if not screensaver_thread_running:
screensaver_active = True screensaver_active = True
@ -102,17 +144,43 @@ def start_screensaver_thread():
_thread.start_new_thread(screensaver, ()) _thread.start_new_thread(screensaver, ())
def handle_inactivity(timer): def handle_inactivity():
"""
Handle user inactivity by starting the screensaver if necessary.
This function is called by a timer to check for user inactivity. If the specified time period
has passed since the last activity, it starts the screensaver thread.
"""
if time.time() - last_activity_time > 60: if time.time() - last_activity_time > 60:
start_screensaver_thread() start_screensaver_thread()
def reset_inactivity_timer(): def reset_inactivity_timer():
"""
Reset the inactivity timer.
This function resets the last activity time to the current time, effectively restarting the
inactivity timer.
"""
global last_activity_time global last_activity_time
last_activity_time = time.time() last_activity_time = time.time()
def test_server_connection(ip_address): def test_server_connection(ip_address):
"""
Test the connection to the server and handle connection errors.
This function tests the connection to the server by sending an HTTP GET request to the server
endpoint. It handles connection errors and displays appropriate messages on the OLED screen.
## Parameters:
- ip_address (str): The IP address of the server.
## Global Variables:
- SERVER_IP (str): The IP address of the server.
- SERVER_PORT (int): The port number of the server.
"""
while True: while True:
try: try:
response = requests.get(f"http://{SERVER_IP}:{SERVER_PORT}/") response = requests.get(f"http://{SERVER_IP}:{SERVER_PORT}/")
@ -145,6 +213,21 @@ def test_server_connection(ip_address):
# Connect to WiFi # Connect to WiFi
def connect_wifi(ssid, password): def connect_wifi(ssid, password):
"""
Connect to a WiFi network.
This function connects the device to the specified WiFi network using the provided SSID and password.
It waits until the connection is established and then displays a message on the OLED screen indicating
successful connection.
## Parameters:
- ssid (str): The SSID of the WiFi network.
- password (str): The password of the WiFi network.
## Global Variables:
- SERVER_IP (str): The IP address of the server.
- SERVER_PORT (int): The port number of the server.
"""
wlan = network.WLAN(network.STA_IF) wlan = network.WLAN(network.STA_IF)
wlan.active(True) wlan.active(True)
wlan.connect(ssid, password) wlan.connect(ssid, password)
@ -161,6 +244,18 @@ def connect_wifi(ssid, password):
# Function to send RFID UID to the server # Function to send RFID UID to the server
def send_rfid_to_server(rfid_uid): def send_rfid_to_server(rfid_uid):
"""
Send RFID UID to the server for access verification.
This function constructs a JSON payload containing the RFID UID and the door ID, and sends it to the server
for access verification. It expects a JSON response from the server indicating whether access is granted.
## Parameters:
- rfid_uid (str): The RFID UID to be sent to the server.
## Returns:
- dict: A dictionary containing the response from the server, indicating whether access is granted.
"""
try: try:
url = f"http://{SERVER_IP}:{SERVER_PORT}/access" url = f"http://{SERVER_IP}:{SERVER_PORT}/access"
headers = {"Content-Type": "application/json"} headers = {"Content-Type": "application/json"}
@ -175,6 +270,15 @@ def send_rfid_to_server(rfid_uid):
# Main loop to scan RFID tags # Main loop to scan RFID tags
def main(): def main():
"""
Main loop to scan RFID tags and handle access control.
This function initializes the OLED display, connects to WiFi, and starts a loop to scan RFID tags.
It handles user authentication by sending the RFID UID to the server and displaying access status on the OLED.
The function also sets up an inactivity timer to activate a screensaver after 1 minute of inactivity.
"""
# Retry mechanism for OLED initialization # Retry mechanism for OLED initialization
for _ in range(3): for _ in range(3):
try: try:

View File

@ -8,6 +8,7 @@
- [3. Clone the Repository](./server.md/#3-clone-the-repository) - [3. Clone the Repository](./server.md/#3-clone-the-repository)
- [4. Create the .env File](./server.md/#4-create-the-env-file) - [4. Create the .env File](./server.md/#4-create-the-env-file)
- [5. Build and Run the Docker Container](./server.md/#5-build-and-run-the-docker-container) - [5. Build and Run the Docker Container](./server.md/#5-build-and-run-the-docker-container)
# The Active Directory part # The Active Directory part
## 1. Modify the LDAP Schema ## 1. Modify the LDAP Schema
@ -77,9 +78,9 @@ Then navigate into the server folder
```bash ```bash
cd ./RD-AD/Server cd ./RD-AD/Server
``` ```
## 4. Create the .env File ## 4. Create the `.env` File
Create a .env file in the [server directory](../Server/) with the following content: Create a `.env` file in the [server directory](../Server/) with the following content:
``` ```
LDAPUSER=[The user you have created earlier] LDAPUSER=[The user you have created earlier]

View File

@ -1,15 +1,6 @@
from threading import Thread
from flask import (
Flask,
render_template,
send_file,
Response,
request,
redirect,
jsonify,
)
import io import io
from ldapSync import sync_ldap_to_database from threading import Thread
from database import ( from database import (
add_door_to_database, add_door_to_database,
check_access, check_access,
@ -22,6 +13,15 @@ from database import (
log_access_attempt, log_access_attempt,
) )
from env import DBFILE, WebServerPORT from env import DBFILE, WebServerPORT
from flask import (
Flask,
Response,
jsonify,
redirect,
render_template,
request,
)
from ldapSync import sync_ldap_to_database
app = Flask(__name__) app = Flask(__name__)
@ -92,11 +92,9 @@ def add_door():
group_cn = request.form["group_cn"] group_cn = request.form["group_cn"]
# Update with your database file path # Update with your database file path
exec = add_door_to_database(DBFILE, group_cn, Door_id)
if add_door_to_database(DBFILE, group_cn, Door_id): if add_door_to_database(DBFILE, group_cn, Door_id):
return redirect("/") return redirect("/")
else: return "Failed to add door to the database."
return f"Failed to add door to the database."
# Route to handle sync button click # Route to handle sync button click
@ -118,20 +116,30 @@ def door_access():
access_granted, upn = check_access(rfid_uid, door_id) access_granted, upn = check_access(rfid_uid, door_id)
if access_granted: if access_granted:
print("")
log_access_attempt(DBFILE, upn, rfid_uid, True, door_id) log_access_attempt(DBFILE, upn, rfid_uid, True, door_id)
return jsonify({"access_granted": True, "upn": upn}), 200 return jsonify({"access_granted": True, "upn": upn}), 200
else:
log_access_attempt(DBFILE, upn, rfid_uid, False, door_id) log_access_attempt(DBFILE, upn, rfid_uid, False, door_id)
return jsonify({"access_granted": False}), 403 return jsonify({"access_granted": False}), 403
def run_flask_app(): def run_flask_app():
"""Run the Flask web application.
This function starts the Flask web application with debugging enabled,
no reloader, on the specified port and host. It serves as the main entry
point for running the web server.
"""
app.run(debug=True, use_reloader=False, port=WebServerPORT, host="0.0.0.0") app.run(debug=True, use_reloader=False, port=WebServerPORT, host="0.0.0.0")
def run_webServer_thread(): def run_webServer_thread():
"""Start the Flask web server in a separate thread.
This function initializes and starts a new thread to run the Flask web
application. It allows the web server to run concurrently with other
tasks in the main program, ensuring the web interface remains responsive.
"""
print(f"STARTING WEB SERVER ON PORT {WebServerPORT}") print(f"STARTING WEB SERVER ON PORT {WebServerPORT}")
flask_thread = Thread(target=run_flask_app, daemon=True) flask_thread = Thread(target=run_flask_app, daemon=True)
flask_thread.start() flask_thread.start()

View File

@ -1,18 +1,38 @@
from datetime import datetime
import sqlite3 import sqlite3
from datetime import datetime
from env import DBFILE from env import DBFILE
# Function to check if a table exists in the database # Function to check if a table exists in the database
def table_exists(cursor, table_name): def table_exists(cursor, table_name):
"""Check if a table exists in the database.
This function checks whether a table with the specified name exists in the database.
## Parameters:
- cursor (sqlite3.Cursor): The cursor object to execute SQL queries.
- table_name (str): The name of the table to check.
## Returns:
- bool: True if the table exists, False otherwise.
"""
cursor.execute( cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name=?", (table_name,) "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
(table_name,),
) )
return cursor.fetchone() is not None return cursor.fetchone() is not None
# Function to create the Users table # Function to create the Users table
def create_users_table(cursor): def create_users_table(cursor):
"""Create the Users table in the database.
This function creates the Users table with columns for user principal name (upn), RFID UID, and member of groups.
## Parameters:
- cursor (sqlite3.Cursor): The cursor object to execute SQL queries.
"""
cursor.execute("""CREATE TABLE Users ( cursor.execute("""CREATE TABLE Users (
upn TEXT PRIMARY KEY, upn TEXT PRIMARY KEY,
rFIDUID TEXT, rFIDUID TEXT,
@ -23,6 +43,13 @@ def create_users_table(cursor):
# Function to create the Groups table # Function to create the Groups table
def create_groups_table(cursor): def create_groups_table(cursor):
"""Create the Groups table in the database.
This function creates the Groups table with a single column for common name (cn) of the group.
## Parameters:
- cursor (sqlite3.Cursor): The cursor object to execute SQL queries.
"""
cursor.execute("""CREATE TABLE Groups ( cursor.execute("""CREATE TABLE Groups (
cn TEXT PRIMARY KEY cn TEXT PRIMARY KEY
)""") )""")
@ -30,6 +57,13 @@ def create_groups_table(cursor):
# Function to create the Doors table # Function to create the Doors table
def create_doors_table(cursor): def create_doors_table(cursor):
"""Create the Doors table in the database.
This function creates the Doors table with columns for door ID and associated group common name.
## Parameters:
- cursor (sqlite3.Cursor): The cursor object to execute SQL queries.
"""
cursor.execute("""CREATE TABLE Doors ( cursor.execute("""CREATE TABLE Doors (
id INTEGER PRIMARY KEY, id INTEGER PRIMARY KEY,
GroupCn TEXT, GroupCn TEXT,
@ -39,10 +73,13 @@ def create_doors_table(cursor):
# Function to create the logs table # Function to create the logs table
def create_logs_table(cursor): def create_logs_table(cursor):
""" """Create the logs table in the database.
Create a log table with columns id, timestamp, user, and granted.
:param db_file: The database file path. This function creates the logs table with columns for ID (auto-incremented), timestamp, user, RFID UID, door ID,
and access granted status. Foreign key constraints are set on the door ID, user, and RFID UID columns.
## Parameters:
- cursor (sqlite3.Cursor): The cursor object to execute SQL queries.
""" """
cursor.execute(""" cursor.execute("""
CREATE TABLE IF NOT EXISTS log ( CREATE TABLE IF NOT EXISTS log (
@ -61,6 +98,18 @@ def create_logs_table(cursor):
# Function to setup the database # Function to setup the database
def setup_database(db_file): def setup_database(db_file):
"""Set up the SQLite database by creating necessary tables if they don't already exist.
This function checks if the Users, Groups, Doors, and Log tables exist in the database. If any of them don't exist,
it creates them using their respective creation functions. After creating or verifying the tables, it commits
the changes and closes the database connection.
## Parameters:
- db_file (str): The file path to the SQLite database.
## Returns:
- None
"""
# Connect to the SQLite database # Connect to the SQLite database
conn = sqlite3.connect(db_file) conn = sqlite3.connect(db_file)
cursor = conn.cursor() cursor = conn.cursor()
@ -97,14 +146,20 @@ def setup_database(db_file):
def log_access_attempt(db_file, user, rFIDUID, granted, doorID): def log_access_attempt(db_file, user, rFIDUID, granted, doorID):
""" """Log an access attempt to the database.
Log an access attempt to the log table.
:param db_file: The database file path. This function inserts a new entry into the log table of the SQLite database, recording details about the access attempt,
:param user: The user attempting access. such as the timestamp, user, RFID UID, whether access was granted, and the door ID.
:param rFIDUID: The user's tag uid
:param granted: Whether access was granted (True/False). ## Parameters:
:param doorID: The door id - db_file (str): The file path to the SQLite database.
- user (str): The user's UPN (User Principal Name).
- rFIDUID (str): The RFID UID associated with the access attempt.
- granted (bool): A boolean indicating whether access was granted (True) or denied (False).
- doorID (int): The ID of the door where the access attempt occurred.
# Returns:
- None
""" """
conn = sqlite3.connect(db_file) conn = sqlite3.connect(db_file)
cursor = conn.cursor() cursor = conn.cursor()
@ -122,6 +177,11 @@ def log_access_attempt(db_file, user, rFIDUID, granted, doorID):
def print_users_table(cursor): def print_users_table(cursor):
"""Print the content of the Users table.
## Parameters:
- cursor (sqlite3.Cursor): Cursor object for executing SQLite queries.
"""
cursor.execute("SELECT * FROM Users") cursor.execute("SELECT * FROM Users")
rows = cursor.fetchall() rows = cursor.fetchall()
print("Users:") print("Users:")
@ -131,6 +191,11 @@ def print_users_table(cursor):
# Function to print the content of the Groups table # Function to print the content of the Groups table
def print_groups_table(cursor): def print_groups_table(cursor):
"""Print the content of the Groups table.
## Parameters:
- cursor (sqlite3.Cursor): Cursor object for executing SQLite queries.
"""
cursor.execute("SELECT * FROM Groups") cursor.execute("SELECT * FROM Groups")
rows = cursor.fetchall() rows = cursor.fetchall()
print("Groups:") print("Groups:")
@ -140,6 +205,11 @@ def print_groups_table(cursor):
# Function to print the content of the Doors table # Function to print the content of the Doors table
def print_doors_table(cursor): def print_doors_table(cursor):
"""Print the content of the Doors table.
## Parameters:
- cursor (sqlite3.Cursor): Cursor object for executing SQLite queries.
"""
cursor.execute("SELECT * FROM Doors") cursor.execute("SELECT * FROM Doors")
rows = cursor.fetchall() rows = cursor.fetchall()
print("Doors:") print("Doors:")
@ -149,6 +219,11 @@ def print_doors_table(cursor):
# Function to print the content of the Log table # Function to print the content of the Log table
def print_log_table(cursor): def print_log_table(cursor):
"""Print the content of the Log table.
## Parameters:
- cursor (sqlite3.Cursor): Cursor object for executing SQLite queries.
"""
cursor.execute("SELECT * FROM log") cursor.execute("SELECT * FROM log")
rows = cursor.fetchall() rows = cursor.fetchall()
print("Logs:") print("Logs:")
@ -158,6 +233,11 @@ def print_log_table(cursor):
# Function to print the content of the entire database # Function to print the content of the entire database
def print_database_content(db_file): def print_database_content(db_file):
"""Print the content of the entire database.
## Parameters:
- db_file (str): The file path to the SQLite database.
"""
conn = sqlite3.connect(db_file) conn = sqlite3.connect(db_file)
cursor = conn.cursor() cursor = conn.cursor()
@ -170,9 +250,10 @@ def print_database_content(db_file):
def get_logs(): def get_logs():
""" """Fetch all logs from the log table in the database.
Fetch all logs from the log table in the database.
:return: List of log records. ## Returns:
- list: List of log records.
""" """
conn = sqlite3.connect(DBFILE) conn = sqlite3.connect(DBFILE)
cursor = conn.cursor() cursor = conn.cursor()
@ -190,11 +271,14 @@ def get_logs():
def get_latest_logs(db_file, limit=10): def get_latest_logs(db_file, limit=10):
""" """Fetch the latest logs from the database.
Fetch the latest logs from the database.
:param limit: The number of latest logs to fetch. ## Parameters:
:return: List of log entries. - db_file (str): The file path to the SQLite database.
- limit (int): The number of latest logs to fetch. Default is 10.
## Returns:
- list: List of log entries.
""" """
conn = sqlite3.connect(db_file) conn = sqlite3.connect(db_file)
cursor = conn.cursor() cursor = conn.cursor()
@ -216,6 +300,14 @@ def get_latest_logs(db_file, limit=10):
# Function to fetch list of existing groups from the database # Function to fetch list of existing groups from the database
def get_existing_groups(db_file): def get_existing_groups(db_file):
"""Fetches a list of existing groups from the database.
## Parameters:
- db_file (str): The file path to the SQLite database.
## Returns:
- list: List of existing group names.
"""
try: try:
conn = sqlite3.connect(db_file) conn = sqlite3.connect(db_file)
cursor = conn.cursor() cursor = conn.cursor()
@ -229,6 +321,14 @@ def get_existing_groups(db_file):
def delete_group_from_database(group_cn): def delete_group_from_database(group_cn):
"""Delete a group from the database.
This function deletes a group with the specified common name (cn) from both the Groups and Doors tables
in the database.
## Parameters:
- group_cn (str): The common name of the group to delete.
"""
conn = sqlite3.connect(DBFILE) conn = sqlite3.connect(DBFILE)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("DELETE FROM Groups WHERE cn = ?", (group_cn,)) cursor.execute("DELETE FROM Groups WHERE cn = ?", (group_cn,))
@ -238,6 +338,13 @@ def delete_group_from_database(group_cn):
def get_doors(): def get_doors():
"""Retrieve all doors from the database.
This function fetches all rows from the Doors table in the database and returns them as a list of tuples.
## Returns:
- list: A list of tuples representing door records.
"""
conn = sqlite3.connect(DBFILE) conn = sqlite3.connect(DBFILE)
cursor = conn.cursor() cursor = conn.cursor()
cursor.execute("SELECT * FROM Doors") cursor.execute("SELECT * FROM Doors")
@ -247,9 +354,10 @@ def get_doors():
def get_users(): def get_users():
""" """Fetch all users from the Users table in the database.
Fetch all users from the Users table in the database.
:return: List of user records. ## Returns:
- list: List of user records.
""" """
conn = sqlite3.connect(DBFILE) conn = sqlite3.connect(DBFILE)
cursor = conn.cursor() cursor = conn.cursor()
@ -263,6 +371,21 @@ def get_users():
# Function to add a door to the database # Function to add a door to the database
def add_door_to_database(db_file, group_cn, Door_id): def add_door_to_database(db_file, group_cn, Door_id):
"""Add a door to the database.
This function inserts a new door record into the Doors table with the specified group common name (cn)
and door ID.
## Parameters:
- db_file (str): The file path to the SQLite database.
- group_cn (str): The common name of the group associated with the door.
- Door_id (int): The ID of the door.
## Returns:
- bool: True if the door was added successfully, False otherwise.
## Raises:
- sqlite3.Error: If there's an error executing the SQL query.
"""
try: try:
conn = sqlite3.connect(db_file) conn = sqlite3.connect(db_file)
cursor = conn.cursor() cursor = conn.cursor()
@ -285,6 +408,21 @@ def add_door_to_database(db_file, group_cn, Door_id):
# Function to verify if the user is allowed to open the door # Function to verify if the user is allowed to open the door
def check_access(rfid_uid_str, door_id): def check_access(rfid_uid_str, door_id):
"""Check if the user is allowed to open the door.
This function verifies if the user associated with the given RFID UID is allowed to open the door
specified by the door ID.
## Parameters:
- rfid_uid_str (str): The RFID UID of the user.
- door_id (int): The ID of the door.
## Returns:
- tuple: A tuple containing a boolean value indicating access permission and the user's UPN
if access is granted, otherwise (False, None).
## Raises:
- sqlite3.Error: If there's an error executing the SQL query.
"""
try: try:
conn = sqlite3.connect(DBFILE) # Update with your database file path conn = sqlite3.connect(DBFILE) # Update with your database file path
cursor = conn.cursor() cursor = conn.cursor()
@ -294,7 +432,8 @@ def check_access(rfid_uid_str, door_id):
# Get the user's UPN and group memberships based on the RFID UID # Get the user's UPN and group memberships based on the RFID UID
cursor.execute( cursor.execute(
"SELECT upn, MemberOf FROM Users WHERE rFIDUID = ?", (rfid_uid_bytes,) "SELECT upn, MemberOf FROM Users WHERE rFIDUID = ?",
(rfid_uid_bytes,),
) )
user_data = cursor.fetchone() user_data = cursor.fetchone()
if user_data is None: if user_data is None:
@ -316,7 +455,6 @@ def check_access(rfid_uid_str, door_id):
# Check if the user's group is allowed to open the door # Check if the user's group is allowed to open the door
if door_group in user_groups.split(","): if door_group in user_groups.split(","):
return True, upn # Access granted return True, upn # Access granted
else:
return False, None # Access denied return False, None # Access denied
except sqlite3.Error as e: except sqlite3.Error as e:

View File

@ -1,20 +1,22 @@
from datetime import datetime
import ldap
import sqlite3 import sqlite3
import threading import threading
from datetime import datetime
import ldap
import schedule import schedule
from env import DOOR_ACCESS_GROUPS_DN, LDAPPASS, LDAPUSER, LDAP_SERVER, USERS_DN from env import DOOR_ACCESS_GROUPS_DN, LDAP_SERVER, LDAPPASS, LDAPUSER, USERS_DN
# Function to initialize LDAP connection # Function to initialize LDAP connection
def initialize_ldap_connection(): def initialize_ldap_connection():
""" """Initialize the LDAP connection.
## Settings :
None This function attempts to establish a connection to the LDAP server using the provided server address,
## Behavior user credentials, and settings. If the connection is successful, it returns the connection object.
Init the connection to the LDAP server. In case of an error, it prints the error and returns None.
Return LDAPobjet instance when connected
if it fail, return None and print error code ## Returns:
- ldap.LDAPObject or None: The LDAP connection object if successful, otherwise None.
""" """
try: try:
connect = ldap.initialize(LDAP_SERVER) connect = ldap.initialize(LDAP_SERVER)
@ -29,17 +31,24 @@ def initialize_ldap_connection():
# Function to retrieve users from LDAP # Function to retrieve users from LDAP
def retrieve_users_from_ldap(ldap_connection): def retrieve_users_from_ldap(ldap_connection):
""" """Retrieve users from LDAP.
## Settings :
- ldap_connection : LDAPobjet instance This function searches the LDAP directory for users within the specified base DN and returns the search result.
## Behavior It searches for objects with the 'user' object class within the subtree of the specified base DN.
retrieve the users in the specified OU
Return result when it success ## Parameters:
if it fail, return empty list and print error code - ldap_connection (ldap.LDAPObject): The LDAP connection object.
## Returns:
- list of tuple: A list of tuples containing the search result, where each tuple represents a user entry.
Each tuple consists of the DN (Distinguished Name) of the user entry and its attributes.
Returns an empty list if an error occurs during the LDAP search.
""" """
try: try:
result = ldap_connection.search_s( result = ldap_connection.search_s(
USERS_DN, ldap.SCOPE_SUBTREE, "(objectClass=user)" USERS_DN,
ldap.SCOPE_SUBTREE,
"(objectClass=user)",
) )
return result return result
except ldap.LDAPError as e: except ldap.LDAPError as e:
@ -49,17 +58,24 @@ def retrieve_users_from_ldap(ldap_connection):
# Function to retrieve groups from LDAP # Function to retrieve groups from LDAP
def retrieve_groups_from_ldap(ldap_connection): def retrieve_groups_from_ldap(ldap_connection):
""" """Retrieve groups from LDAP.
## Settings :
- ldap_connection : LDAPobjet instance This function searches the LDAP directory for groups within the specified base DN and returns the search result.
## Behavior It searches for objects with the 'group' object class within the subtree of the specified base DN.
retrieve the groups in the specified OU
Return result when it success ## Parameters:
if it fail, return empty list and print error code - ldap_connection (ldap.LDAPObject): The LDAP connection object.
## Returns:
- list of tuple: A list of tuples containing the search result, where each tuple represents a group entry.
Each tuple consists of the DN (Distinguished Name) of the group entry and its attributes.
Returns an empty list if an error occurs during the LDAP search.
""" """
try: try:
result = ldap_connection.search_s( result = ldap_connection.search_s(
DOOR_ACCESS_GROUPS_DN, ldap.SCOPE_SUBTREE, "(objectClass=group)" DOOR_ACCESS_GROUPS_DN,
ldap.SCOPE_SUBTREE,
"(objectClass=group)",
) )
return result return result
except ldap.LDAPError as e: except ldap.LDAPError as e:
@ -69,6 +85,25 @@ def retrieve_groups_from_ldap(ldap_connection):
# Function to add user to the database or update if already exists # Function to add user to the database or update if already exists
def add_user_to_database(conn, cursor, upn, rfid_uid, member_of): def add_user_to_database(conn, cursor, upn, rfid_uid, member_of):
"""Add a user to the database or update the user's information if they already exist.
This function checks if a user with the given UPN (User Principal Name) already exists in the database.
If the user exists and their RFID UID or group membership has changed, the function updates the user's
record. If the user does not exist, the function inserts a new record for the user.
## Parameters:
- conn (sqlite3.Connection): The SQLite database connection.
- cursor (sqlite3.Cursor): The cursor object for executing SQL queries.
- upn (str): The User Principal Name of the user.
- rfid_uid (str): The RFID UID associated with the user.
- member_of (str): The group membership (CN) of the user.
## Returns:
- None
## Raises:
- sqlite3.Error: If an error occurs while accessing the SQLite database.
"""
try: try:
cursor.execute("SELECT * FROM Users WHERE upn=?", (upn,)) cursor.execute("SELECT * FROM Users WHERE upn=?", (upn,))
existing_user = cursor.fetchone() existing_user = cursor.fetchone()
@ -83,7 +118,7 @@ def add_user_to_database(conn, cursor, upn, rfid_uid, member_of):
print(f"[{datetime.now()}] User '{upn}' updated in the database.") print(f"[{datetime.now()}] User '{upn}' updated in the database.")
else: else:
print( print(
f"[{datetime.now()}] User '{upn}' already exists in the database with the same data." f"[{datetime.now()}] User '{upn}' already exists in the database with the same data.",
) )
else: else:
# User doesn't exist, insert new user # User doesn't exist, insert new user
@ -99,6 +134,27 @@ def add_user_to_database(conn, cursor, upn, rfid_uid, member_of):
# Function to add group to the database or update if already exists # Function to add group to the database or update if already exists
def add_group_to_database(conn, cursor, cn): def add_group_to_database(conn, cursor, cn):
"""Add a group to the database if it does not already exist.
This function checks if a group with the given CN (Common Name) already exists in the database.
If the group exists, it prints a message indicating that the group already exists. If the group
does not exist, it inserts a new record for the group.
Parameters
----------
conn (sqlite3.Connection): The SQLite database connection.
cursor (sqlite3.Cursor): The cursor object for executing SQL queries.
cn (str): The Common Name of the group.
Returns
-------
None
Raises
------
sqlite3.Error: If an error occurs while accessing the SQLite database.
"""
try: try:
cursor.execute("SELECT * FROM Groups WHERE cn=?", (cn,)) cursor.execute("SELECT * FROM Groups WHERE cn=?", (cn,))
existing_group = cursor.fetchone() existing_group = cursor.fetchone()
@ -116,13 +172,14 @@ def add_group_to_database(conn, cursor, cn):
# Function to sync LDAP users and groups to the database # Function to sync LDAP users and groups to the database
def sync_ldap_to_database(db_file): def sync_ldap_to_database(db_file):
""" """Syncs LDAP users and groups to the SQLite database.
Syncs LDAP users and groups to the SQLite database.
Args: Args:
----
db_file (str): The path to the SQLite database file. db_file (str): The path to the SQLite database file.
Returns: Returns:
-------
None None
This function connects to the LDAP server, retrieves user and group information, This function connects to the LDAP server, retrieves user and group information,
@ -131,8 +188,10 @@ def sync_ldap_to_database(db_file):
and groups are added or updated in the database according to the LDAP information. and groups are added or updated in the database according to the LDAP information.
Note: Note:
----
The LDAP connection must be properly configured and the LDAP server accessible The LDAP connection must be properly configured and the LDAP server accessible
from the machine running this script. from the machine running this script.
""" """
ldap_conn = initialize_ldap_connection() ldap_conn = initialize_ldap_connection()
if ldap_conn: if ldap_conn:
@ -161,11 +220,11 @@ def sync_ldap_to_database(db_file):
cursor.execute("DELETE FROM Users WHERE upn=?", (upn,)) cursor.execute("DELETE FROM Users WHERE upn=?", (upn,))
conn.commit() conn.commit()
print( print(
f"[{datetime.now()}] User '{upn}' disabled in LDAP and removed from the database." f"[{datetime.now()}] User '{upn}' disabled in LDAP and removed from the database.",
) )
else: else:
print( print(
f"[{datetime.now()}] User '{upn}' disabled in LDAP but not present in the database." f"[{datetime.now()}] User '{upn}' disabled in LDAP but not present in the database.",
) )
continue # Skip adding the disabled user to the database continue # Skip adding the disabled user to the database
@ -184,10 +243,39 @@ def sync_ldap_to_database(db_file):
def run_sync_ldap_to_database_thread(db_file): def run_sync_ldap_to_database_thread(db_file):
"""Run the LDAP synchronization process in a separate thread.
This function initiates the synchronization of LDAP data to the database in a background thread.
It ensures that the LDAP synchronization runs asynchronously, allowing the main program to continue
running without being blocked.
Parameters
----------
db_file (str): The path to the SQLite database file.
Returns
-------
None
"""
print(f"[{datetime.now()}] Running LDAP sync") print(f"[{datetime.now()}] Running LDAP sync")
threading.Thread(target=sync_ldap_to_database, args=(db_file,), daemon=True).start() threading.Thread(target=sync_ldap_to_database, args=(db_file,), daemon=True).start()
def schedule_sync_ldap_to_database(db_file): def schedule_sync_ldap_to_database(db_file):
"""Schedule the LDAP synchronization process to run immediately and then every 5 minutes.
This function runs the LDAP synchronization process in a background thread immediately upon invocation
and sets up a recurring schedule to run the synchronization every 5 minutes.
Parameters
----------
db_file (str): The path to the SQLite database file.
Returns
-------
None
"""
run_sync_ldap_to_database_thread(db_file) # Run immediately run_sync_ldap_to_database_thread(db_file) # Run immediately
schedule.every(5).minutes.do(run_sync_ldap_to_database_thread, db_file) schedule.every(5).minutes.do(run_sync_ldap_to_database_thread, db_file)

View File

@ -1,12 +1,10 @@
from ldapSync import schedule_sync_ldap_to_database
from database import setup_database, print_database_content
from Webserver import run_webServer_thread
from env import DBFILE
import schedule import schedule
from database import setup_database
from env import DBFILE
from ldapSync import schedule_sync_ldap_to_database
from Webserver import run_webServer_thread
setup_database(DBFILE) setup_database(DBFILE)
# print_database_content(DBFILE)
run_webServer_thread() run_webServer_thread()
schedule_sync_ldap_to_database(DBFILE) schedule_sync_ldap_to_database(DBFILE)