main.py (18163B)
"""Flask dating-profile app with PGP-based registration and login.

Users register with a public PGP key; the server encrypts a random challenge
to that key and the account is created (or the login accepted) only when the
user sends back the decrypted challenge.
"""

import hashlib
import hmac
import os
import secrets
from datetime import date

import gnupg
from flask import Flask, render_template, request, redirect, url_for, flash, session
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import text  # noqa: F401  (kept from the original import list)
from sqlalchemy.exc import IntegrityError
from werkzeug.utils import secure_filename

BASE_DIR = os.path.dirname(os.path.abspath(__file__))  # directory containing this python file
UPLOAD_FOLDER = os.path.join(BASE_DIR, "static", "uploads")  # where uploaded pictures are stored
os.makedirs(UPLOAD_FOLDER, exist_ok=True)  # creates the uploads directory

# configures the app
app = Flask(__name__)  # creates the app
app.config['SQLALCHEMY_DATABASE_URI'] = f"sqlite:///{os.path.join(BASE_DIR, 'lovedb.db')}"  # database connection
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False  # disable track modifications (for better performance)
# The secret key signs the session cookie, so it must not be a guessable
# constant. Set SECRET_KEY in the environment for stable sessions; the random
# fallback invalidates every session whenever the process restarts.
app.config['SECRET_KEY'] = os.environ.get("SECRET_KEY") or secrets.token_hex(32)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER  # sets the upload folder

db = SQLAlchemy(app)  # database handle
gpg = gnupg.GPG()  # gpg handle

# values accepted for User.sex
SEX_CHOICES = ("male", "female")

# file extensions accepted for uploaded pictures (uploads are served from /static)
ALLOWED_IMAGE_EXTENSIONS = frozenset({"png", "jpg", "jpeg", "gif", "webp"})

COUNTRIES = [
    "Afghanistan", "Albania", "Algeria", "Andorra", "Angola", "Antigua and Barbuda", "Argentina",
    "Armenia", "Australia", "Austria", "Azerbaijan", "Bahamas", "Bahrain", "Bangladesh",
    "Barbados", "Belarus", "Belgium", "Belize", "Benin", "Bhutan", "Bolivia", "Bosnia and Herzegovina",
    "Botswana", "Brazil", "Brunei", "Bulgaria", "Burkina Faso", "Burundi", "Cabo Verde", "Cambodia",
    "Cameroon", "Canada", "Central African Republic", "Chad", "Chile", "China", "Colombia", "Comoros",
    "Congo (Congo-Brazzaville)", "Costa Rica", "Croatia", "Cuba", "Cyprus", "Czechia (Czech Republic)",
    "Democratic Republic of the Congo", "Denmark", "Djibouti", "Dominica", "Dominican Republic", "Ecuador",
    "Egypt", "El Salvador", "Equatorial Guinea", "Eritrea", "Estonia", "Eswatini (fmr. Swaziland)",
    "Ethiopia", "Fiji", "Finland", "France", "Gabon", "Gambia", "Georgia", "Germany", "Ghana", "Greece",
    "Grenada", "Guatemala", "Guinea", "Guinea-Bissau", "Guyana", "Haiti", "Holy See", "Honduras", "Hungary",
    "Iceland", "India", "Indonesia", "Iran", "Iraq", "Ireland", "Israel", "Italy", "Jamaica", "Japan", "Jordan",
    "Kazakhstan", "Kenya", "Kiribati", "Kuwait", "Kyrgyzstan", "Laos", "Latvia", "Lebanon", "Lesotho",
    "Liberia", "Libya", "Liechtenstein", "Lithuania", "Luxembourg", "Madagascar", "Malawi", "Malaysia",
    "Maldives", "Mali", "Malta", "Marshall Islands", "Mauritania", "Mauritius", "Mexico", "Micronesia",
    "Moldova", "Monaco", "Mongolia", "Montenegro", "Morocco", "Mozambique", "Myanmar (Burma)", "Namibia",
    "Nauru", "Nepal", "Netherlands", "New Zealand", "Nicaragua", "Niger", "Nigeria", "North Korea",
    "North Macedonia", "Norway", "Oman", "Pakistan", "Palau", "Palestine State", "Panama", "Papua New Guinea",
    "Paraguay", "Peru", "Philippines", "Poland", "Portugal", "Qatar", "Romania", "Russia", "Rwanda",
    "Saint Kitts and Nevis", "Saint Lucia", "Saint Vincent and the Grenadines", "Samoa", "San Marino",
    "Sao Tome and Principe", "Saudi Arabia", "Senegal", "Serbia", "Seychelles", "Sierra Leone", "Singapore",
    "Slovakia", "Slovenia", "Solomon Islands", "Somalia", "South Africa", "South Korea", "South Sudan",
    "Spain", "Sri Lanka", "Sudan", "Suriname", "Sweden", "Switzerland", "Syria", "Tajikistan", "Tanzania",
    "Thailand", "Timor-Leste", "Togo", "Tonga", "Trinidad and Tobago", "Tunisia", "Turkey", "Turkmenistan",
    "Tuvalu", "Uganda", "Ukraine", "United Arab Emirates", "United Kingdom", "United States", "Uruguay",
    "Uzbekistan", "Vanuatu", "Venezuela", "Vietnam", "Yemen", "Zambia", "Zimbabwe",
]


