DBBigWork/ebookman/user.py
2022-12-13 00:06:37 +08:00

453 lines
17 KiB
Python

from flask import (
Blueprint, flash, g, redirect, render_template, request, url_for, send_file, current_app,
)
import os
from ebookman.auth import login_required
from ebookman.db import get_db
import pymysql
from datetime import datetime
# Blueprint named 'user'; every view function in this module registers its
# route on it through @bp.route.
bp = Blueprint('user', __name__)
@bp.route("/home")
@login_required
def home():
    """Render the home page of the logged-in user.

    Reads the current user's row from ``user_stat`` and hands it to
    ``user/home.html`` as ``user_stat`` together with the current time.
    """
    cur = get_db().cursor()
    try:
        cur.execute(
            "select * from user_stat where user_id = %s", (g.user['user_id'],)
        )
        user_stat = cur.fetchone()
    finally:
        # Release the cursor even when the query raises.
        cur.close()
    return render_template("user/home.html", user_stat=user_stat, cur_time=datetime.now())
@bp.route("/addbook", methods=('GET', 'POST'))
@login_required
def addbook():
    """Show the "add book" form and, on POST, create a book.

    Form fields read on POST: ``bookname`` (required), ``bookisbn``,
    ``bookpublisher``, ``booklang``, ``bookauthor`` and ``booktype``.
    Empty optional fields are passed to the database as ``None``.
    ``booktype`` is a ``;``-separated list of type names; each non-empty
    name is attached to the new book via the ``add_new_book_type`` procedure.

    The outcome text (failure *or* success) is handed to the template in the
    ``error`` variable, together with the list of known type names.
    """
    db = get_db()
    cur = db.cursor()
    error = None
    try:
        if request.method == 'POST':
            bookname = request.form['bookname']
            # Optional fields: an empty string means "not provided".
            bookisbn = request.form['bookisbn'] or None
            bookpublisher = request.form['bookpublisher'] or None
            booklang = request.form['booklang'] or None
            bookauthor = request.form['bookauthor'] or None
            booktype = request.form['booktype']
            bookid = None

            if not bookname:
                # Do not touch the database when validation fails.
                error = "书名不能为空"
            else:
                try:
                    cur.execute(
                        "insert into book (`book_name`, `book_isbn`"
                        ", `book_publisher`, `book_lang`, `book_author`, `user_id`) "
                        "values (%s,%s,%s,%s,%s,%s)",
                        (bookname, bookisbn, bookpublisher, booklang, bookauthor,
                         g.user['user_id'],))
                    # Id generated by this very insert; unlike max(book_id) it
                    # cannot pick up a row inserted concurrently by someone else.
                    bookid = cur.lastrowid
                    db.commit()
                except pymysql.Error as _e:
                    error = "未知错误: %s" % (_e)
                    db.rollback()

            if error is None:
                # Tolerates trailing/duplicate ';' and whitespace-only input.
                type_names = [
                    name
                    for name in (part.strip() for part in booktype.split(";"))
                    if name
                ]
                for type_name in type_names:
                    try:
                        cur.callproc("add_new_book_type", args=(bookid, type_name))
                        db.commit()
                    except pymysql.Error as _e:
                        error = "未知错误: %s" % (_e)
                        db.rollback()

            if error is None:
                error = "新建图书《%s》完成" % (bookname)

        cur.execute("select type_name from typetable")
        typelist = cur.fetchall()
    finally:
        cur.close()
    return render_template("user/addbook.html", typelist=typelist, error=error,
                           cur_time=datetime.now())
@bp.route("/tags", methods=('GET', 'POST'))
@login_required
def tags():
    """List all types and, on POST, create the type named by ``typename``.

    The outcome text of a POST (failure or success) is passed to
    ``user/addtype.html`` in the ``error`` variable, together with every row
    of ``typetable``.
    """
    conn = get_db()
    cursor = conn.cursor()
    message = None
    if request.method == 'POST':
        new_name = request.form['typename']
        if not new_name:
            message = "不合法的分类名"
        else:
            try:
                inserted = cursor.execute(
                    "insert into typetable(`type_name`) values(%s)", (new_name,))
                conn.commit()
            except pymysql.IntegrityError:
                message = "类型名称已经存在: %s" % (new_name)
                conn.rollback()
            except pymysql.Error as exc:
                message = "未知错误: %s" % (exc)
                conn.rollback()
            else:
                # execute() reported how many rows the insert affected.
                if inserted == 0:
                    message = "新建可能失败,再次检查是否完成"
                else:
                    message = "新建类型\"%s\"完成" % (new_name)
    cursor.execute("select * from typetable")
    all_types = cursor.fetchall()
    return render_template("user/addtype.html", typelist=all_types, error=message,
                           cur_time=datetime.now())
