Python 网站开发(8) -- 构建API
编写API
API就是把Web App的功能全部封装了,所以,通过API操作数据,可以极大地把前端和后端的代码隔离,使得后端代码易于测试,前端代码编写更简单。一个API也是一个URL的处理函数,我们希望能直接通过一个装饰器(如下例中的@get)来把函数变成JSON格式的REST API,这样,获取注册用户可以用一个API实现如下:
@get('/api/users')
async def api_get_users(*, page='1'):
    """Return one page of registered users as a dict.

    The handler must be a coroutine (``async def``) because it awaits the
    database queries.

    Args:
        page: Requested page number as text; defaults to ``'1'``.

    Returns:
        ``dict(page=<Page>, users=<users on that page>)``; ``users`` is an
        empty tuple when there are no users at all.
    """
    page_index = get_page_index(page)
    num = await User.findNumber('count(id)')
    p = Page(num, page_index)
    if num == 0:
        return dict(page=p, users=())
    users = await User.findAll(orderBy='created_at desc', limit=(p.offset, p.limit))
    for u in users:
        # Mask the stored password hash before it leaves the server.
        u.passwd = '******'
    return dict(page=p, users=users)
只要返回一个dict,后续的response这个middleware就可以把结果序列化为JSON并返回。
以下是网站所需要的全部后端API和前端页面URL的列表:
后端API包括:
- 获取日志:GET /api/blogs
- 创建日志:POST /api/blogs
- 修改日志:POST /api/blogs/:blog_id
- 删除日志:POST /api/blogs/:blog_id/delete
- 获取评论:GET /api/comments
- 创建评论:POST /api/blogs/:blog_id/comments
- 删除评论:POST /api/comments/:comment_id/delete
- 创建新用户:POST /api/users
- 获取用户:GET /api/users
管理页面包括:
- 评论列表页:GET /manage/comments
- 日志列表页:GET /manage/blogs
- 创建日志页:GET /manage/blogs/create
- 修改日志页:GET /manage/blogs/edit
- 用户列表页:GET /manage/users
用户浏览页面包括:
- 注册页:GET /register
- 登录页:GET /signin
- 注销页:GET /signout
- 首页:GET /
- 日志详情页:GET /blog/:blog_id
我们将处理这些API和URL的函数统一放在handlers.py中:
import asyncio
import base64
import hashlib
import hmac
import html
import json
import logging
import re
import time

# markdown renders blog/comment text written in Markdown syntax.
import markdown
from aiohttp import web

# Pagination helper and the error types raised by the API handlers.
# APIError and APIPermissionError are raised below, so they must be imported too.
from apis import (
    APIError,
    APIPermissionError,
    APIResourceNotFoundError,
    APIValueError,
    Page,
)
from config import configs
from coroweb import get, post
from models import Blog, Comment, User, next_id
# Name of the cookie that carries the signed login session.
COOKIE_NAME = 'awesession'
# Secret mixed into the SHA1 digest when the session cookie is signed and verified.
_COOKIE_KEY = configs.session.secret
# Reject the request unless it comes from a signed-in administrator.
def check_admin(request):
    """Raise APIPermissionError unless ``request.__user__`` is an admin.

    Args:
        request: The current request; its ``__user__`` attribute is either
            None (nobody signed in) or an object with an ``admin`` flag.

    Raises:
        APIPermissionError: If there is no user or the user is not an admin.
    """
    user = request.__user__
    is_admin = user is not None and user.admin
    if not is_admin:
        raise APIPermissionError()
# Parse the page number of a paginated request.
def get_page_index(page_str):
    """Convert a raw page value into a valid 1-based page index.

    Args:
        page_str: The raw ``page`` value, normally text such as ``'3'``.

    Returns:
        The parsed integer, or 1 when the value is missing, is not an
        integer, or is smaller than 1.
    """
    try:
        p = int(page_str)
    except (TypeError, ValueError):
        # ValueError: text that is not a number (e.g. 'abc' or '').
        # TypeError: a non-text value such as None.
        # Either way fall back to the first page instead of failing.
        return 1
    return max(p, 1)
# Build the signed session cookie value for a user.
def user2cookie(user, max_age):
    """Return the cookie string ``id-expires-sha1`` for *user*.

    Args:
        user: Object providing ``id`` and ``passwd``.
        max_age: Lifetime in seconds, added to the current time to get the
            expiry timestamp.

    Returns:
        ``'<id>-<expires>-<digest>'`` where the digest is the SHA1 hex digest
        of ``'<id>-<passwd>-<expires>-<secret key>'``.
    """
    expires_at = str(int(time.time() + max_age))
    signed_text = '%s-%s-%s-%s' % (user.id, user.passwd, expires_at, _COOKIE_KEY)
    digest = hashlib.sha1(signed_text.encode('utf-8')).hexdigest()
    return '-'.join([user.id, expires_at, digest])
