# Mirror of https://gitee.com/wanwujie/deer-flow (synced 2026-04-03 06:12:14 +08:00).
# Security fix V-001: the data-analysis skill previously executed shell commands via os.
# Co-authored-by: orbisai0security <orbisai0security@users.noreply.github.com>
"""
|
|
Data Analysis Script using DuckDB.
|
|
|
|
Analyzes Excel (.xlsx/.xls) and CSV files using DuckDB's in-process SQL engine.
|
|
Supports schema inspection, SQL queries, statistical summaries, and result export.
|
|
"""
|
|
|
|
import argparse
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
|
|
# Module-level setup: logging, best-effort dependency bootstrap, cache paths.
logging.basicConfig(level=logging.INFO, format="%(message)s")

logger = logging.getLogger(__name__)

try:
    import duckdb
except ImportError:
    # Best-effort bootstrap: install duckdb (plus openpyxl for Excel support)
    # on first run, then retry the import.
    # NOTE(review): this shells out to pip at import time — confirm that is
    # acceptable in the deployment environment.
    logger.error("duckdb is not installed. Installing...")
    subprocess.run([sys.executable, "-m", "pip", "install", "duckdb", "openpyxl", "-q"], check=True)
    import duckdb

try:
    import openpyxl  # noqa: F401  (imported only to verify availability)
except ImportError:
    subprocess.run([sys.executable, "-m", "pip", "install", "openpyxl", "-q"], check=True)


# Cache directory for persistent DuckDB databases (under the system temp dir).
CACHE_DIR = os.path.join(tempfile.gettempdir(), ".data-analysis-cache")
# Suffix for the JSON sidecar file mapping original names to table names.
TABLE_MAP_SUFFIX = ".table_map.json"
|
|
|
|
|
|
def compute_files_hash(files: list[str]) -> str:
    """Derive a combined SHA256 digest over the contents of all input files.

    Paths are visited in sorted order so the digest does not depend on the
    order in which they were supplied; the digest serves as the cache key.
    """
    digest = hashlib.sha256()
    for path in sorted(files):
        try:
            with open(path, "rb") as stream:
                for block in iter(lambda: stream.read(8192), b""):
                    digest.update(block)
        except OSError:
            # Unreadable file: fold its path into the digest as a fallback.
            digest.update(path.encode())
    return digest.hexdigest()
|
|
|
|
|
|
def get_cache_db_path(files_hash: str) -> str:
    """Return the on-disk path of the cached DuckDB database for this hash.

    Ensures the cache root directory exists before returning a path in it.
    """
    os.makedirs(CACHE_DIR, exist_ok=True)
    return os.path.join(CACHE_DIR, files_hash + ".duckdb")
|
|
|
|
|
|
def get_table_map_path(files_hash: str) -> str:
    """Return the path of the cached table-map JSON sidecar for this hash."""
    return os.path.join(CACHE_DIR, files_hash + TABLE_MAP_SUFFIX)
|
|
|
|
|
|
def save_table_map(files_hash: str, table_map: dict[str, str]) -> None:
    """Persist the original-name -> table-name mapping next to the cached DB."""
    with open(get_table_map_path(files_hash), "w", encoding="utf-8") as fh:
        json.dump(table_map, fh, ensure_ascii=False)
|
|
|
|
|
|
def load_table_map(files_hash: str) -> dict[str, str] | None:
    """Read a previously saved table map; None when absent or unreadable."""
    map_path = get_table_map_path(files_hash)
    if os.path.exists(map_path):
        try:
            with open(map_path, "r", encoding="utf-8") as fh:
                return json.load(fh)
        except Exception:
            # A corrupt or partially written cache entry counts as a miss.
            pass
    return None
|
|
|
|
|
|
def sanitize_table_name(name: str) -> str:
    """Turn an arbitrary sheet/file name into a valid SQL table identifier.

    Every non-word character becomes an underscore, and a leading digit is
    prefixed with 't_' so the result is a legal identifier.
    """
    cleaned = re.sub(r"[^\w]", "_", name)
    return f"t_{cleaned}" if cleaned and cleaned[0].isdigit() else cleaned