@bp.route("/removetype", methods=("GET",))
@login_required
def removetype():
    """Delete the type whose id is given in the ``tid`` query argument.

    Redirects to the type list on success; otherwise renders
    ``user/result.html`` with the failure text.
    """
    db = get_db()
    cur = db.cursor()
    # type=int yields None for anything int() rejects. str.isnumeric() would
    # also accept strings such as "½" or "²" that are not integers.
    tid_to_del = request.args.get("tid", type=int)
    error = None
    if tid_to_del is None or tid_to_del < 0:
        error = "Invalid argument"
    rowcnt = 0
    try:
        if error is None:
            try:
                rowcnt = cur.execute("delete from typetable where type_id=%s", (tid_to_del,))
                db.commit()
            except pymysql.Error as _e:
                error = "删除失败:%s" % (_e)
                db.rollback()
    finally:
        # Closed on every path, including the redirect below.
        cur.close()
    if error is None:
        if rowcnt == 0:
            error = "删除可能失败,再次检查是否完成"
        else:
            return redirect(url_for("user.tags"))
    return render_template("user/result.html", opname="删除分类",
                           opresult=error, cur_time=datetime.now())
@bp.route("/search", methods=("GET", "POST"))
@login_required
def search():
    """Search the current user's books, documents or notes, ten rows per page.

    Query arguments:
        bookname: text to look for (substring match); may be empty.
        bookattr: what to search by, one of the keys of ``attr_dict``.
        page: 1-based page number; anything non-decimal falls back to 1.

    The template receives the rows in ``queryresult``, the previous/next page
    links, and ``queryattr`` (``attr_dict`` with the chosen key's value
    replaced by ``"selected"``).
    """
    # Display name -> column of `book`. The three "special"/"specail" values
    # are placeholders: those searches use their own query below and the
    # placeholder is never used as a column name.
    attr_dict = {"书名":"book_name", "ISBN":"book_isbn",
                 "作者": "book_author", "出版社":"book_publisher", "分类": "specail1",
                 "文档名称":"special2", "评论标题": "special3"
                 }
    page_lim = 10
    page_off = 0
    page = request.args.get('page')
    # A missing argument is treated as an empty search text, never as "None".
    queryval = request.args.get('bookname') or ""
    bookattr = request.args.get('bookattr')
    queryattr = None
    error = None

    if page is None or not page.isdecimal():
        page = 1
    page = int(page)
    if page <= 0:
        error = "无效的分页 %d" % page
    else:
        page_off = page_lim * (page - 1)

    if error is None:
        if bookattr in attr_dict:
            queryattr = attr_dict[bookattr]
            attr_dict[bookattr] = "selected"
        else:
            error = "不合法的查询条件: %s" % (bookattr)
            attr_dict["书名"] = "selected"
            bookattr = ""

    queryresult = []
    if error is None:
        user_id = g.user['user_id']
        like_pattern = "%" + queryval + "%"
        # User-supplied values are always sent as query parameters so the
        # driver escapes them; they are never formatted into the SQL text.
        if bookattr == '文档名称':
            querystring = "select * from document where user_id=%s and doc_name like %s"
            params = [user_id, like_pattern]
        elif bookattr == '评论标题':
            querystring = "select * from note where user_id=%s and note_name like %s"
            params = [user_id, like_pattern]
        elif not queryval:
            querystring = "select * from book where user_id=%s"
            params = [user_id]
        elif bookattr == "分类":
            querystring = "SELECT * from v_type_to_book WHERE user_id=%s AND type_name LIKE %s"
            params = [user_id, like_pattern]
        else:
            # queryattr comes from the attr_dict whitelist above, not from the
            # request, so it is safe to place in the statement text.
            querystring = f"select * from book where user_id=%s and {queryattr} like %s"
            params = [user_id, like_pattern]
        querystring += " limit %s offset %s"
        params += [page_lim, page_off]

        cur = get_db().cursor()
        try:
            cur.execute(querystring, params)
            queryresult = cur.fetchall()
        finally:
            cur.close()

    # A short page means there is nothing after it.
    page_last = len(queryresult) < page_lim
    # url_for percent-encodes the query arguments.
    next_page_link = url_for("user.search", bookname=queryval, bookattr=bookattr, page=page + 1)
    prev_page_link = url_for("user.search", bookname=queryval, bookattr=bookattr, page=page - 1)
    return render_template("user/search.html",
                           cur_time=datetime.now(), error=error, queryresult=queryresult,
                           next_page_link=next_page_link, prev_page_link=prev_page_link,
                           page_last=page_last, page_first=(page==1), queryval=queryval,
                           queryattr=attr_dict, bookattr=bookattr)