# Database creation
class User(db.Model):
    """A registered profile; `pgp` holds the key fingerprint, not the key itself."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(128), unique=True, nullable=False)
    pgp = db.Column(db.String(4096), nullable=False)
    firstname = db.Column(db.String(128), nullable=False)
    lastname = db.Column(db.String(128), nullable=False)
    sex = db.Column(db.Enum(*SEX_CHOICES), nullable=False)
    date_of_birth = db.Column(db.Date, nullable=False)
    profile_picture = db.Column(db.String(200), nullable=False)
    pictures = db.Column(db.JSON, nullable=True)
    country = db.Column(db.String(128), nullable=False)
    city = db.Column(db.String(128), nullable=True)
    height = db.Column(db.Float, nullable=True)
    weight = db.Column(db.Integer, nullable=True)
    race = db.Column(db.String(20), nullable=True)
    prefered_age_range = db.Column(db.String(20), nullable=True)
    likes = db.Column(db.JSON, nullable=True)
    dislikes = db.Column(db.JSON, nullable=True)
    about = db.Column(db.String(4096), nullable=True)
    xmpp = db.Column(db.String(128), unique=True, nullable=False)
    email = db.Column(db.String(128), unique=True, nullable=True)
    phone = db.Column(db.String(20), unique=True, nullable=True)
    is_verified = db.Column(db.Boolean, default=False)


# HELPERS -----------------------------------------------------------------------------------------------------

def calculate_age(dob: date) -> int:
    """Return the age in whole years, as of today, of someone born on `dob`."""
    today = date.today()
    # the boolean subtracts one year when this year's birthday has not happened yet
    return today.year - dob.year - ((today.month, today.day) < (dob.month, dob.day))


def _years_before(day: date, years: int) -> date:
    """Return `day` moved back by `years`, mapping Feb 29 to Feb 28 when needed.

    Raises ValueError (or OverflowError for absurdly large inputs) when the
    resulting year is outside the range `date` supports.
    """
    try:
        return day.replace(year=day.year - years)
    except ValueError:
        # Feb 29 has no counterpart in a non-leap year. Feb 28 is the right
        # cut-off for calculate_age(), which compares (month, day) tuples.
        # An out-of-range year raises again here and reaches the caller.
        return day.replace(year=day.year - years, day=28)


def _split_tags(raw: str) -> list:
    """Split a comma separated string into stripped, lower-cased, non-empty tags."""
    return [part.strip().lower() for part in raw.split(",") if part.strip()]


def _has_all_tags(stored, wanted) -> bool:
    """Return True when every tag in `wanted` is present in `stored` (which may be None)."""
    stored = stored or []
    return all(tag in stored for tag in wanted)


def _flash_redirect(message: str, endpoint: str):
    """Flash `message` and return a redirect response to `endpoint`."""
    flash(message)
    return redirect(url_for(endpoint))


def _new_challenge() -> str:
    """Return a fresh challenge phrase containing 16 random bytes as hex."""
    return f"this is the unencrypted string: {secrets.token_hex(16)}"


def _challenge_digest(phrase: str) -> str:
    """Return the SHA-256 hex digest of `phrase`.

    Flask's default session is a signed cookie that the client can read, so
    only this digest is stored in it, never the phrase itself.
    """
    return hashlib.sha256(phrase.encode("utf-8")).hexdigest()


def _challenge_matches(submitted: str, expected_digest: str) -> bool:
    """Return True when the stripped `submitted` text hashes to `expected_digest`."""
    return hmac.compare_digest(_challenge_digest(submitted.strip()), str(expected_digest))


def _safe_image_name(original_name) -> str:
    """Return a sanitised file name for an upload, or raise ValueError.

    The name must survive secure_filename() and end in one of
    ALLOWED_IMAGE_EXTENSIONS.
    """
    filename = secure_filename(original_name or "")
    extension = filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
    if not filename or extension not in ALLOWED_IMAGE_EXTENSIONS:
        raise ValueError("Pictures must be image files (png, jpg, jpeg, gif or webp).")
    return filename


def save_files(username: str, profile_file, pictures_files):
    """Save the uploaded pictures to the user's upload folder and return their URLs.

    Returns a `(profile_url, pictures_urls)` tuple. Raises ValueError, with a
    message suitable for showing to the user, when the username cannot be used
    as a folder name or a file name is not an accepted image name. Nothing is
    written to disk in that case.
    """
    # the username becomes a path component, so it must not be able to escape
    # the upload folder (e.g. "../x") or silently map to another user's folder
    if not username or secure_filename(username) != username:
        raise ValueError("Username may only contain letters, digits, '.', '_' and '-'.")

    # validate every name before writing anything
    gallery = [pic for pic in pictures_files if pic.filename]
    profile_filename = _safe_image_name(profile_file.filename)
    gallery_filenames = [_safe_image_name(pic.filename) for pic in gallery]

    user_folder = os.path.join(app.config['UPLOAD_FOLDER'], username)
    os.makedirs(user_folder, exist_ok=True)

    profile_file.save(os.path.join(user_folder, profile_filename))
    profile_url = f"/static/uploads/{username}/{profile_filename}"

    pictures_urls = []
    for pic, filename in zip(gallery, gallery_filenames):
        pic.save(os.path.join(user_folder, filename))
        pictures_urls.append(f"/static/uploads/{username}/{filename}")

    return profile_url, pictures_urls


def pgp_encrypt_and_import(pgp_key: str, message: str):
    """Import `pgp_key` and encrypt `message` to it.

    Returns `(fingerprint, armored_message)`. The result is `(None, None)`
    when the import yields no fingerprint, and `(fingerprint, None)` when the
    encryption fails.
    """
    # imports the user's key
    result = gpg.import_keys(pgp_key)
    # check to see if the key has fingerprints
    if not result.fingerprints:
        return None, None
    fingerprint = result.fingerprints[0]
    # encrypts message to the user's fingerprint; always_trust is passed
    # because the key was only just imported
    encrypted = gpg.encrypt(message, recipients=[fingerprint], always_trust=True)
    if not encrypted.ok:
        return fingerprint, None
    return fingerprint, str(encrypted)


# ROUTES ------------------------------------------------------------------------------------------------------

@app.route("/")
def home():
    """List users, optionally narrowed by the filters in the query string."""
    query = User.query

    country = request.args.get("country")
    city = request.args.get("city")
    sex = request.args.get("sex")
    age_min = request.args.get("age_min")
    age_max = request.args.get("age_max")
    race = request.args.get("race")
    likes = request.args.get("likes")
    dislikes = request.args.get("dislikes")

    if country:
        query = query.filter(User.country.ilike(f"%{country}%"))
    if city:
        query = query.filter(User.city.ilike(f"%{city}%"))
    if sex:
        query = query.filter(User.sex == sex)
    if race:
        query = query.filter(User.race.ilike(f"%{race}%"))

    today = date.today()
    if age_min:
        try:
            # at least N years old <=> born on or before today minus N years
            dob_max = _years_before(today, int(age_min))
        except (ValueError, OverflowError):
            pass  # non-numeric or out-of-range input: ignore this filter
        else:
            query = query.filter(User.date_of_birth <= dob_max)
    if age_max:
        try:
            # at most N years old <=> born strictly after today minus N+1 years
            dob_min = _years_before(today, int(age_max) + 1)
        except (ValueError, OverflowError):
            pass  # non-numeric or out-of-range input: ignore this filter
        else:
            query = query.filter(User.date_of_birth > dob_min)

    users = query.all()

    # The tag filters run in Python on the loaded rows. This keeps user input
    # out of SQL text and does not rely on JSON functions that the configured
    # SQLite database may not provide.
    if likes:
        wanted_likes = _split_tags(likes)
        users = [u for u in users if _has_all_tags(u.likes, wanted_likes)]
    if dislikes:
        wanted_dislikes = _split_tags(dislikes)
        users = [u for u in users if _has_all_tags(u.dislikes, wanted_dislikes)]

    return render_template("index.html", users=users, date=date)


@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the registration form, or validate a submission and start PGP verification."""
    if request.method != "POST":
        return render_template("register.html", countries=COUNTRIES)

    # collect data to a dictionary
    data = {key: request.form.get(key) for key in [
        "username", "pgp", "firstname", "lastname", "sex", "date_of_birth", "country", "xmpp",
        "email", "phone", "city", "height", "weight", "race"
    ]}

    min_age = request.form.get("preferred_age_min")
    max_age = request.form.get("preferred_age_max")

    if min_age and max_age:
        try:
            min_age = int(min_age)
            max_age = int(max_age)
        except ValueError:
            return _flash_redirect("Invalid age range.", "register")

        if min_age < 18 or max_age < 18:
            return _flash_redirect("Minimum age is 18.", "register")

        if min_age > max_age:
            return _flash_redirect("Minimum age cannot be greater than maximum age.", "register")

        data["prefered_age_range"] = f"{min_age}-{max_age}"
    else:
        data["prefered_age_range"] = None

    # de-duplicated, sorted so the stored order is deterministic
    data["likes"] = sorted(set(_split_tags(request.form.get("likes", ""))))
    data["dislikes"] = sorted(set(_split_tags(request.form.get("dislikes", ""))))

    # required fields
    required_fields = ["username", "pgp", "firstname", "lastname", "sex", "date_of_birth", "country", "xmpp"]
    if not all(data[f] for f in required_fields):
        return _flash_redirect("Please fill all required fields.", "register")

    # the username is later used as an upload folder name
    if secure_filename(data["username"]) != data["username"]:
        return _flash_redirect("Username may only contain letters, digits, '.', '_' and '-'.", "register")

    # only values the User.sex column is declared with are accepted
    if data["sex"] not in SEX_CHOICES:
        return _flash_redirect("Invalid sex.", "register")

    # height and weight are converted to float / int when the account is
    # created, so reject values that would fail there
    try:
        if data.get("height"):
            float(data["height"])
        if data.get("weight"):
            int(data["weight"])
    except ValueError:
        return _flash_redirect("Height and weight must be numbers.", "register")

    # check if fields are unique
    for field in ["username", "xmpp", "email", "phone"]:
        if data.get(field) and User.query.filter_by(**{field: data[field]}).first():
            return _flash_redirect(f"{field.capitalize()} already exists.", "register")

    # validates date format to iso (YYYY-MM-DD)
    try:
        dob = date.fromisoformat(data["date_of_birth"])
    except ValueError:
        return _flash_redirect("Invalid date format.", "register")

    # blocks underage users
    if calculate_age(dob) < 18:
        return _flash_redirect("You must be at least 18 years old to register.", "register")

    # retrieves the user uploaded pictures
    profile_file = request.files.get("profile_picture")
    pictures_files = request.files.getlist("pictures")

    # doesn't let the user create an account without a profile picture
    if not profile_file:
        return _flash_redirect("Profile picture is required.", "register")

    # encrypts the challenge first, so no files are written for an unusable key
    challenge_phrase = _new_challenge()
    fingerprint, encrypted_msg = pgp_encrypt_and_import(data["pgp"], challenge_phrase)
    if not fingerprint or not encrypted_msg:
        return _flash_redirect("Invalid PGP key or encryption failed.", "register")

    # saves the users pictures
    try:
        profile_url, pictures_urls = save_files(data["username"], profile_file, pictures_files)
    except ValueError as err:
        return _flash_redirect(str(err), "register")

    # creates a temporary session used to verify the user; the armored key is
    # left out because only the fingerprint is stored and the key would bloat
    # the session cookie
    pending = {key: value for key, value in data.items() if key != "pgp"}
    pending.update({
        "profile_url": profile_url,
        "pictures_urls": pictures_urls,
        "fingerprint": fingerprint,
    })
    session["pending_user"] = pending

    # only the digest goes into the (client-readable) session
    session['pgp_expected_phrase'] = _challenge_digest(challenge_phrase)

    # renders the verification page
    return render_template("verify.html", encrypted_message=encrypted_msg)