|
|
|
|
|
|
def load_files(con: duckdb.DuckDBPyConnection, files: list[str]) -> dict[str, str]:
    """
    Load Excel/CSV files into DuckDB tables.

    Returns a mapping of original_name -> sanitized_table_name.
    """
    # The spatial extension supplies st_read(), used for Excel ingestion.
    con.execute("INSTALL spatial; LOAD spatial;")
    table_map: dict[str, str] = {}

    # Dispatch by extension instead of an if/elif chain.
    loaders = {".xlsx": _load_excel, ".xls": _load_excel, ".csv": _load_csv}

    for file_path in files:
        if not os.path.exists(file_path):
            logger.error(f"File not found: {file_path}")
            continue

        ext = os.path.splitext(file_path)[1].lower()
        loader = loaders.get(ext)
        if loader is None:
            logger.warning(f"Unsupported file format: {ext} ({file_path})")
            continue
        loader(con, file_path, table_map)

    return table_map
|
|
|
|
|
|
def _load_excel(
    con: duckdb.DuckDBPyConnection, file_path: str, table_map: dict[str, str]
) -> None:
    """Load all sheets from an Excel file into DuckDB tables.

    Each sheet becomes one table; duplicate sanitized names get a numeric
    suffix. Per-sheet failures are logged and do not abort the other sheets.
    """
    import openpyxl

    # read_only/data_only keeps memory low and resolves formulas to values.
    wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True)
    sheet_names = wb.sheetnames
    wb.close()

    # Escape single quotes so the path can't break out of the SQL literal.
    safe_path = file_path.replace("'", "''")

    for sheet_name in sheet_names:
        table_name = sanitize_table_name(sheet_name)

        # Handle duplicate table names
        original_table_name = table_name
        counter = 1
        while table_name in table_map.values():
            table_name = f"{original_table_name}_{counter}"
            counter += 1

        # Same escaping for the sheet name used as the layer literal.
        safe_sheet = sheet_name.replace("'", "''")
        try:
            con.execute(
                f"""
                CREATE TABLE "{table_name}" AS
                SELECT * FROM st_read(
                    '{safe_path}',
                    layer = '{safe_sheet}',
                    open_options = ['HEADERS=FORCE', 'FIELD_TYPES=AUTO']
                )
                """
            )
            table_map[sheet_name] = table_name
            row_count = con.execute(f'SELECT COUNT(*) FROM "{table_name}"').fetchone()[
                0
            ]
            logger.info(
                f" Loaded sheet '{sheet_name}' -> table '{table_name}' ({row_count} rows)"
            )
        except Exception as e:
            logger.warning(f" Failed to load sheet '{sheet_name}': {e}")
|
|
|
|
|
|
def _load_csv(
    con: duckdb.DuckDBPyConnection, file_path: str, table_map: dict[str, str]
) -> None:
    """Load a CSV file into a DuckDB table.

    The table name derives from the file's base name; duplicates get a
    numeric suffix. Failures are logged rather than raised.
    """
    base_name = os.path.splitext(os.path.basename(file_path))[0]
    table_name = sanitize_table_name(base_name)

    # Handle duplicate table names
    original_table_name = table_name
    counter = 1
    while table_name in table_map.values():
        table_name = f"{original_table_name}_{counter}"
        counter += 1

    # Escape single quotes so the path can't break out of the SQL literal.
    safe_path = file_path.replace("'", "''")
    try:
        con.execute(
            f"""
            CREATE TABLE "{table_name}" AS
            SELECT * FROM read_csv_auto('{safe_path}')
            """
        )
        table_map[base_name] = table_name
        row_count = con.execute(f'SELECT COUNT(*) FROM "{table_name}"').fetchone()[0]
        logger.info(
            f" Loaded CSV '{base_name}' -> table '{table_name}' ({row_count} rows)"
        )
    except Exception as e:
        logger.warning(f" Failed to load CSV '{base_name}': {e}")
|
|
|
|
|
|
def action_inspect(con: duckdb.DuckDBPyConnection, table_map: dict[str, str]) -> str:
    """Print and return a schema report for every loaded table.

    Per table: row count, columns with types/nullability, per-column
    non-null counts, and a five-row data sample.
    """
    lines: list[str] = []
    divider = "=" * 60

    for original_name, table_name in table_map.items():
        lines.append(f"\n{divider}")
        lines.append(f'Table: {original_name} (SQL name: "{table_name}")')
        lines.append(divider)

        # Row count
        total = con.execute(f'SELECT COUNT(*) FROM "{table_name}"').fetchone()[0]
        lines.append(f"Rows: {total}")

        # Column info
        schema = con.execute(f'DESCRIBE "{table_name}"').fetchall()
        lines.append(f"\nColumns ({len(schema)}):")
        lines.append(f"{'Name':<30} {'Type':<15} {'Nullable'}")
        lines.append(f"{'-' * 30} {'-' * 15} {'-' * 8}")
        for entry in schema:
            lines.append(f"{entry[0]:<30} {entry[1]:<15} {entry[2]}")

        names = [entry[0] for entry in schema]

        # Non-null counts per column (best-effort; skipped on any failure)
        count_exprs = ", ".join(f'COUNT("{c}") as "{c}"' for c in names)
        try:
            counts = con.execute(
                f'SELECT {count_exprs} FROM "{table_name}"'
            ).fetchone()
            lines.append("\nNon-null counts:")
            for c, filled in zip(names, counts):
                lines.append(f" {c}: {filled} / {total}")
        except Exception:
            pass

        # Sample data; prefer pandas rendering, fall back to plain rows
        lines.append("\nSample data (first 5 rows):")
        try:
            frame = con.execute(f'SELECT * FROM "{table_name}" LIMIT 5').fetchdf()
            lines.append(frame.to_string(index=False))
        except Exception:
            raw = con.execute(f'SELECT * FROM "{table_name}" LIMIT 5').fetchall()
            lines.append(" " + " | ".join(names))
            for record in raw:
                lines.append(" " + " | ".join(str(v) for v in record))

    report = "\n".join(lines)
    print(report)
    return report
