import os
import sys

# --- Frozen binary dispatch (must be before all other imports) ---
# When the frozen binary spawns itself as a subprocess, it passes one of these
# markers as argv[1] so the child process runs in the correct mode and exits.
if len(sys.argv) >= 2:
    if sys.argv[1] == '__cli__':
        # Pipeline CLI mode: strip marker, hand off to playlist.main()
        del sys.argv[1]
        from playlist import main as _cli_main
        _cli_main()
        sys.exit(0)
    elif sys.argv[1] == '__picker__':
        # Folder picker mode: open native dialog, print path, exit
        import tkinter as tk
        from tkinter import filedialog
        _root = tk.Tk()
        _root.withdraw()
        _root.attributes('-topmost', True)
        print(filedialog.askdirectory() or '', end='')
        sys.exit(0)

import json
import time
import subprocess
import threading
import socket
import re
from pathlib import Path

from flask import Flask, render_template_string, request, Response, jsonify

# --- Configuration & Palette ---
APP_NAME = "PLAYLIST PIRATE"
VERSION = "v2.0"

# Fire Orange Palette
PALETTE = {
    "bg_void": "#04060b",
    "text_warm": "#e8d5b8",
    "text_muted": "#7a6f5e",
    "fp": "#ff6600",                # fire orange
    "fb": "#ff8833",                # fire bright
    "fd": "#cc4400",                # fire deep
    "fg": "rgba(255,102,0,0.12)",   # fire glow
}

# --- Flask App Setup ---
app = Flask(__name__)
current_proc = None    # the currently running pipeline subprocess (or None)
current_step = None    # name of the step that subprocess is executing
output_queue = []      # reset per run; reserved for buffered output

# Compiled once at import time — strip_ansi is called per output line.
_ANSI_RE = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')


def strip_ansi(text):
    """Return *text* with all ANSI escape sequences removed."""
    return _ANSI_RE.sub('', text)


PLAYLIST_PY = str(Path(__file__).parent / "playlist.py")