# Convert plain text to HTML paragraphs.
def text2html(text):
    """Render plain text as HTML, one ``<p>`` element per non-blank line.

    The characters ``&``, ``<`` and ``>`` are replaced by their HTML entities
    so that the text cannot inject markup; quotes are left untouched.

    Args:
        text: Plain text, lines separated by ``'\\n'``.

    Returns:
        The concatenated ``<p>...</p>`` elements (an empty string when every
        line is blank).
    """
    return ''.join(
        '<p>%s</p>' % html.escape(line, quote=False)
        for line in text.split('\n')
        if line.strip() != ''
    )
# Parse and verify the session cookie.
async def cookie2user(cookie_str):
    """Resolve a session cookie to its user, or None when it is not valid.

    The cookie is expected in the form ``id-expires-sha1``. It is accepted
    only if it has exactly three parts, has not expired, names an existing
    user, and carries the SHA1 digest of
    ``'<id>-<stored passwd>-<expires>-<secret key>'``.

    Args:
        cookie_str: Raw cookie value; may be empty or None.

    Returns:
        The user with ``passwd`` masked as ``'******'``, or None for a
        missing, malformed, expired or forged cookie.
    """
    if not cookie_str:
        return None
    try:
        parts = cookie_str.split('-')
        if len(parts) != 3:
            return None
        uid, expires, sha1 = parts
        if int(expires) < time.time():
            return None
        user = await User.find(uid)
        if user is None:
            return None
        s = '%s-%s-%s-%s' % (uid, user.passwd, expires, _COOKIE_KEY)
        expected = hashlib.sha1(s.encode('utf-8')).hexdigest()
        # The digest is supplied by the client, so compare in constant time;
        # bytes are used because compare_digest rejects non-ASCII str input.
        if not hmac.compare_digest(sha1.encode('utf-8'), expected.encode('utf-8')):
            logging.info('invalid sha1')
            return None
        # Do not hand the stored password hash to the rest of the request.
        user.passwd = '******'
        return user
    except Exception as e:
        # Deliberately best-effort: any parsing or lookup failure is logged
        # and treated as "not signed in".
        logging.exception(e)
        return None
# Home page: paginated list of blogs.
@get('/')
async def index(*, page='1'):
    """Build the template context for the home page.

    Args:
        page: Requested page number as text; defaults to ``'1'``.

    Returns:
        A dict naming the ``blogs.html`` template together with the Page
        object and the blogs on that page, newest first (an empty list when
        there are no blogs).
    """
    page_index = get_page_index(page)
    total = await Blog.findNumber('count(id)')
    pager = Page(total, page_index)
    blogs = []
    if total != 0:
        blogs = await Blog.findAll(orderBy='created_at desc', limit=(pager.offset, pager.limit))
    return dict(__template__='blogs.html', page=pager, blogs=blogs)
# Blog detail page.
@get('/blog/{id}')
async def get_blog(id):
    """Build the template context for a single blog and its comments.

    Args:
        id: Id of the blog taken from the URL.

    Returns:
        A dict naming the ``blog.html`` template with the blog and its
        comments (newest first); each gets an ``html_content`` attribute
        holding its Markdown content rendered to HTML.

    Raises:
        web.HTTPNotFound: If no blog with this id exists.
    """
    blog = await Blog.find(id)
    if blog is None:
        # Unknown id: answer 404 instead of failing on blog.content below.
        raise web.HTTPNotFound()
    comments = await Comment.findAll('blog_id=?', [id], orderBy='created_at desc')
    for c in comments:
        # NOTE(review): comment content is user-supplied and markdown.markdown
        # passes raw HTML through - confirm it is sanitized before display.
        c.html_content = markdown.markdown(c.content)
    blog.html_content = markdown.markdown(blog.content)
    return {
        '__template__': 'blog.html',
        'blog': blog,
        'comments': comments
    }
# Registration page.
@get('/register')
def register():
    """Return the template context for the registration page."""
    return dict(__template__='register.html')
# Sign-in page.
@get('/signin')
def signin():
    """Return the template context for the sign-in page."""
    return dict(__template__='signin.html')
