#!/bin/bash/env python3
from flask import Flask, url_for, render_template, redirect, flash, request
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager,UserMixin,login_user,login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from markupsafe import escape
import os
import sys
import click
app = Flask(__name__)
app.config['SECRET_KEY'] = 'dev' # 等同于 app.secret_key = 'dev'
login_manager = LoginManager(app) # 实例化扩展类
login_manager.login_view = 'login'
WIN = sys.platform.startswith('win')
if WIN: # 如果是 Windows 系统,使用三个斜线
prefix = 'sqlite:///'
else: # 否则使用四个斜线
prefix = 'sqlite:////'
app.config['SQLALCHEMY_DATABASE_URI'] = prefix + os.path.join(app.root_path, 'data.db')
# app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False # 关闭对模型修改的监控
db = SQLAlchemy(app) # 初始化扩展,传入程序实例 app
# @app.route('/')
# def hello():
# return 'Hello'
# @app.route('/user/<name>')
# def user_page(name):
# return f'User: {escape(name)}'
# @app.route('/test')
# def test_url_for():
# # 下面是一些调用示例(请访问 http://localhost:5000/test 后在命令行窗口查看输出的 URL):
# print(url_for('hello')) # 生成 hello 视图函数对应的 URL,将会输出:/
# # 注意下面两个调用是如何生成包含 URL 变量的 URL 的
# print(url_for('user_page', name='greyli')) # 输出:/user/greyli
# print(url_for('user_page', name='peter')) # 输出:/user/peter
# print(url_for('test_url_for')) # 输出:/test
# # 下面这个调用传入了多余的关键字参数,它们会被作为查询字符串附加到 URL 后面。
# print(url_for('test_url_for', num=2)) # 输出:/test?num=2
# return 'Test page'
# name = 'Grey Li'
# movies = [
# {'title': 'My Neighbor Totoro', 'year': '1988'},
# {'title': 'Dead Poets Society', 'year': '1989'},
# {'title': 'A Perfect World', 'year': '1993'},
# {'title': 'Leon', 'year': '1994'},
# {'title': 'Mahjong', 'year': '1996'},
# {'title': 'Swallowtail Butterfly', 'year': '1996'},
# {'title': 'King of Comedy', 'year': '1999'},
# {'title': 'Devils on the Doorstep', 'year': '1999'},
# {'title': 'WALL-E', 'year': '2008'},
# {'title': 'The Pork of Music', 'year': '2012'},
# ]
# @app.route('/')
# def index():
# return render_template('index.html', name=name, movies=movies)
class Movie(db.Model):
id = db.Column(db.Integer, primary_key=True) # 主键
title = db.Column(db.String(60)) #电影标题
year = db.Column(db.String(4)) #电影年份
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(20))
username = db.Column(db.String(20)) # 用户名
password_hash = db.Column(db.String(128)) # 密码散列值
def set_password(self, password): # 用来设置密码的方法,接受密码作为参数
self.password_hash = generate_password_hash(password) # 将生成的密码保持到对应字段
def validate_password(self, password): # 用于验证密码的方法,接受密码作为参数
return check_password_hash(self.password_hash, password) # 返回布尔值
@app.cli.command() # 注册为命令,可以传入 name 参数来自定义命令
@click.option('--drop', is_flag=True, help='Create after drop') # 设置选项
def initdb(drop):
"""Initialize the databases."""
if drop: #判断是否输入了选项
db.drop_all()
db.create_all()
click.echo('Initalize databases.') # 输出提示信息
@app.cli.command()
def forge():
"""Generate fake data."""
db.create_all()
# 全局的两个变量移动到这个函数内
name = 'Grey Li'
movies = [
{'title': 'My Neighbor Totoro', 'year': '1988'},
{'title': 'Dead Poets Society', 'year': '1989'},
{'title': 'A Perfect World', 'year': '1993'},
{'title': 'Leon', 'year': '1994'},
{'title': 'Mahjong', 'year': '1996'},
{'title': 'Swallowtail Butterfly', 'year': '1996'},
{'title': 'King of Comedy', 'year': '1999'},
{'title': 'Devils on the Doorstep', 'year': '1999'},
{'title': 'WALL-E', 'year': '2008'},
{'title': 'The Pork of Music', 'year': '2012'},
]
user = User(name=name)
db.session.add(user)
for m in movies:
movie = Movie(title=m['title'], year=m['year'])
db.session.add(movie)
db.session.commit()
click.echo('Done.')
@app.cli.command()
@click.option('--username', prompt=True, help='The username used to login.')
@click.option('--password', prompt=True, hide_input=True, confirmation_prompt=True, help='The password used to login.')
def admin(username, password):
"""Create user."""
db.create_all()
user = User.query.first()
if user is not None:
click.echo('Updating user...')
user.username = username
user.set_password(password) # 设置密码
else:
click.echo('Creating user...')
user = User(username=username, name='Admin')
user.set_password(password) # 设置密码
db.session.add(user)
db.session.commit() # 提交数据库会话
click.echo('Done.')
@app.context_processor
def inject_user(): # 函数名可以随意修改
user = User.query.first()
return dict(user=user) # 需要返回字典,等同于 return {'user': user}
@app.errorhandler(404) # 传入要处理的错误代码
def page_not_found(e): # 接受异常对象作为参数
user = User.query.first()
return render_template('404.html'), 404 # 返回模板和状态码
@app.route('/', methods=['GET', 'POST'])
def index():
if request.method == 'POST': # 判断是否是 POST 请求
if not current_user.is_authenticated: # 如果当前用户未认证
return redirect(url_for('index')) # 重定向到主页
# 获取表单数据
title = request.form.get('title') # 传入表单对应输入字段的 name 值
year = request.form.get('year')
# 验证数据
if not title or not year or len(year) != 4 or len(title) > 60:
flash('Invalid input.') # 显示错误提示
return redirect(url_for('index')) # 重定向回主页
# 保存表单数据到数据库
movie = Movie(title=title, year=year) # 创建记录
db.session.add(movie) # 添加到数据库会话
db.session.commit() # 提交数据库会话
flash('Item created.') # 显示成功创建的提示
return redirect(url_for('index')) # 重定向回主页
movies = Movie.query.all()
return render_template('index.html', movies=movies)
user = User.query.first() # 读取用户记录
movies = Movie.query.all() # 读取所有电影记录
return render_template('index.html', user=user, movies=movies)
@app.route('/movie/edit/<int:movie_id>', methods=['GET', 'POST'])
@login_required
def edit(movie_id):
movie = Movie.query.get_or_404(movie_id)
if request.method == 'POST': # 处理编辑表单的提交请求
title = request.form['title']
year = request.form['year']
if not title or not year or len(year) != 4 or len(title) > 60:
flash('Invalid input.')
return redirect(url_for('edit', movie_id=movie_id)) # 重定向回对应的编辑页面
movie.title = title # 更新标题
movie.year = year # 更新年份
db.session.commit() # 提交数据库会话
flash('Item updated.')
return redirect(url_for('index')) # 重定向回主页
return render_template('edit.html', movie=movie) # 传入被编辑的电影记录
@app.route('/movie/delete/<int:movie_id>', methods=['POST']) # 限定只接受 POST 请求
@login_required
def delete(movie_id):
movie = Movie.query.get_or_404(movie_id) # 获取电影记录
db.session.delete(movie) # 删除对应的记录
db.session.commit() # 提交数据库会话
flash('Item deleted.')
return redirect(url_for('index')) # 重定向回主页
@login_manager.user_loader
def load_user(user_id): # 创建用户加载回调函数,接受用户 ID 作为参数
user = User.query.get(int(user_id)) # 用 ID 作为 User 模型的主键查询对应的用户
return user # 返回用户对象
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
if not username or not password:
flash('Invalid input.')
return redirect(url_for('login'))
user = User.query.first()
# 验证用户名和密码是否一致
if username == user.username and user.validate_password(password):
login_user(user) # 登入用户
flash('Login success.')
return redirect(url_for('index')) # 重定向到主页
flash('Invalid username or password.') # 如果验证失败,显示错误消息
return redirect(url_for('login')) # 重定向回登录页面
return render_template('login.html')
@app.route('/logout')
@login_required # 用于视图保护,后面会详细介绍
def logout():
logout_user() # 登出用户
flash('Goodbye.')
return redirect(url_for('index')) # 重定向回首页
@app.route('/settings', methods=['GET', 'POST'])
@login_required
def settings():
if request.method == 'POST':
name = request.form['name']
if not name or len(name) > 20:
flash('Invalid input.')
return redirect(url_for('settings'))
current_user.name = name
# current_user 会返回当前登录用户的数据库记录对象
# 等同于下面的用法
# user = User.query.first()
# user.name = name
db.session.commit()
flash('Settings updated.')
return redirect(url_for('index'))
return render_template('settings.html')
PS D:\python-script\flask-learning> & "D:/Program Files/Python3.10/python.exe" d:/python-script/flask-learning/test_watchlist.py
EE
======================================================================
ERROR: test_app_exist (__main__.WatchlistTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
File "d:\python-script\flask-learning\test_watchlist.py", line 15, in setUp
db.create_all()
File "D:\Program Files\Python3.10\Lib\site-packages\flask_sqlalchemy\extension.py", line 887, in create_all
self._call_for_binds(bind_key, "create_all")
File "D:\Program Files\Python3.10\Lib\site-packages\flask_sqlalchemy\extension.py", line 858, in _call_for_binds
engine = self.engines[key]
File "D:\Program Files\Python3.10\Lib\site-packages\flask_sqlalchemy\extension.py", line 639, in engines
app = current_app._get_current_object() # type: ignore[attr-defined]
File "D:\Program Files\Python3.10\Lib\site-packages\werkzeug\local.py", line 508, in _get_current_object
raise RuntimeError(unbound_message) from None
RuntimeError: Working outside of application context.
This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.
** 看报错似乎为上下文的异常,我修改后的代码运行,可以通过测试,但是存在的问题是app.config.update里面的配置未生效,使用的还是原文件的SQLALCHEMY_DATABASE_URI配置,导致会覆盖开发环境的数据