def get_free_port():
    """Ask the OS for an unused TCP port and return its number.

    The socket is closed before returning, so there is a small race window
    before the Flask server rebinds the port; acceptable for a local app.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('', 0))
        return s.getsockname()[1]


# --- File/Folder Picker ---
def pick_folder_native():
    """Open a native folder picker via tkinter subprocess.

    Re-invokes this program in '__picker__' mode (see dispatch at top of
    file) so the tkinter dialog runs in its own process. Returns the chosen
    path as a string, or None on cancel/timeout/error.
    """
    if getattr(sys, 'frozen', False):
        # Frozen: re-invoke this binary in __picker__ mode
        cmd = [sys.executable, '__picker__']
    else:
        # From source: re-invoke this script in __picker__ mode
        cmd = [sys.executable, str(Path(__file__).resolve()), '__picker__']
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
        path = result.stdout.strip()
        return path if path else None
    except Exception as e:
        print(f"Picker error: {e}")
        return None


# --- Routes ---
@app.route('/')
def index():
    """Serve the single-page UI, rendered from the embedded template."""
    return render_template_string(HTML_TEMPLATE, palette=PALETTE, version=VERSION)


@app.route('/pick-folder', methods=['POST'])
def pick_folder():
    """Open the native folder picker; 400 if the user cancels or it fails."""
    path = pick_folder_native()
    if path:
        return jsonify({"path": path})
    return jsonify({"path": None}), 400


@app.route('/scan-dir', methods=['POST'])
def scan_dir():
    """List the CSV exports and playlist .md files in the given directory."""
    data = request.json
    path = data.get("path")
    if not path or not os.path.exists(path):
        return jsonify({"error": "Invalid path"}), 400
    p = Path(path)
    csvs = [f.name for f in p.glob("*.csv")]
    mds = [f.name for f in p.glob("*-playlist.md")]
    return jsonify({"csvs": csvs, "mds": mds})


# Track line formats. Groups: (1) status char, (2) title, (3) artist,
# (4) YouTube URL (last field). "New" format carries an extra SP: field.
_TRACK_RE_NEW = re.compile(r'^- \[( |x|-)\] (.+?) \| (.+?) \| ISRC:[^ ]+ \| SP:[^ ]+ \| (.+)$')
_TRACK_RE_OLD = re.compile(r'^- \[( |x|-)\] (.+?) \| (.+?) \| ISRC:[^ ]+ \| (.+)$')

_pending_merges = []  # list of (temp_path, original_path)


@app.route('/read-tracks', methods=['POST'])
def read_tracks():
    """Parse a playlist .md file and return its tracks.

    A track is 'downloadable' when its checkbox is still open (' ') and its
    URL field holds a real URL rather than a placeholder.
    """
    data = request.json
    md_path = os.path.join(data.get('work_dir', ''), data.get('filename', ''))
    if not os.path.exists(md_path):
        return jsonify({'error': 'File not found'}), 404
    tracks = []
    with open(md_path, encoding='utf-8') as f:
        for line in f:
            m = _TRACK_RE_NEW.match(line.strip()) or _TRACK_RE_OLD.match(line.strip())
            if m:
                status, url = m.group(1), m.group(4)
                tracks.append({
                    'title': m.group(2),
                    'artist': m.group(3),
                    'downloadable': status == ' ' and url not in ('?', 'NOT_FOUND', '-'),
                })
    return jsonify({'tracks': tracks})


def _create_combined_download_md(work_dir, track_filter):
    """Build one combined temp .md from all selected tracks across playlists.

    Deduplicates by YouTube URL.
    Returns (temp_path, list_of_original_paths)."""
    seen_urls = set()
    lines_out = ['# Combined Download\n', '\n', '\n']
    originals = []
    for md_filename, selected_titles in track_filter.items():
        original = os.path.join(work_dir, md_filename)
        if not os.path.exists(original):
            continue
        originals.append(original)
        selected = set(selected_titles)
        with open(original, encoding='utf-8') as f:
            for line in f:
                m = _TRACK_RE_NEW.match(line.strip()) or _TRACK_RE_OLD.match(line.strip())
                if not m:
                    continue
                if m.group(1) != ' ':
                    continue  # already done / not found
                title, url = m.group(2), m.group(4)
                if title not in selected:
                    continue
                if url not in ('?', 'NOT_FOUND', '-') and url in seen_urls:
                    continue  # duplicate URL — skip
                if url not in ('?', 'NOT_FOUND', '-'):
                    seen_urls.add(url)
                lines_out.append(line if line.endswith('\n') else line + '\n')
    if len(lines_out) <= 3:
        # Only the header lines — nothing selected survived filtering.
        return None, []
    tmp = os.path.join(work_dir, '_download_queue.tmp.md')
    with open(tmp, 'w', encoding='utf-8') as f:
        f.writelines(lines_out)
    return tmp, originals


def _merge_and_cleanup():
    """Copy DONE status from temp back to original .md files, then delete temp."""
    for item in _pending_merges:
        temp_path, originals = item
        try:
            # Collect titles that the downloader marked done ('x') in the temp.
            done = set()
            with open(temp_path, encoding='utf-8') as f:
                for line in f:
                    m = _TRACK_RE_NEW.match(line.strip()) or _TRACK_RE_OLD.match(line.strip())
                    if m and m.group(1) == 'x':
                        done.add(m.group(2))
            # Flip matching open checkboxes in each original playlist file.
            for original_path in (originals if isinstance(originals, list) else [originals]):
                if not done or not os.path.exists(original_path):
                    continue
                updated = []
                with open(original_path, encoding='utf-8') as f:
                    for line in f:
                        m = _TRACK_RE_NEW.match(line.strip()) or _TRACK_RE_OLD.match(line.strip())
                        if m and m.group(1) == ' ' and m.group(2) in done:
                            line = line.replace('- [ ]', '- [x]', 1)
                        updated.append(line)
                with open(original_path, 'w', encoding='utf-8') as f:
                    f.writelines(updated)
        except Exception as e:
            print(f'Merge error: {e}')
        finally:
            # Best-effort removal of the temp file; was a bare `except:` which
            # would also swallow SystemExit/KeyboardInterrupt.
            try:
                os.remove(temp_path)
            except OSError:
                pass
    _pending_merges.clear()
@app.route('/run', methods=['POST'])
def run_command():
    """Launch a pipeline step (parse/pages/download/...) as a subprocess.

    Rejects the request with 400 if a process is already running. For the
    'download' step with a per-track selection, first builds a combined,
    URL-deduped temp .md and points the CLI at it; the temp→original merge
    is deferred until the stream endpoint sees the process finish.
    """
    global current_proc, current_step, output_queue
    if current_proc and current_proc.poll() is None:
        return jsonify({"error": "A process is already running"}), 400
    data = request.json
    step = data.get("step")
    args = data.get("args", [])
    work_dir = data.get("work_dir", os.getcwd())
    track_filter = data.get("track_filter")  # {md_filename: [titles]} or None

    # For download with per-track selection, build filtered temp files
    if step == 'download' and track_filter:
        # Build one combined temp file — deduped by URL, one track at a time
        tmp_path, originals = _create_combined_download_md(work_dir, track_filter)
        if not tmp_path:
            return jsonify({"error": "No downloadable tracks selected"}), 400
        _pending_merges.append((tmp_path, originals))
        # Find --output and its value from args; keep everything after it
        try:
            out_idx = args.index('--output')
            extra = args[out_idx:]
        except ValueError:
            extra = []
        args = [os.path.basename(tmp_path)] + extra

    if getattr(sys, 'frozen', False):
        # Frozen binary: '__cli__' marker routes the child into CLI mode
        # (see dispatch at top of file).
        cmd = [sys.executable, '__cli__', step] + args
    else:
        cmd = [sys.executable, PLAYLIST_PY, step] + args
    try:
        current_proc = subprocess.Popen(
            cmd,
            cwd=work_dir,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # interleave stderr into the stream
            text=True,
            bufsize=1,                 # line-buffered for live streaming
            env=os.environ.copy(),
        )
        current_step = step
        output_queue = []
        return jsonify({"ok": True})
    except Exception as e:
        return jsonify({"error": str(e)}), 500


@app.route('/stream')
def stream():
    """Stream the running subprocess's output as Server-Sent Events.

    Each non-empty, ANSI-stripped line is sent as a JSON-encoded SSE data
    event; '__DONE__' terminates the stream. After a 'download' run the
    pending temp→original merges are applied; otherwise stale pending
    merges (e.g. from a dropped connection) are discarded.
    """
    def generate():
        global current_proc, current_step
        if not current_proc:
            yield "data: __DONE__\n\n"
            return
        while True:
            line = current_proc.stdout.readline()
            if not line:
                if current_proc.poll() is not None:
                    break  # process exited and pipe is drained
                time.sleep(0.1)
                continue
            clean = strip_ansi(line).rstrip()
            if clean:
                yield f"data: {json.dumps(clean)}\n\n"
        completed_step = current_step
        current_proc = None
        current_step = None
        if completed_step == 'download':
            _merge_and_cleanup()
        else:
            # Discard any stale pending merges from a dropped connection
            for item in _pending_merges:
                try:
                    os.remove(item[0])
                except OSError:  # was a bare `except:` — keep it best-effort
                    pass
            _pending_merges.clear()
        yield "data: __DONE__\n\n"
    return Response(generate(), mimetype='text/event-stream')


# --- Embedded Assets ---
HTML_TEMPLATE = """ PLAYLIST PIRATE ☠

