Browse Source

[u] 接口实现

master
luoning 8 years ago
parent
commit
dc6a39614e
  1. 4
      doc/README.md
  2. 100
      src/lottery.py
  3. 58
      src/main.py
  4. 20
      src/random_selector.py
  5. 127
      src/test_lottery.py
  6. 6
      src/users_db.py

4
doc/README.md

@ -2,8 +2,8 @@ @@ -2,8 +2,8 @@
| 接口名 | 请求方法 | 参数 | 返回值 | 备注 |
| --------------- | -------- | ---------------------------------------- | ---------------------------------------- | ---------------------------------- |
| /index | GET | | | |
| /run | GET | award_id=int | {"status": 200, "msg": "success", "data": None} | 开始抽奖,进入抽奖状态 |
| /draw_lottery | GET | | {"status": 200, "msg": "success", "data": [{"uid":uid, "name":name, "award_id":award_id, "role":role}]} | 获取抽奖结果,仅当抽奖状态才可调用此接口 |
| /run | GET | | {"status": 200, "msg": "success", "data": None} | 开始抽奖,进入抽奖状态 |
| /draw_lottery | GET | award_id=int | {"status": 200, "msg": "success", "data": [{"uid":uid, "name":name, "award_id":award_id, "role":role}]} | 获取抽奖结果,仅当抽奖状态才可调用此接口 |
| ~~/update~~ | ~~POST~~ | ~~[{"uid":string, "awards_id":int}]~~ | | ~~提交中奖用户~~ |
| /revoke | GET | uid=string | {"status": 200, "msg": "success", "data": None} | 取消用户中奖信息,uid为工号,例如G0001 |
| /lucky_users | GET | award_id=int | {"status": 200, "msg": "success", "data": {award_id:[{"uid":uid, "name":name, "role":role}]}} | 获取所有中奖用户列表,传入award_id获取指定奖项的中奖用户列表 |

100
src/lottery.py

@ -2,23 +2,26 @@ @@ -2,23 +2,26 @@
# -*- coding:utf-8 -*-
import tinydb as d
import os
from . import CURRENT_DIR
import time
from . import CURRENT_DIR, random_selector, users_db
db = d.TinyDB(os.path.join(CURRENT_DIR, "lottery.db"))
t_users = db.table('users')
t_awards = db.table('awards')
def current_ms():
return int(round(time.time() * 1000))
def server_resp(status, msg, data=None):
return {"status": status, "msg": msg, "data": data}
def reset(users_db, awards_db):
def reset():
db.purge_tables()
for u in users_db:
t_users.insert(u)
for a in awards_db:
t_awards.insert(a)
add_users(users_db.load())
return server_resp(200, "success")
@ -31,14 +34,18 @@ def luckless_users(): @@ -31,14 +34,18 @@ def luckless_users():
t_users.clear_cache()
user = d.Query()
return server_resp(200, "success", t_users.search(
(~user.award_id.exists()) | (user.award_id is None)))
user['award_id'] == None))
def lucky_users():
def lucky_users(award_id=None):
t_users.clear_cache()
user = d.Query()
if award_id is None:
return server_resp(200, "success", t_users.search(
user.award_id != None))
else:
return server_resp(200, "success", t_users.search(
user.award_id is not None))
user.award_id == award_id))
def awards():
@ -46,12 +53,79 @@ def awards(): @@ -46,12 +53,79 @@ def awards():
return server_resp(200, "success", t_awards.all())
def update(uid, award_id):
t_users.clear_cache()
def award(id):
t_awards.clear_cache()
award = t_awards.get(d.where('award_id') == id)
return server_resp(200, "success", award) if award else server_resp(500, "不存在此奖品")
def update(uid, award_id):
# t_users.clear_cache()
# t_awards.clear_cache()
if not t_awards.get(d.Query().award_id == award_id):
return server_resp(400, "不存在此奖品")
return server_resp(500, "不存在此奖品")
if len(t_users.update({'award_id': award_id}, d.Query().uid == uid)) == 0:
return server_resp(401, "不存在此用户")
return server_resp(501, "不存在此用户")
else:
return server_resp(200, "success")
def revoke(uid):
t_users.clear_cache()
t_awards.clear_cache()
user = t_users.get(d.where('uid') == uid)
if not user:
return server_resp(501, "不存在此用户")
if user['award_id']:
# award = t_awards.get(d.where('award_id') == user['award_id'])
# if award:
# t_awards.update({'award_size': max(award['award_size']-1, 0)}, doc_ids=[award.doc_id])
t_users.update({'award_id': None}, doc_ids=[user.doc_id])
return server_resp(200, "success")
def add_awards(awards):
t_awards.clear_cache()
for id, award in enumerate(awards):
t_awards.insert({'award_id': id,
'award_name': award['award_name'],
'award_size': award['award_capacity'],
'award_capacity': award['award_capacity']})
return server_resp(200, "success")
def add_users(users):
t_users.clear_cache()
for user in users:
t_users.insert({'uid': user['uid'].upper(),
'name': user['name'],
'award_id': None,
'role': user['role']})
def run():
global start_time
start_time = current_ms()
return server_resp(200, "success")
def draw_lottery(award_id):
t_awards.clear_cache()
if not start_time:
return server_resp(402, "尚未开始抽奖")
award = t_awards.get(d.where('award_id') == award_id)
if not award:
return server_resp(500, "不存在此奖项")
t_users.clear_cache()
user = d.Query()
luckies = random_selector.randomselect(current_ms() - start_time, t_users.search(
user['award_id'] == None), award['award_capacity'])
for lucky in luckies:
update(lucky['uid'], award_id)
lucky.update({'award_id': award_id})
return server_resp(200, "success", luckies)

58
src/main.py

@ -2,7 +2,7 @@ @@ -2,7 +2,7 @@
# -*- coding:utf-8 -*-
import bottle as b
import os
from . import CURRENT_DIR
from . import CURRENT_DIR, lottery
app = b.Bottle()
@ -14,40 +14,69 @@ def index(): @@ -14,40 +14,69 @@ def index():
@app.route('/update', method='POST')
def update():
b.abort(403, "禁止修改中奖用户名单")
b.abort(403)
@app.route('/revoke', method='POST')
@app.route('/revoke')
def revoke():
pass
uid = b.request.params.get('uid')
if not uid:
b.abort(403)
return lottery.revoke(id)
@app.route('/users')
def users():
pass
return lottery.users()
@app.route('/lucky_users')
def lucky_users():
if b.request.method == 'POST':
pass
else:
pass
award_id = b.request.params.get('award_id')
return lottery.lucky_users(award_id)
@app.route('/luckless_users')
def luckless_users():
pass
return lottery.luckless_users()
@app.route('/awards')
def awards():
pass
return lottery.awards()
@app.route('/award')
def award():
award_id = b.request.params.get('award_id')
if award_id is None:
b.abort(403)
if type(award_id) is not int:
award_id = int(award_id)
return lottery.award(award_id)
@app.route('/add_awards', method='POST')
def add_awards():
return lottery.add_awards(b.request.json)
@app.route('/reset')
def reset():
pass
return lottery.reset()
@app.route('/run')
def run():
return lottery.run()
@app.route('/draw_lottery')
def draw_lottery():
award_id = b.request.params.get('award_id')
if award_id is None:
b.abort(403)
return lottery.draw_lottery(award_id)
@app.route('/static/<filepath:path>')
@ -55,6 +84,11 @@ def callback(filepath): @@ -55,6 +84,11 @@ def callback(filepath):
return b.static_file(filepath, os.path.join(CURRENT_DIR, "static"))
@b.error
def error403(error):
return "非法访问 " + error
def run(host, port, debug):
app.run(host=host, port=port, debug=debug)

20
src/random_selector.py

