102 lines
3.0 KiB
Python
102 lines
3.0 KiB
Python
import functools
|
|
from flask import Blueprint, flash, g, redirect, render_template, request, session, url_for
|
|
from werkzeug.security import check_password_hash
|
|
from src.db import get_db
|
|
from datetime import datetime
|
|
|
|
# Blueprint grouping the authentication views; its routes live under /auth.
bp = Blueprint(
    'auth',
    __name__,
    url_prefix='/auth',
)
|
|
|
|
@bp.route('/loginuser', methods=('GET', 'POST'))
def loginuser():
    """Show the regular-user login form and process its submission.

    On POST the submitted ``username`` is looked up in the ``user`` table and
    the submitted ``password`` is checked against the stored hash.  On success
    the session is reset, the user's id is stored under
    ``session['user_id']`` and the client is redirected to ``user.home``.
    On failure the error message is flashed and the form is rendered again.

    A GET request only renders the form.
    """
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']

        cur = get_db().cursor()
        try:
            cur.execute(
                'SELECT * FROM user WHERE user_name = %s', (username,)
            )
            user = cur.fetchone()
        finally:
            # Release the cursor even when the query raises.
            cur.close()

        if user is None:
            error = '用户名不存在'
        elif not check_password_hash(user['user_passwd'], password):
            error = '密码错误'
        else:
            error = None

        if error is None:
            # Drop any previous session state before logging the user in.
            session.clear()
            session['user_id'] = user['user_id']
            return redirect(url_for('user.home'))

        flash(error)

    return render_template('auth/loginuser.html', cur_time=datetime.now())
|
|
|
|
@bp.route('/loginadmin', methods=('GET', 'POST'))
def loginadmin():
    """Show the administrator login form and process its submission.

    On POST the first row of the ``admin`` table is fetched and the submitted
    ``password`` is checked against its ``passwd`` hash.  On success the
    session is reset, ``session['user_id']`` is set to ``0`` (the id this
    module uses for the administrator) and the client is redirected to
    ``admin.index``.  On failure the error message is flashed and the form is
    rendered again.

    A GET request only renders the form.
    """
    if request.method == 'POST':
        password = request.form['password']

        cur = get_db().cursor()
        try:
            cur.execute('SELECT * FROM admin limit 1')
            admin = cur.fetchone()
        finally:
            # Release the cursor even when the query raises.
            cur.close()

        if admin is None:
            error = '用户名不存在'
        elif not check_password_hash(admin['passwd'], password):
            error = '密码错误'
        else:
            error = None

        if error is None:
            # Drop any previous session state before logging the admin in.
            session.clear()
            session['user_id'] = 0
            return redirect(url_for('admin.index'))

        flash(error)

    return render_template('auth/loginadmin.html', cur_time=datetime.now())
|
|
|
|
@bp.route('/login', methods=('GET', 'POST'))
def login():
    """Render the base login page, passing the current time to the template."""
    now = datetime.now()
    return render_template("auth/loginbase.html", cur_time=now)
|
|
|
|
@bp.before_app_request
def load_logged_in_user():
    """Set ``g.user`` from the ``user_id`` stored in the session.

    * no ``user_id`` in the session -> ``g.user`` is ``None``
    * ``user_id == 0``              -> ``g.user`` is a fixed admin record
    * any other id                  -> ``g.user`` is the matching row of the
      ``user`` table, or ``None`` when no such row exists
    """
    user_id = session.get('user_id')

    if user_id is None:
        g.user = None
    elif user_id == 0:
        # use id=0 to refer to admin
        g.user = {'user_id': 0, 'user_name': 'admin'}
    else:
        cur = get_db().cursor()
        try:
            cur.execute('SELECT * FROM user WHERE user_id = %s', (user_id,))
            g.user = cur.fetchone()
        finally:
            # This hook opens a cursor per authenticated request; always
            # release it, even when the query raises.
            cur.close()
|
|
|
|
@bp.route('/logout')
def logout():
    """Clear the session and redirect to the ``index`` endpoint."""
    session.clear()
    target = url_for('index')
    return redirect(target)
|
|
|
|
|
|
def login_required(view):
    """Decorate *view* so that only a logged-in regular user can reach it.

    Requests with no user loaded (``g.user is None``) are redirected to
    ``auth.login``; the administrator (``user_id == 0``) is redirected to
    ``admin.index``.  Otherwise *view* is called with the keyword arguments
    it was given.
    """

    @functools.wraps(view)
    def guarded(**kwargs):
        current = g.user
        if current is None:
            return redirect(url_for('auth.login'))
        if current['user_id'] == 0:
            return redirect(url_for('admin.index'))
        return view(**kwargs)

    return guarded
|
|
|
|
def admin_login_required(view):
    """Decorate *view* so that only the administrator can reach it.

    Requests with no user loaded (``g.user is None``) are redirected to
    ``auth.login``; any user whose ``user_id`` is not ``0`` is redirected to
    ``user.home``.  Otherwise *view* is called with the keyword arguments it
    was given.
    """

    @functools.wraps(view)
    def guarded(**kwargs):
        current = g.user
        if current is None:
            return redirect(url_for('auth.login'))
        if current['user_id'] != 0:
            return redirect(url_for('user.home'))
        return view(**kwargs)

    return guarded