|
|
|
|
|
|
def action_query(
    con: duckdb.DuckDBPyConnection,
    sql: str,
    table_map: dict[str, str],
    output_file: str | None = None,
) -> str:
    """Execute a SQL query and return/export results.

    Original sheet/file names in the SQL are rewritten to their sanitized
    table names so callers can query by the names they know. On SQL errors
    a message listing the available tables and columns is returned.
    """
    # Replace original sheet/file names with sanitized table names in SQL.
    # Longest names first so a name that is a prefix of another can't
    # clobber it; the lookarounds skip occurrences that are already inside
    # double quotes (the original \b-only pattern re-quoted them, yielding
    # broken SQL like ""table"").
    modified_sql = sql
    for original_name, table_name in sorted(
        table_map.items(), key=lambda x: len(x[0]), reverse=True
    ):
        if original_name != table_name:
            modified_sql = re.sub(
                rf'(?<!")\b{re.escape(original_name)}\b(?!")',
                f'"{table_name}"',
                modified_sql,
            )

    try:
        result = con.execute(modified_sql)
        columns = [desc[0] for desc in result.description]
        rows = result.fetchall()
    except Exception as e:
        # Surface the available schema so the caller can correct the query.
        error_msg = f"SQL Error: {e}\n\nAvailable tables:\n"
        for orig, tbl in table_map.items():
            cols = con.execute(f'DESCRIBE "{tbl}"').fetchall()
            col_names = [c[0] for c in cols]
            error_msg += f' "{tbl}" ({orig}): {", ".join(col_names)}\n'
        print(error_msg)
        return error_msg

    # Export when a destination was given; otherwise print a text table.
    if output_file:
        return _export_results(columns, rows, output_file)

    return _format_table(columns, rows)
