376 lines
15 KiB
Python
376 lines
15 KiB
Python
import hashlib
import os
import secrets
from datetime import date

import gnupg
from flask import Flask, render_template, request, redirect, url_for, flash, session
from flask_sqlalchemy import SQLAlchemy
from werkzeug.utils import secure_filename
|
|
|
|
# Folder that receives uploaded pictures; created up front so later saves
# never fail on a missing directory.
UPLOAD_FOLDER = "static/uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# configures the app
app = Flask(__name__)  # creates the app

# database connection
# NOTE(review): the credentials are hard-coded; consider moving them out of
# source control.
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://love:love@localhost:3309/lovedb'

# turns off Flask-SQLAlchemy's modification tracking
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# The secret key signs the session cookie, and the routes below trust
# session['user_id'], so a guessable key lets anyone forge a login.
# Take it from the environment; fall back to a random per-process key, which
# is safe but invalidates sessions on restart and is not shared between
# worker processes, so set SECRET_KEY explicitly for real deployments.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or secrets.token_hex(32)

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER  # sets the upload folder

db = SQLAlchemy(app)  # database handle bound to the app
gpg = gnupg.GPG()  # GnuPG wrapper used for key import and encryption
|
|
|
|
# Country names passed to the registration template, grouped here by
# initial letter. The order of the entries is unchanged.
COUNTRIES = [
    # A
    "Afghanistan", "Albania", "Algeria", "Andorra", "Angola",
    "Antigua and Barbuda", "Argentina", "Armenia", "Australia", "Austria",
    "Azerbaijan",
    # B
    "Bahamas", "Bahrain", "Bangladesh", "Barbados", "Belarus", "Belgium",
    "Belize", "Benin", "Bhutan", "Bolivia", "Bosnia and Herzegovina",
    "Botswana", "Brazil", "Brunei", "Bulgaria", "Burkina Faso", "Burundi",
    # C
    "Cabo Verde", "Cambodia", "Cameroon", "Canada",
    "Central African Republic", "Chad", "Chile", "China", "Colombia",
    "Comoros", "Congo (Congo-Brazzaville)", "Costa Rica", "Croatia", "Cuba",
    "Cyprus", "Czechia (Czech Republic)",
    # D
    "Democratic Republic of the Congo", "Denmark", "Djibouti", "Dominica",
    "Dominican Republic",
    # E
    "Ecuador", "Egypt", "El Salvador", "Equatorial Guinea", "Eritrea",
    "Estonia", "Eswatini (fmr. Swaziland)", "Ethiopia",
    # F
    "Fiji", "Finland", "France",
    # G
    "Gabon", "Gambia", "Georgia", "Germany", "Ghana", "Greece", "Grenada",
    "Guatemala", "Guinea", "Guinea-Bissau", "Guyana",
    # H
    "Haiti", "Holy See", "Honduras", "Hungary",
    # I
    "Iceland", "India", "Indonesia", "Iran", "Iraq", "Ireland", "Israel",
    "Italy",
    # J
    "Jamaica", "Japan", "Jordan",
    # K
    "Kazakhstan", "Kenya", "Kiribati", "Kuwait", "Kyrgyzstan",
    # L
    "Laos", "Latvia", "Lebanon", "Lesotho", "Liberia", "Libya",
    "Liechtenstein", "Lithuania", "Luxembourg",
    # M
    "Madagascar", "Malawi", "Malaysia", "Maldives", "Mali", "Malta",
    "Marshall Islands", "Mauritania", "Mauritius", "Mexico", "Micronesia",
    "Moldova", "Monaco", "Mongolia", "Montenegro", "Morocco", "Mozambique",
    "Myanmar (Burma)",
    # N
    "Namibia", "Nauru", "Nepal", "Netherlands", "New Zealand", "Nicaragua",
    "Niger", "Nigeria", "North Korea", "North Macedonia", "Norway",
    # O
    "Oman",
    # P
    "Pakistan", "Palau", "Palestine State", "Panama", "Papua New Guinea",
    "Paraguay", "Peru", "Philippines", "Poland", "Portugal",
    # Q
    "Qatar",
    # R
    "Romania", "Russia", "Rwanda",
    # S
    "Saint Kitts and Nevis", "Saint Lucia",
    "Saint Vincent and the Grenadines", "Samoa", "San Marino",
    "Sao Tome and Principe", "Saudi Arabia", "Senegal", "Serbia",
    "Seychelles", "Sierra Leone", "Singapore", "Slovakia", "Slovenia",
    "Solomon Islands", "Somalia", "South Africa", "South Korea",
    "South Sudan", "Spain", "Sri Lanka", "Sudan", "Suriname", "Sweden",
    "Switzerland", "Syria",
    # T
    "Tajikistan", "Tanzania", "Thailand", "Timor-Leste", "Togo", "Tonga",
    "Trinidad and Tobago", "Tunisia", "Turkey", "Turkmenistan", "Tuvalu",
    # U
    "Uganda", "Ukraine", "United Arab Emirates", "United Kingdom",
    "United States", "Uruguay", "Uzbekistan",
    # V
    "Vanuatu", "Venezuela", "Vietnam",
    # Y
    "Yemen",
    # Z
    "Zambia", "Zimbabwe",
]
|
|
|
|
|
|
# Database model
class User(db.Model):
    """A registered member and the profile data collected at sign-up."""

    # --- identity -------------------------------------------------------
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(128), unique=True, nullable=False)
    # The verify route stores the key's fingerprint here, not the armored key.
    pgp = db.Column(db.String(4096), nullable=False)

    # --- personal details -----------------------------------------------
    firstname = db.Column(db.String(128), nullable=False)
    lastname = db.Column(db.String(128), nullable=False)
    sex = db.Column(db.Enum('male', 'female'), nullable=False)
    date_of_birth = db.Column(db.Date, nullable=False)

    # --- pictures: URL of the main picture, JSON list of the others ------
    profile_picture = db.Column(db.String(200), nullable=False)
    pictures = db.Column(db.JSON, nullable=True)

    # --- location ---------------------------------------------------------
    country = db.Column(db.String(128), nullable=False)
    city = db.Column(db.String(128), nullable=True)

    # --- optional profile attributes --------------------------------------
    height = db.Column(db.Float, nullable=True)
    weight = db.Column(db.Integer, nullable=True)
    race = db.Column(db.String(20), nullable=True)
    prefered_age_range = db.Column(db.String(20), nullable=True)
    likes = db.Column(db.JSON, nullable=True)
    dislikes = db.Column(db.JSON, nullable=True)

    # --- contact handles (each unique when present) -----------------------
    xmpp = db.Column(db.String(128), unique=True, nullable=False)
    email = db.Column(db.String(128), unique=True, nullable=True)
    phone = db.Column(db.String(20), unique=True, nullable=True)

    # Set to True by the verify route once the PGP challenge is passed.
    is_verified = db.Column(db.Boolean, default=False)
