"""
# ---------------------------------------------------------------------------
# 8. HELPER FUNCTIONS
# ---------------------------------------------------------------------------
def get_current_user():
uid = session.get("user_id")
if not uid:
return None
return DB_CONN.execute("SELECT * FROM users WHERE id=?", (uid,)).fetchone()
def get_friend_key(user_id):
row = DB_CONN.execute(
"SELECT emoji_code FROM keys WHERE user_id=? AND key_type='FRIEND'", (user_id,)
).fetchone()
return row["emoji_code"] if row else ""
def get_profile_key(user_id):
row = DB_CONN.execute(
"SELECT emoji_code FROM keys WHERE user_id=? AND key_type='PROFILE'", (user_id,)
).fetchone()
return row["emoji_code"] if row else ""
def are_friends(a_id, b_id):
a_key = get_friend_key(a_id)
b_key = get_friend_key(b_id)
code_ab = a_key + b_key
code_ba = b_key + a_key
ab = DB_CONN.execute(
"SELECT id FROM handshakes WHERE from_user_id=? AND to_user_id=? AND submitted_code=?",
(a_id, b_id, code_ab)
).fetchone()
ba = DB_CONN.execute(
"SELECT id FROM handshakes WHERE from_user_id=? AND to_user_id=? AND submitted_code=?",
(b_id, a_id, code_ba)
).fetchone()
return ab is not None and ba is not None
def get_friends(user_id):
all_users = DB_CONN.execute("SELECT id FROM users WHERE id != ?", (user_id,)).fetchall()
return [u["id"] for u in all_users if are_friends(user_id, u["id"])]
def restrict_user(user_id):
global DEFAULT_POST_KEY
DB_CONN.execute("UPDATE users SET post_tier='RESTRICTED' WHERE id=?", (user_id,))
DB_CONN.commit()
DEFAULT_POST_KEY = generate_emoji_key(2)
def check_auto_restrict(author_id):
author = DB_CONN.execute("SELECT * FROM users WHERE id=?", (author_id,)).fetchone()
if not author or author["post_tier"] == "RESTRICTED":
return
total_users = DB_CONN.execute("SELECT COUNT(*) FROM users").fetchone()[0]
if total_users <= 1:
return
unique_reporters = DB_CONN.execute(
"""SELECT COUNT(DISTINCT r.reporter_id) FROM reports r
JOIN posts p ON r.post_id = p.id
WHERE p.author_id = ? AND r.reporter_id != ?""",
(author_id, author_id)
).fetchone()[0]
threshold = (total_users - 1) * 0.51
if unique_reporters >= threshold:
restrict_user(author_id)
def order_feed(posts):
if not posts:
return []
if len(posts) == 1:
return list(posts)
posts = list(posts)
max_score = max(p["score"] for p in posts)
min_score = min(p["score"] for p in posts)
# Find top pinned (highest score, most recent tiebreak)
top_candidates = [p for p in posts if p["score"] == max_score]
top_pin = max(top_candidates, key=lambda p: p["created_at"])
# Find bottom pinned (lowest score, most recent tiebreak) โ must be different from top
remaining = [p for p in posts if p["id"] != top_pin["id"]]
if not remaining:
return [top_pin]
bot_min = min(p["score"] for p in remaining)
# Only pin bottom separately if it's actually different score from top or there are multiple posts
bot_candidates = [p for p in remaining if p["score"] == bot_min]
bot_pin = max(bot_candidates, key=lambda p: p["created_at"])
middle = [p for p in remaining if p["id"] != bot_pin["id"]]
middle.sort(key=lambda p: p["created_at"], reverse=True)
if len(posts) == 2:
return [top_pin, bot_pin]
return [top_pin] + middle + [bot_pin]
def require_admin(f):
@functools.wraps(f)
def decorated(*args, **kwargs):
if not session.get("admin_authed"):
return redirect(url_for("admin"))
return f(*args, **kwargs)
return decorated
def vote_weight(tier):
return {"ADMIN": 1, "TRUSTED": 2, "DEFAULT": 1, "RESTRICTED": 0}.get(tier, 0)
def tier_badge(tier):
cls = f"tier-{tier.lower()}"
return f'[{escape(tier)}]'
def has_reported(post_id, user_id):
return DB_CONN.execute(
"SELECT id FROM reports WHERE post_id=? AND reporter_id=?", (post_id, user_id)
).fetchone() is not None
def report_count(post_id):
return DB_CONN.execute(
"SELECT COUNT(*) FROM reports WHERE post_id=?", (post_id,)
).fetchone()[0]
def user_vote(post_id, user_id):
row = DB_CONN.execute(
"SELECT direction FROM votes WHERE post_id=? AND user_id=?", (post_id, user_id)
).fetchone()
return row["direction"] if row else None
# ---------------------------------------------------------------------------
# 9. ROUTES
# ---------------------------------------------------------------------------
@app.route("/join", methods=["GET", "POST"])
def join():
if session.get("user_id"):
return redirect(url_for("square"))
error = ""
if request.method == "POST":
username = request.form.get("username", "").strip()
profile_key = request.form.get("profile_key", "")
friend_key = request.form.get("friend_key", "")
if not username:
error = "Username required."
elif len(profile_key) < 2:
error = "Profile Key must be 2 emoji."
elif len(friend_key) < 2:
error = "Friend Key must be 2 emoji."
else:
existing = DB_CONN.execute("SELECT id FROM users WHERE username=?", (username,)).fetchone()
if existing:
error = "Username taken."
else:
cur = DB_CONN.execute(
"INSERT INTO users (session_label, username) VALUES (?, ?)",
("?", username)
)
DB_CONN.commit()
uid = cur.lastrowid
label = f"Anon #{uid}"
DB_CONN.execute("UPDATE users SET session_label=? WHERE id=?", (label, uid))
DB_CONN.execute(
"INSERT INTO keys (user_id, key_type, emoji_code) VALUES (?, 'PROFILE', ?)",
(uid, profile_key)
)
DB_CONN.execute(
"INSERT INTO keys (user_id, key_type, emoji_code) VALUES (?, 'FRIEND', ?)",
(uid, friend_key)
)
DB_CONN.commit()
session["user_id"] = uid
session["show_default_key"] = True
return redirect(url_for("square"))
error_html = f'
{escape(error)}
' if error else ""
content = f"""
{error_html}
JOIN THE SQUARE
Choose a secret username. Pick two emoji keys.
"""
return base_page("Join", content)
@app.route("/", methods=["GET", "POST"])
def square():
user = get_current_user()
if not user:
return redirect(url_for("join"))
error = ""
success = ""
# Show default key on first join
if session.pop("show_default_key", False):
success = f"Welcome, {escape(user['session_label'])}! Default Post Key: {DEFAULT_POST_KEY}"
if request.method == "POST":
content = request.form.get("content", "").strip()
post_key = request.form.get("post_key", "")
tier = user["post_tier"]
valid_key = False
if tier == "ADMIN":
valid_key = (post_key == ADMIN_POST_KEY)
elif tier == "TRUSTED":
valid_key = (post_key == TRUSTED_POST_KEY)
elif tier == "DEFAULT":
valid_key = (post_key == DEFAULT_POST_KEY)
elif tier == "RESTRICTED":
admin_key = request.form.get("admin_key", "")
valid_key = (admin_key == ADMIN_POST_KEY)
if not content:
error = "Post content required."
elif not valid_key:
error = "Post key incorrect."
else:
DB_CONN.execute(
"INSERT INTO posts (author_id, content) VALUES (?, ?)",
(user["id"], content)
)
DB_CONN.commit()
return redirect(url_for("square"))
friends_only = request.args.get("friends_only") == "1"
friend_ids = get_friends(user["id"])
if friends_only:
if not friend_ids:
raw_posts = []
else:
placeholders = ",".join("?" * len(friend_ids))
raw_posts = DB_CONN.execute(
f"SELECT * FROM posts WHERE author_id IN ({placeholders}) ORDER BY created_at DESC",
friend_ids
).fetchall()
else:
raw_posts = DB_CONN.execute(
"SELECT * FROM posts ORDER BY created_at DESC"
).fetchall()
ordered = order_feed(raw_posts)
posts_html = ""
for i, post in enumerate(ordered):
author = DB_CONN.execute("SELECT * FROM users WHERE id=?", (post["author_id"],)).fetchone()
is_friend = post["author_id"] in friend_ids
is_own = post["author_id"] == user["id"]
display_name = escape(author["username"]) if (is_friend or is_own) else escape(author["session_label"])
pin_class = ""
if i == 0 and len(ordered) > 1:
pin_class = " pinned-top"
elif i == len(ordered) - 1 and len(ordered) > 1:
pin_class = " pinned-bottom"
rcount = report_count(post["id"])
already_reported = has_reported(post["id"], user["id"])
my_vote = user_vote(post["id"], user["id"])
score_val = post["score"]
score_cls = "score neg" if score_val < 0 else "score"
content_escaped = escape(post["content"])
if rcount > 0:
content_html = f'[REPORTED โ click to reveal] {content_escaped}'
else:
content_html = str(content_escaped)
up_active = "active" if my_vote == 1 else ""
down_active = "active" if my_vote == -1 else ""
report_btn = ""
if not is_own:
if already_reported:
report_btn = 'reported'
else:
report_btn = f''''''
posts_html += f"""
'
# All users for handshake dropdown (excluding self)
all_users = DB_CONN.execute(
"SELECT id, username, session_label FROM users WHERE id != ? ORDER BY session_label",
(user["id"],)
).fetchall()
user_options = "".join(
f''
for u in all_users
)
vis_label = "PUBLIC" if user["notes_visible"] else "PRIVATE"
message_html = f'
{escape(message)}
' if message else ""
error_html = f'
{escape(error)}
' if error else ""
content = f"""
{message_html}{error_html}
YOUR IDENTITY
Session Label
{escape(user['session_label'])}
Username
{escape(user['username'])}
Tier
{tier_badge(user['post_tier'])}
Friend Key
{escape(friend_key_val)}
Profile Key
{escape(profile_key_val)}
YOUR NOTES [{vis_label}]
CONFIRMED FRIENDS
{friends_html}
HANDSHAKE
Exchange usernames and Friend Keys in person, then submit here.
"""
return base_page("Profile", content, current_user=user)
@app.route("/handshake", methods=["POST"])
def handshake():
user = get_current_user()
if not user:
return redirect(url_for("join"))
target_username = request.form.get("target_username", "")
my_friend_key = request.form.get("my_friend_key", "")
their_friend_key = request.form.get("their_friend_key", "")
target = DB_CONN.execute("SELECT * FROM users WHERE username=?", (target_username,)).fetchone()
valid = (
target is not None
and target["id"] != user["id"]
and my_friend_key == get_friend_key(user["id"])
and their_friend_key == get_friend_key(target["id"])
)
if not valid:
session["handshake_msg"] = ("error", "One or both keys are incorrect.")
return redirect(url_for("profile"))
submitted_code = my_friend_key + their_friend_key
try:
DB_CONN.execute(
"INSERT INTO handshakes (from_user_id, to_user_id, submitted_code) VALUES (?, ?, ?)",
(user["id"], target["id"], submitted_code)
)
DB_CONN.commit()
except sqlite3.IntegrityError:
pass
if are_friends(user["id"], target["id"]):
session["handshake_msg"] = ("success", f"Friendship confirmed with {target['username']}!")
else:
session["handshake_msg"] = ("info", f"Your side submitted. Waiting for {escape(target['session_label'])} to complete the handshake.")
return redirect(url_for("profile"))
@app.route("/admin", methods=["GET", "POST"])
def admin():
error = ""
if request.method == "POST" and not session.get("admin_authed"):
submitted = request.form.get("admin_key", "")
if submitted == ADMIN_POST_KEY:
session["admin_authed"] = True
else:
error = "Incorrect Admin Key."
if not session.get("admin_authed"):
error_html = f'
{escape(error)}
' if error else ""
content = f"""
{error_html}
ADMIN LOGIN
"""
return base_page("Admin Login", content)
# Full admin panel
users = DB_CONN.execute("SELECT * FROM users ORDER BY id").fetchall()
posts = DB_CONN.execute("SELECT * FROM posts ORDER BY id DESC").fetchall()
votes = DB_CONN.execute("SELECT * FROM votes ORDER BY id DESC LIMIT 50").fetchall()
reports_all = DB_CONN.execute("SELECT * FROM reports ORDER BY id DESC LIMIT 50").fetchall()
keys_all = DB_CONN.execute("SELECT * FROM keys ORDER BY id").fetchall()
users_html = ""
for u in users:
tier = u["post_tier"]
users_html += f"""
"""
# โโ Platform View โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
doxx_rows = DB_CONN.execute("""
SELECT u.id, u.session_label, u.username, u.post_tier,
COUNT(DISTINCT p.id) AS post_count,
COALESCE(SUM(p.score), 0) AS net_score
FROM users u
LEFT JOIN posts p ON p.author_id = u.id
GROUP BY u.id
ORDER BY u.id
""").fetchall()
hs_rows = DB_CONN.execute("""
SELECT h.from_user_id, h.to_user_id,
u1.session_label AS from_label,
u2.session_label AS to_label,
u1.username AS from_name,
u2.username AS to_name
FROM handshakes h
JOIN users u1 ON h.from_user_id = u1.id
JOIN users u2 ON h.to_user_id = u2.id
ORDER BY h.id
""").fetchall()
# Build directed-pair set once; check both directions for CONFIRMED
hs_pairs = {(r["from_user_id"], r["to_user_id"]) for r in hs_rows}
seen_pairs = set()
graph_rows = []
for r in hs_rows:
a, b = r["from_user_id"], r["to_user_id"]
key = (min(a, b), max(a, b))
if key in seen_pairs:
continue
seen_pairs.add(key)
confirmed = (a, b) in hs_pairs and (b, a) in hs_pairs
graph_rows.append((r, confirmed))
def is_friends_pair(id_a, id_b):
return (id_a, id_b) in hs_pairs and (id_b, id_a) in hs_pairs
vote_corr = DB_CONN.execute("""
SELECT v.user_id AS voter_id,
uv.session_label AS voter_label,
uv.username AS voter_name,
p.author_id,
ua.session_label AS author_label,
ua.username AS author_name,
SUM(v.direction * v.weight) AS net
FROM votes v
JOIN posts p ON v.post_id = p.id
JOIN users uv ON v.user_id = uv.id
JOIN users ua ON p.author_id = ua.id
WHERE v.user_id != p.author_id
GROUP BY v.user_id, p.author_id
ORDER BY ABS(SUM(v.direction * v.weight)) DESC
""").fetchall()
report_corr = DB_CONN.execute("""
SELECT r.reporter_id,
ur.session_label AS reporter_label,
ur.username AS reporter_name,
p.author_id,
ua.session_label AS author_label,
ua.username AS author_name,
COUNT(*) AS count
FROM reports r
JOIN posts p ON r.post_id = p.id
JOIN users ur ON r.reporter_id = ur.id
JOIN users ua ON p.author_id = ua.id
WHERE r.reporter_id != p.author_id
GROUP BY r.reporter_id, p.author_id
ORDER BY COUNT(*) DESC
""").fetchall()
# Build HTML for Panel 1
doxx_html = ""
for r in doxx_rows:
net = r["net_score"]
net_cls = "neg" if net < 0 else ""
doxx_html += f"""
{escape(r['session_label'])}
{escape(r['username'])}
{tier_badge(r['post_tier'])}
{r['post_count']}
{net:+d}
"""
# Build HTML for Panel 2
graph_html = ""
for r, confirmed in graph_rows:
status_color = "#44ccff" if confirmed else "#886699"
status_text = "CONFIRMED" if confirmed else "PENDING"
graph_html += f"""
"""
# Build HTML for Panel 3 โ votes
vote_corr_html = ""
for r in vote_corr:
net = r["net"]
net_cls = "neg" if net < 0 else ""
friends_badge = ' [FRIENDS]' \
if is_friends_pair(r["voter_id"], r["author_id"]) else ""
vote_corr_html += f"""