@app.route("/verify", methods=["POST"])
def verify():
    """Check the decrypted registration challenge and create the account."""
    # retrieve user data and the challenge digest from the session
    data = session.get("pending_user")
    expected_digest = session.get("pgp_expected_phrase")

    # check to see if data exists (must happen before `data` is used)
    if not data or not expected_digest:
        return _flash_redirect("Session expired.", "register")

    fingerprint = data.get("fingerprint")

    # get the decrypted message from form
    submitted = request.form.get("decrypted_message")

    # check to see if submission was empty
    if not submitted:
        return _flash_redirect("You must paste the decrypted message.", "register")

    # checks if phrase is correct
    if not _challenge_matches(submitted, expected_digest):
        return _flash_redirect("Verification failed. Account not created.", "register")

    try:
        # stores the data on the database
        new_user = User(
            username=data["username"],
            pgp=fingerprint,  # the fingerprint is stored, not the whole pgp key
            firstname=data["firstname"],
            lastname=data["lastname"],
            sex=data["sex"],
            date_of_birth=date.fromisoformat(data["date_of_birth"]),
            profile_picture=data["profile_url"],
            pictures=data["pictures_urls"],
            country=data["country"],
            xmpp=data["xmpp"],
            email=data.get("email") or None,
            phone=data.get("phone") or None,
            city=data.get("city") or None,
            height=float(data["height"]) if data.get("height") else None,
            weight=int(data["weight"]) if data.get("weight") else None,
            race=data.get("race") or None,
            likes=data.get("likes") or [],
            dislikes=data.get("dislikes") or [],
            prefered_age_range=data.get("prefered_age_range") or None,
            is_verified=True
        )
    except (KeyError, ValueError):
        # pending data missing a field or holding an unparsable value
        return _flash_redirect("Session expired.", "register")

    db.session.add(new_user)
    try:
        db.session.commit()
    except IntegrityError:
        # a unique field was taken between the registration form and this step
        db.session.rollback()
        return _flash_redirect("Username, XMPP, email or phone already exists.", "register")

    # creates login session
    session['user_id'] = new_user.id
    session['username'] = new_user.username

    # remove temporary session
    session.pop("pending_user", None)
    session.pop("pgp_expected_phrase", None)

    flash("PGP verification successful! Account created.")
    return redirect(url_for("home"))


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form, or issue a PGP challenge for the submitted username and key."""
    if request.method != "POST":
        return render_template("login.html")

    # Requests username and pgp
    username = request.form.get("username")
    pgp_key = request.form.get("pgp")

    if not username or not pgp_key:
        return _flash_redirect("Please enter both username and PGP key.", "login")

    # checks if user exists
    user = User.query.filter_by(username=username).first()
    if not user:
        return _flash_redirect("User not found.", "login")

    # imports the key and encrypts a fresh challenge with the same helper
    # (and the same trust setting) as registration
    challenge_phrase = _new_challenge()
    submitted_fingerprint, encrypted_msg = pgp_encrypt_and_import(pgp_key, challenge_phrase)

    # checks if imported pgp key has valid fingerprints
    if not submitted_fingerprint:
        return _flash_redirect("Invalid PGP key.", "login")

    # Checks if pgp matches the user's pgp
    if submitted_fingerprint != user.pgp:
        return _flash_redirect("PGP key does not match our records.", "login")

    if not encrypted_msg:
        return _flash_redirect("Encryption failed.", "login")

    # Store login verification data in session (temporary); only the digest
    # of the challenge goes into the (client-readable) session
    session["login_user_id"] = user.id
    session["login_expected_phrase"] = _challenge_digest(challenge_phrase)

    # Render page where user will paste decrypted message
    return render_template("login_verify.html", encrypted_message=encrypted_msg)


@app.route("/login_verify", methods=["POST"])
def login_verify():
    """Check the decrypted login challenge and open the login session."""
    # get the temporary session data
    user_id = session.get("login_user_id")
    expected_digest = session.get("login_expected_phrase")

    # checks if session exists
    if not user_id or not expected_digest:
        return _flash_redirect("Login session expired", "login")

    # checks if decrypted phrase was submitted
    submitted = request.form.get("decrypted_message")
    if not submitted:
        return _flash_redirect("You must paste the decrypted message", "login")

    # Checks if submitted phrase matches the expected
    if not _challenge_matches(submitted, expected_digest):
        return _flash_redirect("Verification failed", "login")

    # the account may have been removed since the challenge was issued
    user = User.query.get(user_id)
    if user is None:
        session.pop("login_user_id", None)
        session.pop("login_expected_phrase", None)
        return _flash_redirect("Login session expired", "login")

    # saves session
    session['user_id'] = user.id
    session['username'] = user.username

    # removes temporary session
    session.pop("login_user_id", None)
    session.pop("login_expected_phrase", None)

    flash("Logged in successfully")
    return redirect(url_for("home"))


@app.route("/logout")
def logout():
    """Remove the login session and go back to the home page."""
    session.pop('user_id', None)
    session.pop('username', None)
    flash("Logged out successfully")
    return redirect(url_for("home"))


# Renders users route
@app.route("/user/<username>")
def user_profile(username):
    """Render the profile page of `username`, or 404 when there is no such user."""
    user = User.query.filter_by(username=username).first_or_404()
    return render_template("user.html", user=user, date=date)


if __name__ == "__main__":
    with app.app_context():
        db.create_all()
    # debug mode is opt-in: run with FLASK_DEBUG=1 to enable it
    app.run(debug=os.environ.get("FLASK_DEBUG") == "1")