@ -0,0 +1,20 @@ @@ -0,0 +1,20 @@
#! /usr/bin/python3
# -*- coding:utf-8 -*-
import random
import os
class RandomSelector(object):
def __init__(self, elapsed):
if not 0 < elapsed <= 256*60:
elapsed = random.randrange(256*60)
self._selector = random.Random(int.from_bytes(os.urandom(32), 'big') * elapsed)
def randselect(self, items, num):
_items = items.copy()
self._selector.shuffle(_items)
return [_items[self._selector.randrange(len(_items))] for _ in range(num)]
def randomselect(elapsed_ms, items, select_num):
return RandomSelector(elapsed_ms).randselect(items, select_num)

127
src/test_lottery.py

@ -0,0 +1,127 @@ @@ -0,0 +1,127 @@
#! /usr/bin/python3
# -*- coding:utf-8 -*-
import unittest
from . import lottery as l
import time
class LotteryTestCase(unittest.TestCase):
def setUp(self):
print('开始测试', time.strftime('%Y-%m-%d(%a)%H:%M:%S'))
l.reset()
self._users = [
{
"uid": "G0001",
"name": "谷歌",
"award_id": None,
"role": 0
},
{
"uid": "G0002",
"name": "百度",
"award_id": None,
"role": 1
},
{
"uid": "G0003",
"name": "搜狗",
"award_id": None,
"role": 2
},
{
"uid": "G0004",
"name": "360",
"award_id": None,
"role": 3
},
{
"uid": "G0005",
"name": "亚马逊",
"award_id": None,
"role": 1
}
]
self._awards = [
{
"award_name": "三等奖(一)",
"award_size": 2,
"award_capacity": 2
},
{
"award_name": "三等奖(二)",
"award_size": 2,
"award_capacity": 2
},
{
"award_name": "二等奖",
"award_size": 2,
"award_capacity": 2
},
{
"award_name": "一等奖",
"award_size": 2,
"award_capacity": 2
},
{
"award_name": "特等奖",
"award_size": 2,
"award_capacity": 2
}
]
l.add_users(self._users)
l.add_awards(self._awards)
for id, award in enumerate(self._awards):
award['award_id'] = id
def tearDown(self):
print('结束测试', time.strftime('%Y-%m-%d(%a)%H:%M:%S'))
def test_users(self):
self.assertCountEqual(self._users, l.users()['data'])
for user in l.users()['data']:
self.assertTrue(user in self._users)
def test_draw_lottery(self):
l.run()
dlr = l.draw_lottery(0)
self.assertEqual(dlr['status'], 200)
for lucky in l.lucky_users(0)['data']:
self.assertTrue(lucky in dlr['data'])
print('中奖用户为:', lucky)
for unlucky in l.luckless_users()['data']:
for lucky in dlr['data']:
self.assertNotEqual(lucky['uid'], unlucky['uid'])
self.assertEqual(len(l.users()['data']), len(l.lucky_users(0)['data']) + len(l.luckless_users()['data']))
def test_awards(self):
l.add_awards([{'award_name': "BOSS奖励", 'award_capacity': 1}])
self.assertEqual(6, len(l.awards()['data']))
def test_award(self):
award = l.award(3)
self.assertEqual('一等奖', award['data']['award_name'])
def test_update(self):
l.update('G0004', 3)
ok = False
for u in l.lucky_users(3)['data']:
if u['uid'] == 'G0004':
ok = True
break
self.assertTrue(ok)
def test_revoke(self):
l.update('G0004', 3)
l.revoke('G0004')
ok = False
for u in l.lucky_users(3)['data']:
if u['uid'] == 'G0004':
ok = True
break
self.assertFalse(ok)
if __name__ == '__main__':
unittest.main()

6
src/users_db.py

@ -0,0 +1,6 @@ @@ -0,0 +1,6 @@
#! /usr/bin/python3
# -*- coding:utf-8 -*-
def load():
return []
Loading…
Cancel
Save