@bp.route("/book/<int:id>/", methods=("GET", "POST"))
@login_required
def book(id):
    """Render the detail page of book ``id``.

    Loads the book row, its types (``v_book_to_types``), its documents and
    its notes. When no book has that id, ``/user/result.html`` is rendered
    with an explanatory message instead of failing on a ``None`` row.
    """
    # NOTE(review): the book is looked up by id only; whether it should also
    # be restricted to g.user['user_id'] is not visible here - confirm.
    cur = get_db().cursor()
    try:
        cur.execute("select * from book where book_id=%s", (id,))
        bookinfo = cur.fetchone()
        if bookinfo is None:
            return render_template("/user/result.html",
                                   opname="查看图书失败", opresult="图书不存在",
                                   cur_time=datetime.now())
        cur.execute("select * from v_book_to_types where book_id=%s", (bookinfo['book_id'], ))
        booktype = cur.fetchall()
        cur.execute("select * from document where book_id=%s", (bookinfo['book_id'], ))
        documents = cur.fetchall()
        cur.execute("select * from note where book_id=%s", (bookinfo['book_id'], ))
        notes = cur.fetchall()
    finally:
        cur.close()
    return render_template("/user/book.html",
                           book=bookinfo, booktype=booktype, documents=documents, notes=notes,
                           cur_time=datetime.now())
@bp.route("/book/update/<int:id>/", methods=("GET", "POST"))
@login_required
def book_update(id):
    """Show the edit form for book ``id`` and, on POST, apply the changes.

    POST reads the same form fields as the "add book" form (``bookname``
    required; ``bookisbn``, ``bookpublisher``, ``booklang``, ``bookauthor``
    optional, empty meaning ``None``; ``booktype`` a ``;``-separated list of
    type names attached via the ``add_new_book_type`` procedure).

    A successful POST redirects to the book page. A failed POST renders
    ``/user/result.html`` with the reason. GET renders the form pre-filled
    with the stored values.
    """
    db = get_db()
    cur = db.cursor()
    try:
        if request.method == 'POST':
            error = None
            bookname = request.form['bookname']
            # Optional fields: an empty string means "not provided".
            bookisbn = request.form['bookisbn'] or None
            bookpublisher = request.form['bookpublisher'] or None
            booklang = request.form['booklang'] or None
            bookauthor = request.form['bookauthor'] or None
            booktype = request.form['booktype']

            if not bookname:
                # Never overwrite the stored title with an empty one.
                error = "书名不能为空"
            else:
                try:
                    # One statement so all columns change together or not at all.
                    cur.execute(
                        "update book set `book_name`=%s, `book_isbn`=%s, "
                        "`book_publisher`=%s, `book_lang`=%s, `book_author`=%s "
                        "where `book_id`=%s",
                        (bookname, bookisbn, bookpublisher, booklang, bookauthor, id))
                    db.commit()
                except pymysql.Error as _e:
                    error = "未知错误: %s" % (_e)
                    db.rollback()

            if error is None:
                # Tolerates trailing/duplicate ';' and whitespace-only input.
                type_names = [
                    name
                    for name in (part.strip() for part in booktype.split(";"))
                    if name
                ]
                for type_name in type_names:
                    try:
                        cur.callproc("add_new_book_type", args=(id, type_name))
                        db.commit()
                    except pymysql.Error as _e:
                        error = "未知错误: %s" % (_e)
                        db.rollback()

            if error is None:
                return redirect(url_for("user.book", id=id))
            # Report the failure instead of silently re-rendering the form.
            return render_template("/user/result.html",
                                   opname="更新图书失败", opresult=error,
                                   cur_time=datetime.now(),
                                   ret_url=url_for("user.book_update", id=id))

        cur.execute("select * from book where book_id=%s", (id,))
        bookinfo = cur.fetchone()
        if bookinfo is None:
            # assert would vanish under -O and give a bare 500 otherwise.
            return render_template("/user/result.html",
                                   opname="更新图书失败", opresult="图书不存在",
                                   cur_time=datetime.now())
        cur.execute("select * from v_book_to_types where book_id=%s", (bookinfo['book_id'], ))
        booktype = cur.fetchall()
        cur.execute("select type_name from typetable")
        typelist = cur.fetchall()
    finally:
        cur.close()

    # The form shows empty inputs for columns that hold NULL.
    for k in bookinfo:
        if bookinfo[k] is None:
            bookinfo[k] = ''
    # Same "name;name;" text format that the POST branch parses.
    booktypestr = "".join(row['type_name'] + ';' for row in booktype)
    return render_template("/user/updatebook.html",
                           book=bookinfo, booktype=booktypestr, typelist=typelist,
                           cur_time=datetime.now())