|
|
|
|
|
|
def _format_table(columns: list[str], rows: list[tuple]) -> str:
|
|
"""Format query results as a readable table."""
|
|
if not rows:
|
|
msg = "Query returned 0 rows."
|
|
print(msg)
|
|
return msg
|
|
|
|
# Calculate column widths
|
|
col_widths = [len(str(c)) for c in columns]
|
|
for row in rows:
|
|
for i, val in enumerate(row):
|
|
col_widths[i] = max(col_widths[i], len(str(val)))
|
|
|
|
# Cap column width
|
|
max_width = 40
|
|
col_widths = [min(w, max_width) for w in col_widths]
|
|
|
|
# Build table
|
|
parts = []
|
|
header = " | ".join(str(c).ljust(col_widths[i]) for i, c in enumerate(columns))
|
|
separator = "-+-".join("-" * col_widths[i] for i in range(len(columns)))
|
|
parts.append(header)
|
|
parts.append(separator)
|
|
for row in rows:
|
|
row_str = " | ".join(
|
|
str(v)[:max_width].ljust(col_widths[i]) for i, v in enumerate(row)
|
|
)
|
|
parts.append(row_str)
|
|
|
|
parts.append(f"\n({len(rows)} rows)")
|
|
result = "\n".join(parts)
|
|
print(result)
|
|
return result
|
|
|
|
|
|
def _export_results(columns: list[str], rows: list[tuple], output_file: str) -> str:
|
|
"""Export query results to a file (CSV, JSON, or Markdown)."""
|
|
os.makedirs(os.path.dirname(output_file), exist_ok=True)
|
|
ext = os.path.splitext(output_file)[1].lower()
|
|
|
|
if ext == ".csv":
|
|
import csv
|
|
|
|
with open(output_file, "w", newline="", encoding="utf-8") as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow(columns)
|
|
writer.writerows(rows)
|
|
|
|
elif ext == ".json":
|
|
records = []
|
|
for row in rows:
|
|
record = {}
|
|
for i, col in enumerate(columns):
|
|
val = row[i]
|
|
# Handle non-JSON-serializable types
|
|
if hasattr(val, "isoformat"):
|
|
val = val.isoformat()
|
|
elif isinstance(val, (bytes, bytearray)):
|
|
val = val.hex()
|
|
record[col] = val
|
|
records.append(record)
|
|
with open(output_file, "w", encoding="utf-8") as f:
|
|
json.dump(records, f, indent=2, ensure_ascii=False, default=str)
|
|
|
|
elif ext == ".md":
|
|
with open(output_file, "w", encoding="utf-8") as f:
|
|
# Header
|
|
f.write("| " + " | ".join(columns) + " |\n")
|
|
f.write("| " + " | ".join("---" for _ in columns) + " |\n")
|
|
# Rows
|
|
for row in rows:
|
|
f.write(
|
|
"| " + " | ".join(str(v).replace("|", "\\|") for v in row) + " |\n"
|
|
)
|
|
else:
|
|
msg = f"Unsupported output format: {ext}. Use .csv, .json, or .md"
|
|
print(msg)
|
|
return msg
|
|
|
|
msg = f"Results exported to {output_file} ({len(rows)} rows)"
|
|
print(msg)
|
|
return msg
|
|
|
|
|
|
def action_summary(
    con: duckdb.DuckDBPyConnection,
    table_name: str,
    table_map: dict[str, str],
) -> str:
    """Generate statistical summary for a table.

    Numeric columns get count/mean/std/min/quartiles/max/null counts; all
    other columns get count/unique/mode/null counts plus the five most
    frequent values. The report is printed and returned as one string.

    Args:
        con: Open DuckDB connection holding the loaded tables.
        table_name: Original (pre-sanitization) name or SQL table name.
        table_map: Mapping of original names to sanitized SQL names.
    """
    # Resolve table name: accept either the original sheet/file name or the
    # sanitized SQL name (fall back to the input unchanged).
    resolved = table_map.get(table_name, table_name)

    try:
        columns = con.execute(f'DESCRIBE "{resolved}"').fetchall()
    except Exception:
        # Unknown table: report what IS available instead of raising.
        available = ", ".join(f'"{t}" ({o})' for o, t in table_map.items())
        msg = f"Table '{table_name}' not found. Available tables: {available}"
        print(msg)
        return msg

    row_count = con.execute(f'SELECT COUNT(*) FROM "{resolved}"').fetchone()[0]

    output_parts = []
    output_parts.append(f"\nStatistical Summary: {table_name}")
    output_parts.append(f"Total rows: {row_count}")
    output_parts.append(f"{'=' * 70}")

    # DuckDB base types that receive the numeric (describe-style) summary.
    numeric_types = {
        "BIGINT",
        "INTEGER",
        "SMALLINT",
        "TINYINT",
        "DOUBLE",
        "FLOAT",
        "DECIMAL",
        "HUGEINT",
        "REAL",
        "NUMERIC",
    }

    for col in columns:
        # DESCRIBE rows: col[0] = name, col[1] = declared type.
        col_name, col_type = col[0], col[1].upper()
        output_parts.append(f"\n--- {col_name} ({col[1]}) ---")

        # Check base type (strip parameterized parts, e.g. DECIMAL(10,2)).
        base_type = re.sub(r"\(.*\)", "", col_type).strip()

        if base_type in numeric_types:
            try:
                # NOTE(review): col_name is interpolated into the SQL; a
                # column name containing a double quote would break the
                # statement — confirm DESCRIBE output is trusted here.
                stats = con.execute(f"""
                    SELECT
                        COUNT("{col_name}") as count,
                        AVG("{col_name}")::DOUBLE as mean,
                        STDDEV("{col_name}")::DOUBLE as std,
                        MIN("{col_name}") as min,
                        QUANTILE_CONT("{col_name}", 0.25) as q25,
                        MEDIAN("{col_name}") as median,
                        QUANTILE_CONT("{col_name}", 0.75) as q75,
                        MAX("{col_name}") as max,
                        COUNT(*) - COUNT("{col_name}") as null_count
                    FROM "{resolved}"
                """).fetchone()
                # Labels align positionally with the SELECT list above.
                labels = [
                    "count",
                    "mean",
                    "std",
                    "min",
                    "25%",
                    "50%",
                    "75%",
                    "max",
                    "nulls",
                ]
                for label, val in zip(labels, stats):
                    if isinstance(val, float):
                        output_parts.append(f" {label:<8}: {val:,.4f}")
                    else:
                        output_parts.append(f" {label:<8}: {val}")
            except Exception as e:
                output_parts.append(f" Error computing stats: {e}")
        else:
            # Categorical / non-numeric column summary.
            try:
                stats = con.execute(f"""
                    SELECT
                        COUNT("{col_name}") as count,
                        COUNT(DISTINCT "{col_name}") as unique_count,
                        MODE("{col_name}") as mode_val,
                        COUNT(*) - COUNT("{col_name}") as null_count
                    FROM "{resolved}"
                """).fetchone()
                output_parts.append(f" count : {stats[0]}")
                output_parts.append(f" unique : {stats[1]}")
                output_parts.append(f" top : {stats[2]}")
                output_parts.append(f" nulls : {stats[3]}")

                # Show top 5 values by frequency.
                top_vals = con.execute(f"""
                    SELECT "{col_name}", COUNT(*) as freq
                    FROM "{resolved}"
                    WHERE "{col_name}" IS NOT NULL
                    GROUP BY "{col_name}"
                    ORDER BY freq DESC
                    LIMIT 5
                """).fetchall()
                if top_vals:
                    output_parts.append(f" top values:")
                    for val, freq in top_vals:
                        # Percentage is of ALL rows, NULLs included.
                        pct = (freq / row_count * 100) if row_count > 0 else 0
                        output_parts.append(f" {val}: {freq} ({pct:.1f}%)")
            except Exception as e:
                output_parts.append(f" Error computing stats: {e}")

    result = "\n".join(output_parts)
    print(result)
    return result
