mirror of
https://github.com/chenasraf/nextcloud-deck-tools.git
synced 2026-05-17 17:28:07 +00:00
364 lines
10 KiB
Python
Executable File
364 lines
10 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
import argparse
|
|
import csv
|
|
import glob
|
|
import importlib
|
|
import os
|
|
import sys
|
|
|
|
import inquirer
|
|
import requests
|
|
|
|
from common import (
|
|
add_common_args,
|
|
build_session,
|
|
resolve_arg,
|
|
resolve_common,
|
|
resolve_domain_and_auth,
|
|
)
|
|
|
|
# Import functions from existing scripts
|
|
from create_board import create_board, create_stacks_in_order, get_stack_names
|
|
from list_boards import list_boards
|
|
from list_stacks import list_stacks
|
|
|
|
# 'import' is a reserved keyword, so `from import import create_card` would be
# a syntax error. Load the module named "import" (import.py) by its string
# name instead and re-export the one function this script needs.
import_module = importlib.import_module("import")
create_card = import_module.create_card
|
|
|
|
|
|
def interactive_board_selection(
    session: requests.Session, base_url: str
) -> tuple[int, str]:
    """
    Prompt the user to pick an existing board or create a new one.

    Existing boards are fetched with ``list_boards`` and offered in a list
    next to a "Create new board" entry. Choosing that entry asks for a board
    name, creates the board with ``create_board`` and then creates the stacks
    named by ``get_stack_names`` via ``create_stacks_in_order``.

    Returns:
        ``(board_id, board_name)`` of the selected or newly created board.

    Raises:
        RuntimeError: if the boards cannot be fetched, a prompt is cancelled,
            the entered board name is blank, or the created board has no id.
    """
    # Fetch existing boards; keep the original error as the explicit cause so
    # the underlying failure is not lost when the message is shown.
    try:
        boards = list_boards(session, base_url)
    except Exception as e:
        raise RuntimeError(f"Failed to fetch boards: {e}") from e

    create_label = "Create new board"
    choices = [create_label]
    board_map = {}

    for board in boards:
        board_id = board.get("id")
        board_title = board.get("title", "Untitled")
        # The ID is part of the label, so boards sharing a title stay distinct.
        choice_text = f"{board_title} (ID: {board_id})"
        choices.append(choice_text)
        board_map[choice_text] = (board_id, board_title)

    questions = [
        inquirer.List(
            "board",
            message="Which board do you want to work with?",
            choices=choices,
        )
    ]

    answers = inquirer.prompt(questions)
    if not answers:
        raise RuntimeError("Board selection cancelled")

    selected = answers["board"]
    if selected != create_label:
        return board_map[selected]

    # Interactive board creation
    board_name_q = [
        inquirer.Text(
            "board_name",
            message="Enter new board name",
        )
    ]
    board_name_answer = inquirer.prompt(board_name_q)
    if not board_name_answer:
        raise RuntimeError("Board creation cancelled")

    # Strip so that a whitespace-only entry is rejected instead of being sent
    # to the server as a blank board title.
    board_name = (board_name_answer["board_name"] or "").strip()
    if not board_name:
        raise RuntimeError("Board name cannot be empty")

    # Create the board
    print(f"\n→ Creating board '{board_name}'...")
    board = create_board(session, base_url, board_name)
    board_id = board.get("id")
    if board_id is None:
        raise RuntimeError(f"Board created but no id returned: {board}")
    print(f"✔️ Created board '{board_name}' (id: {board_id})")

    # Create stacks
    stack_names = get_stack_names()
    print("→ Creating stacks in order:")
    create_stacks_in_order(session, base_url, int(board_id), stack_names)

    return (int(board_id), board_name)