# Login API: verify the credentials and issue the session cookie.
@post('/api/authenticate')
async def authenticate(*, email, passwd):
    """Check email/password and answer with the user plus a session cookie.

    Args:
        email: Email of the account to sign in.
        passwd: Password value as submitted by the client.

    Returns:
        A ``web.Response`` with a JSON body describing the user (password
        masked) and the session cookie set for 86400 seconds.

    Raises:
        APIValueError: If email or passwd is empty, the email is unknown, or
            the password does not match.
    """
    if not email:
        raise APIValueError('email', 'Invalid email.')
    if not passwd:
        raise APIValueError('passwd', 'Invalid password.')
    matches = await User.findAll('email=?', [email])
    if len(matches) == 0:
        raise APIValueError('email', 'Email not exist.')
    user = matches[0]
    # The stored value is sha1("<user id>:<submitted passwd>").
    salted = user.id.encode('utf-8') + b':' + passwd.encode('utf-8')
    if hashlib.sha1(salted).hexdigest() != user.passwd:
        raise APIValueError('passwd', 'Invalid password.')
    resp = web.Response()
    # The cookie must be built before the password is masked: its digest
    # covers the stored password hash.
    resp.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
    user.passwd = '******'
    resp.content_type = 'application/json'
    resp.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return resp
# Sign out the current user.
@get('/signout')
def signout(request):
    """Expire the session cookie and redirect back.

    Args:
        request: The current request; its ``Referer`` header, when present,
            is used as the redirect target, otherwise ``'/'``.

    Returns:
        A ``web.HTTPFound`` redirect whose cookie is overwritten with
        ``'-deleted-'`` and ``max_age=0``.
    """
    target = request.headers.get('Referer') or '/'
    redirect = web.HTTPFound(target)
    redirect.set_cookie(COOKIE_NAME, '-deleted-', max_age=0, httponly=True)
    logging.info('user signed out.')
    return redirect
# Management entry point.
@get('/manage/')
def manage():
    """Send the bare management URL to the comment management page.

    Returns:
        The string ``'redirect:/manage/comments'`` (presumably turned into an
        HTTP redirect by the response middleware - confirm there).
    """
    return 'redirect:/manage/comments'
# Comment management page.
@get('/manage/comments')
def manage_comments(*, page='1'):
    """Return the template context for the comment management page.

    Args:
        page: Requested page number as text; defaults to ``'1'``.
    """
    return dict(__template__='manage_comments.html', page_index=get_page_index(page))
# Blog management page.
@get('/manage/blogs')
def manage_blogs(*, page='1'):
    """Return the template context for the blog management page.

    Args:
        page: Requested page number as text; defaults to ``'1'``.
    """
    return dict(__template__='manage_blogs.html', page_index=get_page_index(page))
# Create-blog page.
@get('/manage/blogs/create')
def manage_create_blog():
    """Return the template context for the blog editor in "create" mode.

    The id is empty and the form action points at the create-blog API.
    """
    return dict(__template__='manage_blog_edit.html', id='', action='/api/blogs')
# Edit-blog page.
@get('/manage/blogs/edit')
def manage_edit_blog(*, id):
    """Return the template context for the blog editor in "edit" mode.

    Args:
        id: Id of the blog to edit; it is also appended to the form action
            ``/api/blogs/<id>``.
    """
    return dict(__template__='manage_blog_edit.html', id=id, action='/api/blogs/%s' % id)
# User management page.
@get('/manage/users')
def manage_users(*, page='1'):
    """Return the template context for the user management page.

    Args:
        page: Requested page number as text; defaults to ``'1'``.
    """
    return dict(__template__='manage_users.html', page_index=get_page_index(page))
# Comment list API.
@get('/api/comments')
async def api_comments(*, page='1'):
    """Return one page of comments, newest first.

    Args:
        page: Requested page number as text; defaults to ``'1'``.

    Returns:
        ``dict(page=<Page>, comments=<comments on that page>)``; ``comments``
        is an empty tuple when there are no comments at all.
    """
    page_index = get_page_index(page)
    total = await Comment.findNumber('count(id)')
    pager = Page(total, page_index)
    comments = ()
    if total != 0:
        comments = await Comment.findAll(orderBy='created_at desc', limit=(pager.offset, pager.limit))
    return dict(page=pager, comments=comments)