|
|
|
|
# calculates user age
def calculate_age(dob: date) -> int:
    """Return the number of completed years between *dob* and today."""
    now = date.today()
    years = now.year - dob.year
    # Take one year off while this year's birthday is still ahead.
    if (now.month, now.day) < (dob.month, dob.day):
        years -= 1
    return years
|
|
|
|
# saves files to the upload folder and returns their URLs
def save_files(username: str, profile_file, pictures_files):
    """Store a user's uploaded pictures and return their URLs.

    Args:
        username: Name of the user; a sanitized form of it becomes the
            per-user sub-folder inside the upload folder.
        profile_file: Uploaded file object for the profile picture (must
            provide ``filename`` and ``save(path)``).
        pictures_files: Iterable of further uploaded file objects; entries
            without a usable filename are skipped.

    Returns:
        ``(profile_url, pictures_urls)``: the URL of the profile picture and
        a list with the URLs of the additional pictures, in upload order.

    Raises:
        ValueError: If the username or the profile picture's filename
            sanitizes to an empty string.
    """
    # The username comes straight from the form; sanitize it so values such
    # as "../x" cannot point outside the upload folder.
    folder_name = secure_filename(username or "")
    if not folder_name:
        raise ValueError("username cannot be used as an upload folder name")
    user_folder = os.path.join(app.config['UPLOAD_FOLDER'], folder_name)
    os.makedirs(user_folder, exist_ok=True)

    # secure_filename() can return "" (e.g. for "..."), which would make the
    # target path the folder itself.
    profile_name = secure_filename(profile_file.filename or "")
    if not profile_name:
        raise ValueError("profile picture has no usable filename")

    # Prefixes keep the profile picture and same-named gallery pictures from
    # overwriting one another.
    profile_path = os.path.join(user_folder, f"profile_{profile_name}")
    profile_file.save(profile_path)
    profile_url = f"/{profile_path.replace(os.sep, '/')}"

    # saves all of the other pictures
    pictures_urls = []
    for index, pic in enumerate(pictures_files):
        picture_name = secure_filename(pic.filename or "")
        if not picture_name:
            continue
        path = os.path.join(user_folder, f"picture_{index}_{picture_name}")
        pic.save(path)
        pictures_urls.append(f"/{path.replace(os.sep, '/')}")

    return profile_url, pictures_urls