PLAYLIST PIRATE

{{ version }}  /  CHART YER COURSE BELOW
⚓ AT ANCHOR
No port charted
I
PARSE THE MANIFEST

Reads your Spotify CSV exports and creates a tracking file for each playlist. Select the CSVs you want to process — you can always come back and run others later.

Chart a course to detect yer CSVs...
III
RAISE THE FLAG

Generates a static HTML page for each playlist with embedded YouTube players, MusicBrainz recording links, artist pages, and Spotify links. Choose where to put them — they're ready to drop straight into a website.

IV
PLUNDER THE HOLD

Downloads each track as a 192kbps MP3 with title, artist, album and ISRC tags embedded. Requires ffmpeg. This step is always opt-in — nothing downloads unless you run it.

⚠ PIRATE'S OATH
You are responsible for ensuring you have the right to download this content in your jurisdiction.
📜 Playlists
🎵 Tracks
Select a playlist to browse its tracks
🐦 CROW'S NEST
"""

# --- Entry Point ---
if __name__ == '__main__':
    import webbrowser
    port = get_free_port()
    url = f"http://localhost:{port}"

    def run_flask():
        # Silence werkzeug's per-request log lines; run without the reloader
        # so the frozen binary doesn't respawn itself.
        import logging
        logging.getLogger('werkzeug').setLevel(logging.ERROR)
        app.run(port=port, debug=False, use_reloader=False)

    threading.Thread(target=run_flask, daemon=True).start()
    time.sleep(0.5)  # give the server a moment to bind before opening the browser
    webbrowser.open(url)
    try:
        # Keep the main thread alive; Ctrl-C exits cleanly.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass