按这个测试了,结果有点疑问
# 具体遇到的问题
为什么user.json中不是20个,总会少几个。。。
# 报错信息的截图
# 相关课程内容截图
# 尝试过的解决思路和结果
# 粘贴全部相关代码,切记添加代码注释(请勿截图)
在这里输入代码,可通过选择【代码语言】突出显示
41
收起
正在回答 回答被采纳积分+1
2回答
好帮手慕小轩
2020-11-26 17:15:25
同学,你好!使用多进程执行,结果显示不全,可能是电脑cup的问题,同学可以使用多线程执行,可参考下面代码示例:
# coding:utf-8
import multiprocessing
import os
import threading
import time
import json
from common.utils import check_file, timestamp_to_string
from common.error import (UserExistsError, RoleError,
LevelError, NegativeNumberError,
CountError)
from common.consts import ROLES, FIRSTLEVELS, SECONDLEVELS
class Base(object):
def __init__(self, user_json, gift_json):
self.max_gift_counts = 3
# manger = multiprocessing.Manager()
# self.lock = manger.Lock()
self.lock = threading.Lock()
self.user_json = user_json
self.gift_json = gift_json
def __read_users(self, time_to_str=False):
with open(self.user_json, 'r') as f:
data = json.loads(f.read())
if time_to_str == True:
for username, v in data.items():
v['create_time'] = timestamp_to_string(v['create_time'])
v['update_time'] = timestamp_to_string(v['update_time'])
data[username] = v
return data
def write_user(self, **user): # password active
if 'username' not in user:
raise ValueError('missing username')
if 'role' not in user:
raise ValueError('missing role')
user['active'] = True
user['create_time'] = time.time()
user['update_time'] = time.time()
user['gifts'] = []
users = self.__read_users()
if user['username'] in users:
raise UserExistsError('username %s had exists' % user['username'])
users.update(
{user['username']: user}
)
self.__save(users, self.user_json)
def __save(self, data, path):
json_data = json.dumps(data)
self.lock.acquire()
try:
with open(path, 'w') as f:
f.write(json_data)
finally:
self.lock.release()
if __name__ == '__main__':
gift_path = os.path.join(os.getcwd(), 'storage', 'gift.json')
user_path = os.path.join(os.getcwd(), 'storage', 'user.json')
print(gift_path)
print(user_path)
# pool = multiprocessing.Pool(3)
# manger = multiprocessing.Manager()
# lock = manger.Lock()
base = Base(user_json=user_path, gift_json=gift_path)
for i in range(20):
t = threading.Thread(target=base.write_user, kwargs={'username': f'd{i}', 'role': 'admin'})
t.start()
t.join()
相似问题
登录后可查看更多问答,登录/注册
Python全栈工程师2020
- 参与学习 人
- 提交作业 5229 份
- 解答问题 2433 个
Facebook曾声称“只招全栈工程师”!全栈用人需求猛增,市面人才紧缺。 0基础进击Python全栈开发,诱人薪资在前方!
了解课程
恭喜解决一个难题,获得1积分~
来为老师/同学的回答评分吧
0 星