|
|
|
|
|
|
def interactive_stack_selection(
    session: requests.Session, base_url: str, board_id: int
) -> tuple[int, str]:
    """
    Ask the user which stack (column) of a board should receive the cards.

    The stacks are fetched with ``list_stacks`` and shown as a single-choice
    list labelled "<title> (ID: <id>)".

    Returns:
        ``(stack_id, stack_name)`` of the chosen stack.

    Raises:
        RuntimeError: if the stacks cannot be fetched, the board has no
            stacks, or the prompt is cancelled.
    """
    try:
        available = list_stacks(session, base_url, board_id)
    except Exception as e:
        raise RuntimeError(f"Failed to fetch stacks: {e}")

    if not available:
        raise RuntimeError(f"No stacks found for board {board_id}")

    # One (id, title) pair per stack, in the order returned by list_stacks.
    pairs = [(entry.get("id"), entry.get("title", "Untitled")) for entry in available]
    labels = [f"{title} (ID: {ident})" for ident, title in pairs]
    # Label -> (id, title); a repeated label keeps the last pair seen.
    by_label = dict(zip(labels, pairs))

    prompt = inquirer.List(
        "stack",
        message="Which column/stack do you want to import to?",
        choices=labels,
    )
    reply = inquirer.prompt([prompt])
    if not reply:
        raise RuntimeError("Stack selection cancelled")

    return by_label[reply["stack"]]
|
|
|
|
|
|
def interactive_csv_selection() -> str:
    """
    Prompt the user for the CSV file to import.

    Every ``*.csv`` file in the current working directory is offered, sorted
    by name, followed by an "Enter custom file path" entry that asks for a
    path typed by hand.

    Returns:
        The selected file name, or the custom path with surrounding
        whitespace removed and a leading ``~`` expanded.

    Raises:
        RuntimeError: if a prompt is cancelled or the custom path is blank.
    """
    custom_label = "Enter custom file path"

    # Find all .csv files in current directory; the custom entry goes last.
    choices = [*sorted(glob.glob("*.csv")), custom_label]

    questions = [
        inquirer.List(
            "csv_file",
            message="Which CSV file do you want to import?",
            choices=choices,
        )
    ]

    answers = inquirer.prompt(questions)
    if not answers:
        raise RuntimeError("CSV file selection cancelled")

    selected = answers["csv_file"]
    if selected != custom_label:
        return selected

    # Ask for custom path
    path_q = [
        inquirer.Text(
            "csv_path",
            message="Enter CSV file path",
        )
    ]
    path_answer = inquirer.prompt(path_q)
    if not path_answer:
        raise RuntimeError("CSV path entry cancelled")

    # The path is typed into a prompt, not a shell, so nothing has expanded
    # "~" or trimmed stray spaces yet; do both before the emptiness check.
    csv_path = os.path.expanduser((path_answer["csv_path"] or "").strip())
    if not csv_path:
        raise RuntimeError("CSV path cannot be empty")

    return csv_path