|
|
|
|
|
|
def main():
    """CLI entry point: parse args, load/cache the data, dispatch the action."""
    parser = argparse.ArgumentParser(description="Analyze Excel/CSV files using DuckDB")
    parser.add_argument(
        "--files",
        nargs="+",
        required=True,
        help="Paths to Excel (.xlsx/.xls) or CSV files",
    )
    parser.add_argument(
        "--action",
        required=True,
        choices=["inspect", "query", "summary"],
        help="Action to perform: inspect, query, or summary",
    )
    parser.add_argument(
        "--sql",
        type=str,
        default=None,
        help="SQL query to execute (required for 'query' action)",
    )
    parser.add_argument(
        "--table",
        type=str,
        default=None,
        help="Table name for summary (required for 'summary' action)",
    )
    parser.add_argument(
        "--output-file",
        type=str,
        default=None,
        help="Path to export results (CSV/JSON/MD)",
    )
    args = parser.parse_args()

    # Validate action-specific required arguments up front.
    if args.action == "query" and not args.sql:
        parser.error("--sql is required for 'query' action")
    if args.action == "summary" and not args.table:
        parser.error("--table is required for 'summary' action")

    # Compute file hash for caching: identical file contents reuse the same
    # persisted DuckDB database and table map.
    files_hash = compute_files_hash(args.files)
    db_path = get_cache_db_path(files_hash)
    cached_table_map = load_table_map(files_hash)

    if cached_table_map and os.path.exists(db_path):
        # Cache hit: connect to the existing DB (read-only).
        logger.info(f"Cache hit! Using cached database: {db_path}")
        con = duckdb.connect(db_path, read_only=True)
        table_map = cached_table_map
        logger.info(
            f"Loaded {len(table_map)} table(s) from cache: {', '.join(table_map.keys())}"
        )
    else:
        # Cache miss: load files and persist to DB
        logger.info("Loading files (first time, will cache for future use)...")
        con = duckdb.connect(db_path)
        table_map = load_files(con, args.files)

        if not table_map:
            logger.error("No tables were loaded. Check file paths and formats.")
            # Clean up empty DB file so a broken cache isn't reused later.
            con.close()
            if os.path.exists(db_path):
                os.remove(db_path)
            sys.exit(1)

        # Save table map for future cache lookups
        save_table_map(files_hash, table_map)
        logger.info(
            f"\nLoaded {len(table_map)} table(s): {', '.join(table_map.keys())}"
        )
        logger.info(f"Cached database saved to: {db_path}")

    # Perform action; close the connection even if the action raises
    # (previously an exception here leaked the open DuckDB connection).
    try:
        if args.action == "inspect":
            action_inspect(con, table_map)
        elif args.action == "query":
            action_query(con, args.sql, table_map, args.output_file)
        elif args.action == "summary":
            action_summary(con, args.table, table_map)
    finally:
        con.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|