@bp.route("/doc/upload/<int:id>/", methods=("POST",))
@login_required
def doc_upload(id):
    """Store an uploaded file for book ``id`` and register it in the database.

    The file arrives in the ``file`` form part. It is saved under
    ``UPLOADDIR`` as ``<basename>.<unix timestamp>``; a ``document`` row and
    an ``upload`` row in ``record`` are then inserted in one transaction.
    If the database rejects the insert, the saved file is removed again.

    Redirects to the book page on success, otherwise renders
    ``/user/result.html`` with the reason.
    """
    db = get_db()
    cur = db.cursor()
    error = None
    # .get() lets a request without a "file" part reach the friendly error
    # below instead of ending in a bare 400.
    fileobj = request.files.get('file')
    filename = os.path.basename(fileobj.filename) if fileobj and fileobj.filename else ""
    try:
        if filename:
            filepath = os.path.join(current_app.config['UPLOADDIR'],
                                    filename + f'.{int(datetime.now().timestamp())}')
            fileobj.save(filepath)
            filesz = os.stat(filepath).st_size // 1024  # bytes -> whole KiB
            filetype = fileobj.mimetype
            try:
                cur.execute(
                    "insert into document (`doc_name`,`doc_url`,`doc_size`,`doc_type`,`book_id`,`user_id`)"
                    "values (%s,%s,%s,%s,%s,%s)",
                    (filename, filepath, filesz, filetype, id, g.user['user_id'])
                )
                cur.execute("insert into record (`record_type`,`doc_url`,`user_id`) "
                            "values('upload', %s, %s)",
                            (filepath, g.user['user_id']))
                # Single commit: document row and record row succeed or fail together.
                db.commit()
            except pymysql.Error as _e:
                if 'ck_usedspace' in str(_e):
                    error = "空间不够用了,请向管理员申请扩容"
                else:
                    error = "未知错误 %s" % (_e)
                db.rollback()
                # Nothing references the saved file now; do not leave it behind.
                try:
                    os.remove(filepath)
                except OSError as _rm_err:
                    current_app.logger.warning(
                        "could not remove orphaned upload %s: %s", filepath, _rm_err)
        else:
            error = "未收到有效的文件"
    finally:
        cur.close()
    if error is not None:
        return render_template("/user/result.html",
                               opname="上传文件失败", opresult=error,
                               cur_time=datetime.now())
    return redirect(url_for("user.book", id=id))
@bp.route("/doc/download/<int:docid>/", methods=("GET",))
@login_required
def doc_download(docid):
    """Send the stored file of document ``docid`` to the client.

    The path is the row's ``doc_url`` joined onto the current working
    directory. When the row or the file is missing, ``/user/result.html`` is
    rendered with the reason instead.
    """
    # NOTE(review): the row is looked up by doc_id only; whether it should
    # also be restricted to g.user['user_id'] is not visible here - confirm.
    cursor = get_db().cursor()
    cursor.execute("select * from document where doc_id=%s", (docid,))
    row = cursor.fetchone()
    if row is not None:
        abs_path = os.path.join(os.getcwd(), row['doc_url'])
        if os.path.exists(abs_path):
            return send_file(abs_path, download_name=row['doc_name'], mimetype=row['doc_type'])
        # A row without its file on disk.
        failure = "文件已被删除,严重的数据不一致,请联系管理员"
    else:
        failure = "文件不存在"
    return render_template("/user/result.html",
                           opname="下载文件失败", opresult=failure,
                           cur_time=datetime.now())
@bp.route("/doc/delete/<int:docid>/", methods=("GET",))
@login_required
def doc_delete(docid):
    """Delete document ``docid``: its database row first, then its file.

    The ``document`` row is deleted and a ``remove`` row is written to
    ``record`` in one transaction. Only after that commit is the file
    removed from disk, so a database failure never leaves a row that points
    at an already deleted file.

    Redirects to the owning book's page on success. Renders
    ``/user/result.html`` on failure, or with a warning when the database
    part succeeded but the file was missing or could not be removed.
    """
    # NOTE(review): the row is looked up by doc_id only; whether it should
    # also be restricted to g.user['user_id'] is not visible here - confirm.
    db = get_db()
    cur = db.cursor()
    error = None
    warning = None
    document = None
    try:
        cur.execute("select * from document where doc_id=%s", (docid,))
        document = cur.fetchone()
        if document is None:
            error = "文件不存在"
        else:
            try:
                cur.execute("delete from document where `doc_id`=%s", (docid,))
                cur.execute("insert into record (`record_type`,`doc_url`,`user_id`) "
                            "values('remove', %s, %s)",
                            (document['doc_url'], g.user['user_id']))
                db.commit()
            except pymysql.Error as _e:
                error = "严重错误:%s\n请联系管理员解决问题" % (_e)
                db.rollback()
    finally:
        cur.close()

    if error is None:
        docpath = os.path.join(os.getcwd(), document['doc_url'])
        if os.path.exists(docpath):
            try:
                os.remove(docpath)
            except OSError as _e:
                warning = "数据库记录已删除,但删除磁盘文件失败:%s" % (_e)
        else:
            warning = "文件已经删除,但是数据库记录仍存在"

    if error is not None:
        return render_template("/user/result.html",
                               opname="删除文件失败", opresult=error,
                               cur_time=datetime.now())
    elif warning is not None:
        return render_template("/user/result.html",
                               opname="删除文件出现了一些意外", opresult=warning,
                               cur_time=datetime.now(),
                               ret_url=url_for("user.book", id=document['book_id']))
    return redirect(url_for("user.book", id=document['book_id']))
@bp.route("/note/create/<int:bookid>/", methods=("POST",))
@login_required
def note_create(bookid):
    """Attach a new note to book ``bookid`` for the current user.

    Reads ``notename`` and ``notecontent`` from the form; both must be
    non-empty. Redirects to the book page on success, otherwise renders
    ``/user/result.html`` with the reason.
    """
    conn = get_db()
    cursor = conn.cursor()
    title = request.form['notename']
    body = request.form["notecontent"]
    failure = None
    if not (title and body):
        failure = "你竟敢绕过前端验证!"
    else:
        try:
            cursor.execute("insert into note (`note_name`,`note_content`,`book_id`,`user_id`) "
                           "values (%s, %s, %s, %s)",
                           (title, body, bookid, g.user['user_id']))
            conn.commit()
        except pymysql.Error as exc:
            failure = "未知错误:%s" % (exc)
            conn.rollback()
    if failure is None:
        return redirect(url_for("user.book", id=bookid))
    return render_template("/user/result.html",
                           opname="添加评论失败", opresult=failure,
                           cur_time=datetime.now())
@bp.route("/note/delete/<int:noteid>/", methods=("GET",))
@login_required
def note_delete(noteid):
    """Delete note ``noteid`` and return to the page of the book it belonged to.

    Renders ``/user/result.html`` with the reason when the note does not
    exist or the delete fails.
    """
    # NOTE(review): the row is looked up by note_id only; whether it should
    # also be restricted to g.user['user_id'] is not visible here - confirm.
    db = get_db()
    cur = db.cursor()
    error = None
    note = None
    try:
        cur.execute("select * from note where note_id=%s", (noteid,))
        note = cur.fetchone()
        if note is None:
            error = "评论不存在"
        else:
            try:
                # Parameters as a real one-element tuple, not a bare scalar.
                cur.execute("delete from note where `note_id`=%s", (noteid,))
                db.commit()
            except pymysql.Error as _e:
                error = "未知错误:%s" % (_e)
                # Leave the connection clean, as the other views do.
                db.rollback()
    finally:
        cur.close()
    if error is not None:
        return render_template("/user/result.html",
                               opname="删除评论失败", opresult=error,
                               cur_time=datetime.now())
    return redirect(url_for("user.book", id=note['book_id']))