# Create-comment API (signed-in users only).
@post('/api/blogs/{id}/comments')
async def api_create_comment(id, request, *, content):
    """Attach a new comment by the current user to a blog.

    Args:
        id: Id of the blog being commented on.
        request: The current request; ``request.__user__`` is the author.
        content: Comment text; surrounding whitespace is stripped.

    Returns:
        The saved Comment.

    Raises:
        APIPermissionError: If nobody is signed in.
        APIValueError: If the content is empty or only whitespace.
        APIResourceNotFoundError: If the blog does not exist.
    """
    author = request.__user__
    if author is None:
        raise APIPermissionError('Please signin first.')
    text = content.strip() if content else ''
    if not text:
        raise APIValueError('content')
    blog = await Blog.find(id)
    if blog is None:
        raise APIResourceNotFoundError('Blog')
    comment = Comment(
        blog_id=blog.id,
        user_id=author.id,
        user_name=author.name,
        user_image=author.image,
        content=text,
    )
    await comment.save()
    return comment
# Delete-comment API (administrators only).
@post('/api/comments/{id}/delete')
async def api_delete_comments(id, request):
    """Remove the comment with the given id.

    Args:
        id: Id of the comment to delete.
        request: The current request, used for the admin check.

    Returns:
        ``dict(id=id)`` for the removed comment.

    Raises:
        APIPermissionError: If the caller is not an administrator.
        APIResourceNotFoundError: If the comment does not exist.
    """
    check_admin(request)
    comment = await Comment.find(id)
    if comment is None:
        raise APIResourceNotFoundError('Comment')
    await comment.remove()
    return dict(id=id)
# User list API.
@get('/api/users')
async def api_get_users(*, page='1'):
    """Return one page of users, newest first, with passwords masked.

    Args:
        page: Requested page number as text; defaults to ``'1'``.

    Returns:
        ``dict(page=<Page>, users=<users on that page>)``; ``users`` is an
        empty tuple when there are no users at all.
    """
    page_index = get_page_index(page)
    total = await User.findNumber('count(id)')
    pager = Page(total, page_index)
    users = ()
    if total != 0:
        users = await User.findAll(orderBy='created_at desc', limit=(pager.offset, pager.limit))
        for user in users:
            # Mask the stored password hash before it leaves the server.
            user.passwd = '******'
    return dict(page=pager, users=users)
# Format checks applied to the email and password of a registration request.
# Email: lowercase letters, digits, '.', '-' and '_' before the '@', then a
# domain label followed by one to four dot-separated labels.
_RE_EMAIL = re.compile(r'^[a-z0-9\.\-\_]+\@[a-z0-9\-\_]+(\.[a-z0-9\-\_]+){1,4}$')
# Password: exactly 40 lowercase hex characters (the length of a SHA1 hex digest).
_RE_SHA1 = re.compile(r'^[0-9a-f]{40}$')
# User registration API.
@post('/api/users')
async def api_register_user(*, email, name, passwd):
    """Create a user account and sign the new user in.

    Args:
        email: Address that must match ``_RE_EMAIL`` and not be in use.
        name: Display name; surrounding whitespace is stripped.
        passwd: Value that must match ``_RE_SHA1`` (presumably the password
            already SHA1-hashed by the client - confirm against the form).

    Returns:
        A ``web.Response`` with a JSON body describing the new user
        (password masked) and the session cookie set for 86400 seconds.

    Raises:
        APIValueError: If name, email or passwd fails validation.
        APIError: If the email is already registered.
    """
    if not name or not name.strip():
        raise APIValueError('name')
    if not email or not _RE_EMAIL.match(email):
        raise APIValueError('email')
    if not passwd or not _RE_SHA1.match(passwd):
        raise APIValueError('passwd')
    existing = await User.findAll('email=?', [email])
    if len(existing) > 0:
        raise APIError('register:failed', 'email', 'Email is already in use.')
    uid = next_id()
    # Stored password is sha1("<user id>:<submitted passwd>").
    salted_passwd = '%s:%s' % (uid, passwd)
    passwd_digest = hashlib.sha1(salted_passwd.encode('utf-8')).hexdigest()
    avatar_url = 'http://www.gravatar.com/avatar/%s?d=mm&s=120' % hashlib.md5(email.encode('utf-8')).hexdigest()
    user = User(id=uid, name=name.strip(), email=email, passwd=passwd_digest, image=avatar_url)
    await user.save()
    resp = web.Response()
    # The cookie must be built before the password is masked: its digest
    # covers the stored password hash.
    resp.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True)
    user.passwd = '******'
    resp.content_type = 'application/json'
    resp.body = json.dumps(user, ensure_ascii=False).encode('utf-8')
    return resp
# Blog list API.
@get('/api/blogs')
async def api_blogs(*, page='1'):
    """Return one page of blogs, newest first.

    Args:
        page: Requested page number as text; defaults to ``'1'``.

    Returns:
        ``dict(page=<Page>, blogs=<blogs on that page>)``; ``blogs`` is an
        empty tuple when there are no blogs at all.
    """
    page_index = get_page_index(page)
    total = await Blog.findNumber('count(id)')
    pager = Page(total, page_index)
    blogs = ()
    if total != 0:
        blogs = await Blog.findAll(orderBy='created_at desc', limit=(pager.offset, pager.limit))
    return dict(page=pager, blogs=blogs)