|
|
|
|
# encrypts the challenge for the user to then decrypt with pgp
def pgp_encrypt_and_import(pgp_key: str, message: str):
    """Import a public key and encrypt *message* to it.

    Args:
        pgp_key: Key material submitted by the user.
        message: Plaintext to encrypt.

    Returns:
        ``(fingerprint, ciphertext)`` on success, ``(None, None)`` when the
        import yields no fingerprint, and ``(fingerprint, None)`` when the
        encryption step fails.
    """
    # imports the user's key
    imported = gpg.import_keys(pgp_key)
    if not imported.fingerprints:
        return None, None

    # Only the first imported key is used.
    fingerprint = imported.fingerprints[0]

    # A freshly imported key has no owner trust, and GnuPG refuses to encrypt
    # to such a key unless told to trust it. The challenge only proves
    # possession of the private key, so skipping validation is appropriate.
    encrypted = gpg.encrypt(message, recipients=[fingerprint], always_trust=True)
    if not encrypted.ok:
        return fingerprint, None

    return fingerprint, str(encrypted)
|
|
|
|
# ROUTES ------------------------------------------------------------------------------------------------------
|
|
|
|
@app.route("/")
def home():
    """Serve the landing page."""
    landing_page = render_template("index.html")
    return landing_page
|
|
|
|
|
|
@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the sign-up form (GET) or start a PGP-verified sign-up (POST).

    A valid POST stores the pending account in the session, encrypts a
    random challenge to the submitted key and renders the verification page.
    Any validation failure flashes a message and redirects back to the form.
    """
    if request.method != "POST":
        return render_template("register.html", countries=COUNTRIES)

    # collect data to a dictionary
    data = {key: request.form.get(key) for key in [
        "username", "pgp", "firstname", "lastname", "sex", "date_of_birth", "country", "xmpp",
        "email", "phone", "city", "height", "weight", "race", "prefered_age_range"
    ]}

    # required fields
    required_fields = ["username", "pgp", "firstname", "lastname", "sex", "date_of_birth", "country", "xmpp"]
    if not all(data[f] for f in required_fields):
        flash("Please fill all required fields.")
        return redirect(url_for("register"))

    # The sex column is an Enum of these two values; reject anything else
    # here instead of failing at insert time.
    if data["sex"] not in ("male", "female"):
        flash("Invalid value for sex.")
        return redirect(url_for("register"))

    # height and weight are converted with float()/int() in verify();
    # validate them now so a typo does not surface later as a server error.
    try:
        if data["height"]:
            float(data["height"])
        if data["weight"]:
            int(data["weight"])
    except ValueError:
        flash("Height and weight must be numbers.")
        return redirect(url_for("register"))

    # check if fields are unique
    for field in ["username", "xmpp", "email", "phone"]:
        if data.get(field) and User.query.filter_by(**{field: data[field]}).first():
            flash(f"{field.capitalize()} already exists.")
            return redirect(url_for("register"))

    # validates date format to iso (YYYY-MM-DD)
    try:
        dob = date.fromisoformat(data["date_of_birth"])
    except ValueError:
        flash("Invalid date format.")
        return redirect(url_for("register"))

    # blocks underage users
    if calculate_age(dob) < 18:
        flash("You must be at least 18 years old to register.")
        return redirect(url_for("register"))

    # retrieves the user uploaded pictures
    profile_file = request.files.get("profile_picture")
    pictures_files = request.files.getlist("pictures")

    # doesn't let the user create an account without a profile picture
    if not profile_file:
        flash("Profile picture is required.")
        return redirect(url_for("register"))

    # Build and encrypt the challenge BEFORE touching the disk, so an invalid
    # key does not leave orphaned uploads behind. The plaintext is never
    # printed or logged: it is the secret the user must prove they can read.
    random_string = secrets.token_hex(16)
    challenge_phrase = f"this is the unencrypted string: {random_string}"
    fingerprint, encrypted_msg = pgp_encrypt_and_import(data["pgp"], challenge_phrase)
    if not fingerprint or not encrypted_msg:
        flash("Invalid PGP key or encryption failed.")
        return redirect(url_for("register"))

    # saves the user's pictures
    try:
        profile_url, pictures_urls = save_files(data["username"], profile_file, pictures_files)
    except (ValueError, OSError):
        app.logger.exception("Could not store uploads for %r", data["username"])
        flash("Could not save the uploaded pictures.")
        return redirect(url_for("register"))

    # Pending account kept in the session until verification. The armored
    # key is left out: verify() only needs the fingerprint, and a full key
    # can push the session cookie past the browser's size limit.
    pending = {key: value for key, value in data.items() if key != "pgp"}
    pending["profile_url"] = profile_url
    pending["pictures_urls"] = pictures_urls
    pending["fingerprint"] = fingerprint
    session["pending_user"] = pending

    # Flask's default session cookie is signed but NOT encrypted, so the
    # client can read it. Store only a digest of the challenge; the
    # plaintext would hand the answer to the user without any decryption.
    session["pgp_expected_digest"] = hashlib.sha256(
        challenge_phrase.encode("utf-8")
    ).hexdigest()

    # renders the verification page
    return render_template("verify.html", encrypted_message=encrypted_msg)


@app.route("/verify", methods=["POST"])
def verify():
    """Check the decrypted challenge and, if it matches, create the account.

    Reads the pending account and the expected challenge digest from the
    session. On success the user is stored, logged in and sent to the home
    page; on any failure a message is flashed and the user is redirected
    back to the registration form.
    """
    # retrieve the pending data from the session
    data = session.get("pending_user")
    expected_digest = session.get("pgp_expected_digest")

    # Check for a missing session before reading anything from it.
    if not data or not expected_digest:
        flash("Session expired.")
        return redirect(url_for("register"))

    # get the decrypted message from the form
    submitted = request.form.get("decrypted_message")
    if not submitted:
        flash("You must paste the decrypted message.")
        return redirect(url_for("register"))

    # Compare digests in constant time; surrounding whitespace is ignored
    # because pasted text commonly carries a trailing newline.
    submitted_digest = hashlib.sha256(submitted.strip().encode("utf-8")).hexdigest()
    if not secrets.compare_digest(submitted_digest, expected_digest):
        flash("Verification failed. Account not created.")
        return redirect(url_for("register"))

    # register() already validated this value, so the parse cannot fail here
    dob = date.fromisoformat(data["date_of_birth"])

    # stores the data on the database
    new_user = User(
        username=data["username"],
        pgp=data.get("fingerprint"),  # the fingerprint is stored, not the whole key
        firstname=data["firstname"],
        lastname=data["lastname"],
        sex=data["sex"],
        date_of_birth=dob,
        profile_picture=data["profile_url"],
        pictures=data["pictures_urls"],
        country=data["country"],
        xmpp=data["xmpp"],
        email=data.get("email") or None,
        phone=data.get("phone") or None,
        city=data.get("city") or None,
        height=float(data["height"]) if data.get("height") else None,
        weight=int(data["weight"]) if data.get("weight") else None,
        race=data.get("race") or None,
        prefered_age_range=data.get("prefered_age_range") or None,
        is_verified=True,
    )

    # A unique value may have been taken while the user was decrypting the
    # challenge; roll back and report instead of answering with an error page.
    try:
        db.session.add(new_user)
        db.session.commit()
    except Exception:
        db.session.rollback()
        app.logger.exception("Could not create user %r", data["username"])
        flash("Could not create the account. Please try again.")
        return redirect(url_for("register"))

    # creates login session
    session['user_id'] = new_user.id
    session['username'] = new_user.username

    # remove the temporary registration state
    session.pop("pending_user", None)
    session.pop("pgp_expected_digest", None)

    flash("PGP verification successful! Account created.")
    return redirect(url_for("home"))
|
|
|
|
|
|
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or issue a PGP login challenge (POST).

    A POST needs a username and the matching public key. When the key's
    fingerprint equals the one stored for the user, a random challenge is
    encrypted to it and the verification page is rendered.
    """
    if request.method != "POST":
        return render_template("login.html")

    # Requests username and pgp
    username = request.form.get("username")
    pgp_key = request.form.get("pgp")
    if not username or not pgp_key:
        flash("Please enter both username and PGP key.")
        return redirect(url_for("login"))

    # checks if user exists
    user = User.query.filter_by(username=username).first()
    if not user:
        flash("User not found.")
        return redirect(url_for("login"))

    # checks if imported pgp key has valid fingerprints
    imported = gpg.import_keys(pgp_key)
    if not imported.fingerprints:
        flash("Invalid PGP key.")
        return redirect(url_for("login"))

    # Checks if the submitted key matches the fingerprint stored for the user
    submitted_fingerprint = imported.fingerprints[0]
    if submitted_fingerprint != user.pgp:
        flash("PGP key does not match our records.")
        return redirect(url_for("login"))

    # Generate a challenge for PGP verification
    random_string = secrets.token_hex(16)
    challenge_phrase = f"this is the unencrypted string: {random_string}"

    # Imported keys carry no owner trust, and GnuPG refuses to encrypt to
    # such a key unless told to trust it; the challenge only proves
    # possession of the private key, so validation is skipped.
    encrypted = gpg.encrypt(
        challenge_phrase,
        recipients=[submitted_fingerprint],
        always_trust=True,
    )
    if not encrypted.ok:
        flash("Encryption failed.")
        return redirect(url_for("login"))

    # Store login verification data in session (temporary). Flask's default
    # session cookie is signed but NOT encrypted, so only a digest of the
    # challenge is kept; the plaintext would be readable by the client.
    session["login_user_id"] = user.id
    session["login_expected_digest"] = hashlib.sha256(
        challenge_phrase.encode("utf-8")
    ).hexdigest()

    # Render page where user will paste decrypted message
    return render_template("login_verify.html", encrypted_message=str(encrypted))


