DBBigWork/ebookman/admin.py
2022-12-13 00:06:37 +08:00

165 lines
5.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from flask import (
Blueprint, flash, g, redirect, render_template, request, url_for
)
import re
import os
from werkzeug.exceptions import abort
from werkzeug.security import generate_password_hash
from ebookman.auth import admin_login_required
from ebookman.db import get_db
from datetime import datetime
import pymysql
bp = Blueprint('admin', __name__, url_prefix='/admin')
def validateEmail(email):
if re.match("^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$", email) != None:
return True
else:
return False
@bp.route('/')
@admin_login_required
def index():
cur = get_db().cursor()
cur.execute(
'select * from user'
)
users = cur.fetchall()
cur.close()
return render_template("admin/index.html", users=users, cur_time=datetime.now())
@bp.route('/adduser', methods=("GET", "POST"))
@admin_login_required
def adduser():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
usermail = request.form['usermail']
userlimi = request.form['userlimit']
error = None
if not userlimi.isdecimal():
error = '用户空间必须是整数GB'
elif not validateEmail(usermail):
error = 'email格式不合法'
if error is None:
userlimi = float(userlimi)
userlimi *= 1024 * 1024 # userlimit is stored and measured by KB
userlimi = int(userlimi)
if error is None:
db = get_db()
cur = db.cursor()
try:
cur.execute(
'insert into user(`user_name`, `user_mail`, `user_passwd`, `user_limit`) values (%s,%s,%s,%s)',
(username, usermail, generate_password_hash(password), userlimi,)
)
db.commit()
except pymysql.IntegrityError as _e:
error = "用户名或邮箱已经存在 %s" % (_e)
db.rollback()
except pymysql.Error as _e:
error = "未知错误 %s" % (_e)
db.rollback()
finally:
cur.close()
if error is None:
return redirect(url_for('admin.index'))
flash(error)
return render_template("admin/adduser.html", cur_time=datetime.now())
def remove_user_doc(uid):
db = get_db()
cur = db.cursor()
cur.execute("select * from document where user_id=%s", (uid))
documents = cur.fetchall()
for document in documents:
docpath = os.path.join(os.getcwd(),document['doc_url'])
if os.path.exists(docpath):
os.remove(docpath)
error = None
try:
cur.execute("delete from document where user_id=%s", (uid))
db.commit()
except pymysql.Error as _e:
error = "严重错误:%s; 请联系管理员解决问题" % (_e)
db.rollback()
cur.close()
return error
@bp.route('/removeuser', methods=("GET",))
@admin_login_required
def removeuser():
error = None
uid_to_del = request.args.get("uid")
if uid_to_del is not None and uid_to_del.isdecimal:
db = get_db()
cur = db.cursor()
rowcnt = 0
error = remove_user_doc(uid_to_del)
if error is None:
try:
cur.callproc('clean_up_type_author', args=(uid_to_del,))
rowcnt = cur.execute("delete from user where user.user_id=%s", (uid_to_del, ))
db.commit()
except pymysql.IntegrityError as _e:
error = "用户未做好被删除的准备:%s" % (_e)
db.rollback()
except pymysql.Error as _e:
error = "删除发生未知错误: %s" %(_e)
db.rollback()
cur.close()
if error is None:
if rowcnt == 0:
error = "由于uid不存在可能没有被删除请再次检查删除结果"
else:
return redirect(url_for("admin.index"))
return render_template("admin/result.html", opname="删除用户出现意外", opresult=error, cur_time=datetime.now())
@bp.route('/updateuser/<int:uid>/', methods=("POST",))
@admin_login_required
def updateuser(uid):
error = None
db = get_db()
cur = db.cursor()
cur.execute("select * from user where user_id=%s", (uid))
userinfo = cur.fetchone()
usermail = request.form['usermail']
username = request.form['username']
if userinfo is None:
error = "%s号用户不存在!" % (uid)
if username is None or len(username) == 0:
username = None
if usermail is None or len(usermail) == 0:
usermail = None
elif not validateEmail(usermail):
error = "邮箱不合法"
if error is None:
if usermail is not None:
try:
cur.execute("update user set user_mail=%s where user_id=%s", (usermail, uid))
db.commit()
except pymysql.IntegrityError as _e:
error = "邮箱重复! %s" % (_e)
db.rollback()
except pymysql.Error as _e:
error = "未知错误: %s" % (_e)
db.rollback()
if username is not None and error is None:
try:
cur.execute("update user set user_name=%s where user_id=%s", (username, uid))
db.commit()
except pymysql.IntegrityError as _e:
error = "用户名重复! %s" % (_e)
db.rollback()
except pymysql.Error as _e:
error = "未知错误: %s" % (_e)
db.rollback()
if error is None:
return redirect(url_for("admin.index"))
return render_template("admin/result.html", opname="更新用户信息失败", opresult=error, cur_time=datetime.now())