|
|
|
|
|
|
def import_csv_file(
    session: requests.Session,
    base_url: str,
    board_id: int,
    stack_id: int,
    csv_file: str,
) -> None:
    """
    Import the rows of a CSV file as cards into the specified stack.

    CSV format: a header row containing ``title`` and ``description``
    columns. Each row with a non-empty title is passed to ``create_card``;
    rows without a title are skipped. A failure on one row is reported and
    counted as skipped, and the import continues with the next row. A
    summary line is printed at the end.

    Raises:
        RuntimeError: if ``csv_file`` is not an existing regular file.
    """
    if not os.path.isfile(csv_file):
        raise RuntimeError(f"CSV file not found: {csv_file}")

    print(f"\n→ Importing cards from '{csv_file}'...")

    imported = 0
    skipped = 0

    # "utf-8-sig" reads plain UTF-8 as before but also drops a leading BOM,
    # which would otherwise be glued to the first header name and hide the
    # "title" column.
    with open(csv_file, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            # DictReader fills cells missing from a short row with None, so
            # a .get() default is not enough to guarantee a string here.
            title = (row.get("title") or "").strip()
            description = (row.get("description") or "").strip()

            if not title:
                skipped += 1
                continue

            # Best-effort per row: report the failure and keep going.
            try:
                resp = create_card(
                    session, base_url, int(board_id), int(stack_id), title, description
                )
                if resp.status_code in (200, 201):
                    print(f"  ✔️ Created card: {title}")
                    imported += 1
                else:
                    print(
                        f"  ❌ Failed to create card '{title}': {resp.status_code} {resp.text}"
                    )
                    skipped += 1
            except Exception as e:
                print(f"  ❌ Failed to create card '{title}': {e}")
                skipped += 1

    print(f"\n✔️ Import complete! {imported} cards imported, {skipped} skipped.")
|
|
|
|
|
|
def main() -> None:
    """
    Command-line entry point for the Deck CSV importer.

    When a domain, board ID, stack ID and CSV file are all available from
    command-line arguments or environment variables, the selection menus are
    skipped and the import runs directly. Otherwise the user is walked
    through board, stack and file selection and asked to confirm.

    Exits with status 2 when settings cannot be resolved, 1 when the import
    or an interactive step fails, and 0 when the confirmation is declined.
    """
    cli = argparse.ArgumentParser(
        description="Unified interactive tool for importing CSV files into Nextcloud Deck."
    )

    # Shared options (presumably domain, board ID and credentials, given the
    # attributes read below) come from the common helper module.
    add_common_args(cli)

    # Options specific to this script.
    cli.add_argument(
        "--csv-file",
        help="Path to CSV file to import (for non-interactive mode)",
    )
    cli.add_argument(
        "--stack-id",
        help="Stack/column ID to import to (for non-interactive mode)",
    )

    args = cli.parse_args()

    # Each required setting may come from its CLI flag or from one of the
    # listed environment variables.
    required_sources = (
        (args.domain, ("NEXTCLOUD_DOMAIN", "NEXTCLOUD_BASE_URL")),
        (args.board_id, ("NEXTCLOUD_BOARD_ID", "BOARD_ID")),
        (args.stack_id, ("NEXTCLOUD_STACK_ID", "STACK_ID")),
        (args.csv_file, ("IMPORT_CSV_FILE", "CSV_FILE")),
    )
    non_interactive = all(
        cli_value or any(os.getenv(env_name) for env_name in env_names)
        for cli_value, env_names in required_sources
    )

    if non_interactive:
        # Everything needed is already known: resolve it and import.
        try:
            base_url, board_id, username, password = resolve_common(
                cli_domain=args.domain,
                cli_board_id=args.board_id,
                cli_username=args.username,
                cli_password=args.password,
            )
            stack_id = resolve_arg(
                args.stack_id,
                ["NEXTCLOUD_STACK_ID", "STACK_ID"],
                prompt_text="Stack ID: ",
                cast=int,
            )
            csv_file = resolve_arg(
                args.csv_file,
                ["IMPORT_CSV_FILE", "CSV_FILE"],
                prompt_text="CSV file path: ",
                cast=str,
            )
        except Exception as err:
            print(f"❌ {err}", file=sys.stderr)
            sys.exit(2)

        session = build_session(username, password)

        try:
            import_csv_file(session, base_url, int(board_id), int(stack_id), csv_file)
        except Exception as err:
            print(f"❌ Import failed: {err}", file=sys.stderr)
            sys.exit(1)
        return

    # Interactive mode
    print("Welcome to Nextcloud Deck CSV Importer!\n")

    # Domain and credentials are resolved by the shared helper from the CLI
    # values passed here.
    try:
        base_url, username, password = resolve_domain_and_auth(
            cli_domain=args.domain,
            cli_username=args.username,
            cli_password=args.password,
        )
    except Exception as err:
        print(f"❌ {err}", file=sys.stderr)
        sys.exit(2)

    session = build_session(username, password)

    try:
        # Step 1: pick or create the board.
        board_id, board_name = interactive_board_selection(session, base_url)
        print(f"\n→ Working with board: {board_name} (ID: {board_id})\n")

        # Step 2: pick the target stack.
        stack_id, stack_name = interactive_stack_selection(
            session, base_url, board_id
        )
        print(f"→ Selected column: {stack_name} (ID: {stack_id})\n")

        # Step 3: pick the CSV file.
        csv_file = interactive_csv_selection()

        # Step 4: confirm before touching the board.
        confirmation = inquirer.prompt(
            [
                inquirer.Confirm(
                    "confirm",
                    message=f"Import '{csv_file}' to '{stack_name}' in board '{board_name}'?",
                    default=True,
                )
            ]
        )
        if not (confirmation and confirmation["confirm"]):
            print("Import cancelled.")
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept this exit.
            sys.exit(0)

        import_csv_file(session, base_url, board_id, stack_id, csv_file)
    except Exception as err:
        print(f"❌ {err}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
|
|
# Run the importer only when this file is executed as a script, so importing
# the module does not start it.
if __name__ == "__main__":
    main()
|
|
|