@app.route("/login_verify", methods=["POST"])
def login_verify():
    """Check the decrypted login challenge and open the user's session.

    Reads the pending user id and challenge digest from the session. A
    matching answer logs the user in and redirects to the home page; any
    failure flashes a message and redirects to the login form.
    """
    # get the temporary session data
    user_id = session.get("login_user_id")
    expected_digest = session.get("login_expected_digest")

    # checks if session exists
    if not user_id or not expected_digest:
        flash("Login session expired")
        return redirect(url_for("login"))

    # checks if decrypted phrase was submitted
    submitted = request.form.get("decrypted_message")
    if not submitted:
        flash("You must paste the decrypted message")
        return redirect(url_for("login"))

    # Compare digests in constant time; surrounding whitespace is ignored
    # because pasted text commonly carries a trailing newline.
    submitted_digest = hashlib.sha256(submitted.strip().encode("utf-8")).hexdigest()
    if not secrets.compare_digest(submitted_digest, expected_digest):
        flash("Verification failed")
        return redirect(url_for("login"))

    # The account may have been removed since the challenge was issued.
    user = User.query.get(user_id)
    if user is None:
        session.pop("login_user_id", None)
        session.pop("login_expected_digest", None)
        flash("User not found.")
        return redirect(url_for("login"))

    # saves session
    session['user_id'] = user.id
    session['username'] = user.username

    # removes temporary session
    session.pop("login_user_id", None)
    session.pop("login_expected_digest", None)

    flash("Logged in successfully")
    return redirect(url_for("home"))
|
|
|
|
|
|
@app.route("/logout")
def logout():
    """Drop the login keys from the session and go back to the home page."""
    # removes session
    for key in ('user_id', 'username'):
        session.pop(key, None)
    flash("Logged out successfully")
    return redirect(url_for("home"))
|
|
|
|
# Renders a user's profile page
@app.route("/user/<username>")
def user_profile(username):
    """Render the profile of *username*, or answer 404 if no such user exists."""
    profile = User.query.filter_by(username=username).first_or_404()
    # The date class is handed to the template alongside the user.
    return render_template("user.html", user=profile, date=date)
|
|
|
|
|
|
if __name__ == "__main__":
    # Create any missing tables before serving requests.
    with app.app_context():
        db.create_all()

    # Debug mode enables the Werkzeug interactive debugger, which lets anyone
    # who can reach the server execute code. Keep it off unless explicitly
    # requested through FLASK_DEBUG=1 (or "true").
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true")
    app.run(debug=debug_enabled)
|