# Blog detail API.
@get('/api/blogs/{id}')
async def api_get_blog(*, id):
    """Return the blog with the given id.

    Args:
        id: Id of the blog to look up.

    Returns:
        Whatever ``Blog.find`` yields for that id.
    """
    return await Blog.find(id)
# Create-blog API (administrators only).
@post('/api/blogs')
async def api_create_blog(request, *, name, summary, content):
    """Create a blog authored by the current administrator.

    Args:
        request: The current request; ``request.__user__`` is the author.
        name: Blog title; must not be blank.
        summary: Blog summary; must not be blank.
        content: Blog body; must not be blank.

    Returns:
        The saved Blog (all three text fields stripped of surrounding
        whitespace).

    Raises:
        APIPermissionError: If the caller is not an administrator.
        APIValueError: If name, summary or content is empty or whitespace.
    """
    check_admin(request)
    if not name or not name.strip():
        raise APIValueError('name', 'name cannot be empty.')
    if not summary or not summary.strip():
        raise APIValueError('summary', 'summary cannot be empty.')
    if not content or not content.strip():
        raise APIValueError('content', 'content cannot be empty.')
    author = request.__user__
    blog = Blog(
        user_id=author.id,
        user_name=author.name,
        user_image=author.image,
        name=name.strip(),
        summary=summary.strip(),
        content=content.strip(),
    )
    await blog.save()
    return blog
# Update-blog API (administrators only).
@post('/api/blogs/{id}')
async def api_update_blog(id, request, *, name, summary, content):
    """Replace the title, summary and body of an existing blog.

    Args:
        id: Id of the blog to update.
        request: The current request, used for the admin check.
        name: New title; must not be blank.
        summary: New summary; must not be blank.
        content: New body; must not be blank.

    Returns:
        The updated Blog.

    Raises:
        APIPermissionError: If the caller is not an administrator.
        APIResourceNotFoundError: If the blog does not exist.
        APIValueError: If name, summary or content is empty or whitespace.
    """
    check_admin(request)
    blog = await Blog.find(id)
    if blog is None:
        # Report an unknown id as "not found" rather than failing later on
        # attribute assignment.
        raise APIResourceNotFoundError('Blog')
    if not name or not name.strip():
        raise APIValueError('name', 'name cannot be empty.')
    if not summary or not summary.strip():
        raise APIValueError('summary', 'summary cannot be empty.')
    if not content or not content.strip():
        raise APIValueError('content', 'content cannot be empty.')
    blog.name = name.strip()
    blog.summary = summary.strip()
    blog.content = content.strip()
    await blog.update()
    return blog
# Delete-blog API (administrators only).
@post('/api/blogs/{id}/delete')
async def api_delete_blog(request, *, id):
    """Remove the blog with the given id.

    Args:
        request: The current request, used for the admin check.
        id: Id of the blog to delete.

    Returns:
        ``dict(id=id)`` for the removed blog.

    Raises:
        APIPermissionError: If the caller is not an administrator.
        APIResourceNotFoundError: If the blog does not exist.
    """
    check_admin(request)
    blog = await Blog.find(id)
    if blog is None:
        # Same handling as the comment/user delete APIs: unknown id is
        # reported as "not found" instead of failing on blog.remove().
        raise APIResourceNotFoundError('Blog')
    await blog.remove()
    return dict(id=id)
# Delete-user API (administrators only).
@post('/api/users/{id}/delete')
async def api_delete_users(id, request):
    """Remove a user and tag that user's comments as written by a deleted user.

    Args:
        id: Id of the user to delete.
        request: The current request, used for the admin check.

    Returns:
        ``dict(id=id)`` for the removed user.

    Raises:
        APIPermissionError: If the caller is not an administrator.
        APIResourceNotFoundError: If the user does not exist.
    """
    check_admin(request)
    user = await User.find(id)
    if user is None:
        raise APIResourceNotFoundError('User')
    await user.remove()
    # Mark every comment written by the deleted user. The records returned
    # by findAll are updated directly, so the user id in ``id`` is never
    # overwritten and no comment is fetched a second time.
    comments = await Comment.findAll('user_id=?', [id])
    for comment in comments or ():
        comment.user_name = comment.user_name + ' (该用户已被删除)'
        await comment.update()
    return dict(id=id)