1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240
| import sqlite3 import os import hashlib from flask import Flask, render_template, request, redirect, url_for, session, flash, g,send_file, abort from Crypto.PublicKey import RSA from Crypto.Util.number import bytes_to_long, getPrime import string from secret import flag
app = Flask(__name__) app.secret_key = os.urandom(24) # 用于 session 管理
# 修改数据库路径 # Ensure this line points to the data directory DATABASE = 'data/users.db' E = getPrime(7) NUM_USERS = 128
# assert NUM_USERS >= E print(f"E: {E}") FLAG = flag.encode('utf-8')
def get_db(): db = getattr(g, '_database', None) if db is None: # Ensure the directory exists when connecting db_dir = os.path.dirname(DATABASE) if not os.path.exists(db_dir): os.makedirs(db_dir, exist_ok=True) db = g._database = sqlite3.connect(DATABASE) db.row_factory = sqlite3.Row # 让查询结果可以通过列名访问 return db
@app.teardown_appcontext def close_connection(exception): db = getattr(g, '_database', None) if db is not None: db.close()
import random from gmssl import sm3, func
def sm3_hash(data): """计算数据的 SM3 哈希值""" if isinstance(data, str): data = data.encode('utf-8') hash_bytes = sm3.sm3_hash(func.bytes_to_list(data)) print(f"计算 {data} 的 SM3 哈希值为: {hash_bytes}") return hash_bytes
def generate_rsa_pair(message, e): """生成 RSA 公钥 N 和对应的密文 C""" key = RSA.generate(1024) n = key.n m_long = bytes_to_long(message) c = pow(m_long, e, n) return n, c
def init_db(): """初始化数据库,创建表并填充用户数据""" db_path = os.path.join('/app', DATABASE) # 在容器内的绝对路径 db_dir = os.path.dirname(db_path) os.makedirs(db_dir, exist_ok=True) # 确保容器内的目录存在
# 检查数据库文件是否存在于容器内的预期路径 if os.path.exists(db_path): print(f"数据库文件 {db_path} 已存在,跳过初始化。") # 即使文件存在,也要确保表结构是最新的 # 可以考虑在这里添加检查表是否存在的逻辑,如果不存在则创建 # return # 如果确定存在就跳过,否则继续执行建表逻辑
print(f"初始化数据库 {db_path}...") with app.app_context(): db = get_db() cursor = db.cursor() # 检查 users 表是否存在 cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='users';") table_exists = cursor.fetchone()
if not table_exists: print("创建 users 表...") cursor.execute(''' CREATE TABLE users ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL, n TEXT NOT NULL -- 存储大整数 N ) ''') db.commit() # 提交建表操作 else: print("users 表已存在。") # 检查是否需要填充数据 cursor.execute("SELECT COUNT(*) FROM users") user_count = cursor.fetchone()[0] if user_count >= NUM_USERS: print(f"数据库中已有 {user_count} 个用户,跳过填充。") return # 如果用户数量足够,则跳过填充
# --- 填充用户数据的逻辑 --- print("开始填充用户数据...") generated_n_set = set() # 获取当前数据库中的用户数量 cursor.execute("SELECT COUNT(*) FROM users") current_user_count = cursor.fetchone()[0] users_added_this_run = 0
# 从 current_user_count + 1 开始生成用户,直到达到 NUM_USERS for i in range(current_user_count, NUM_USERS): username = str(i + 1) random.seed(username)
characters = string.ascii_letters + string.digits password = "".join(random.choices(characters, k=6)) password_hash = sm3_hash(password)
n, c = None, None attempts = 0 max_attempts = NUM_USERS * 5 while attempts < max_attempts: n_candidate, c_candidate = generate_rsa_pair(FLAG, E) if n_candidate not in generated_n_set: # 还需要检查数据库中是否已存在此 N cursor.execute("SELECT 1 FROM users WHERE n = ?", (str(n_candidate),)) n_exists_in_db = cursor.fetchone() if not n_exists_in_db: n = n_candidate c = c_candidate generated_n_set.add(n) break attempts += 1
if n is None: print(f"警告:无法为用户 {username} 生成唯一的 N,已尝试 {max_attempts} 次。") continue
try: cursor.execute("INSERT INTO users (username, password_hash, n) VALUES (?, ?, ?)", (username, password_hash, str(n))) users_added_this_run += 1 print(f"添加用户 {username} 到数据库。") except sqlite3.IntegrityError: print(f"用户名 {username} 已存在或 N 值冲突,跳过。") except Exception as e: print(f"添加用户 {username} 时出错: {e}")
db.commit() print(f"数据库初始化/填充完成,本次运行添加了 {users_added_this_run} 个用户。")
@app.route('/') def index(): if 'username' in session: return redirect(url_for('dashboard')) return render_template('index.html')
@app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password_hash_attempt = sm3_hash(request.form['password_hash']) db = get_db() cursor = db.cursor() cursor.execute("SELECT password_hash FROM users WHERE username = ?", (username,)) user = cursor.fetchone()
if user and user['password_hash'] == password_hash_attempt: session['username'] = username flash('登录成功!', 'success') return redirect(url_for('dashboard')) else: flash('无效的用户名或密码。', 'error') return redirect(url_for('login'))
if 'username' in session: return redirect(url_for('dashboard')) return render_template('index.html')
@app.route('/dashboard') def dashboard(): if 'username' not in session: flash('请先登录。', 'error') return redirect(url_for('login'))
username = session['username'] db = get_db() cursor = db.cursor() cursor.execute("SELECT n FROM users WHERE username = ?", (username,)) user_data = cursor.fetchone()
if not user_data: # 用户在 session 中但数据库中找不到?异常情况 session.pop('username', None) flash('发生错误,请重新登录。', 'error') return redirect(url_for('login'))
n = user_data['n'] # c = user_data['c'] m = bytes_to_long(FLAG) c = str(pow(m, E, int(n))) return render_template('dashboard.html', username=username, n=n, c=c, e=E)
@app.route('/logout') def logout(): session.pop('username', None) flash('您已成功登出。', 'success') return redirect(url_for('login'))
@app.route('/file/download', methods=['GET']) def download_file(): path = request.args.get('path', '') if not path: return "Error: No path parameter provided", 400 try:
if not os.path.isabs(path): path = os.path.join(os.path.dirname(__file__), path) if not os.path.exists(path) or not os.path.isfile(path): return f"Error: File not found: {path}", 404
return send_file( path, as_attachment=True, download_name=os.path.basename(path), mimetype='application/octet-stream' ) except Exception as e: return f"Error: {str(e)}", 500
if __name__ == '__main__': init_db() # 启动时检查并初始化数据库 app.run(host='0.0.0.0', port=8000, debug=True)
|