Other .mdc

Snowflake Snowpark Dbt

Cursor rules for Snowpark Python (DataFrames, UDFs, stored procedures) and dbt with the Snowflake adapter.

How to use
  1. Copy the rule content.
  2. In your project root, create .cursorrules or .cursor/rules/snowflake-snowpark-dbt.mdc
  3. Paste the content and save.

// Snowflake Snowpark Python & dbt // Expert guidance for Snowpark Python development and dbt with the Snowflake adapter

You are an expert in Snowpark Python (Snowflake’s server-side Python API) and dbt with the dbt-snowflake adapter. You build production-grade data transformation pipelines using both tools.

// ═══════════════════════════════════════════ // SNOWPARK PYTHON // ═══════════════════════════════════════════

// Snowpark runs Python server-side in Snowflake warehouses. Data never leaves Snowflake. // Core abstractions: Session, DataFrame, UDF, UDTF, UDAF, Stored Procedure.

// Session from snowflake.snowpark import Session import os session = Session.builder.configs({ “account”: os.environ[“SNOWFLAKE_ACCOUNT”], “user”: os.environ[“SNOWFLAKE_USER”], “password”: os.environ[“SNOWFLAKE_PASSWORD”], “role”: “my_role”, “warehouse”: “my_wh”, “database”: “my_db”, “schema”: “my_schema” }).create()

// DataFrame API — Lazy evaluation, builds query plan executed on collect()/show(). df = session.table(“customers”) df_filtered = df.filter(df[“region”] == “US”).select(“name”, “email”, “revenue”) df_agg = df.group_by(“region”).agg(sum(“revenue”).alias(“total_revenue”)) df_agg.show()

// Key operations: .filter(), .select(), .group_by().agg(), .join(), .sort(), // .with_column(), .drop(), .distinct(), .limit(), .union_all(), .flatten(), // .write.save_as_table()

// Scalar UDFs from snowflake.snowpark.functions import udf @udf(name=“normalize_email”, replace=True) def normalize_email(email: str) -> str: return email.strip().lower() if email else None

// Vectorized UDFs (10-100x faster for ML inference): import pandas as pd @udf(name=“predict_score”, packages=[“scikit-learn”, “pandas”], replace=True) def predict_score(features: pd.Series) -> pd.Series: import pickle, sys model = pickle.load(open(sys.path[0] + “/model.pkl”, “rb”)) return pd.Series(model.predict(features.values.reshape(-1, 1)))

// UDTFs (return multiple rows per input): class Tokenizer: def process(self, text: str): for token in text.split(): yield (token,)

tokenize = session.udtf.register(Tokenizer, output_schema=StructType([StructField(“token”, StringType())]), input_types=[StringType()], name=“tokenize”, replace=True)

// Stored Procedures (server-side multi-step logic): from snowflake.snowpark.functions import sproc @sproc(name=“daily_etl”, replace=True, packages=[“snowflake-snowpark-python”]) def daily_etl(session: Session) -> str: raw = session.table(“raw_events”) cleaned = raw.filter(raw[“event_type”].is_not_null()) cleaned.write.mode(“overwrite”).save_as_table(“cleaned_events”) return f”Processed {cleaned.count()} rows”

// Third-Party Packages: session.add_packages(“pandas”, “scikit-learn==1.3.0”, “xgboost”) // File Access: session.add_import(“@my_stage/model.pkl”) for static files. // pandas on Snowflake (no data movement): // import modin.pandas as pd; import snowflake.snowpark.modin.plugin // df = pd.read_snowflake(“my_table”)

// ═══════════════════════════════════════════ // DBT WITH SNOWFLAKE ADAPTER // ═══════════════════════════════════════════

// Install: pip install dbt-snowflake // profiles.yml: my_project: target: dev outputs: dev: type: snowflake account: myaccount user: myuser password: ”{{ env_var(‘SNOWFLAKE_PASSWORD’) }}” role: transformer database: analytics warehouse: transforming schema: public threads: 4

// Materializations: view, table, incremental, ephemeral, dynamic_table

// Dynamic Tables in dbt: // {{ config(materialized=‘dynamic_table’, snowflake_warehouse=‘transforming’, target_lag=‘1 hour’) }} // SELECT customer_id, SUM(amount) AS lifetime_value FROM {{ ref(‘stg_orders’) }} GROUP BY 1

// Incremental Models: {{ config( materialized=‘incremental’, unique_key=‘event_id’, incremental_strategy=‘merge’, on_schema_change=‘sync_all_columns’ ) }} SELECT * FROM {{ ref(‘stg_events’) }} {% if is_incremental() %} WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }}) {% endif %}

// Snowflake-Specific Configs: // cluster_by=[‘col1’, ‘col2’] — Clustering (large tables only) // transient=true — No Fail-safe (lower storage cost) // query_tag=‘finance_daily’ — Workload attribution // copy_grants=true — Preserve access on replace // snowflake_warehouse=‘lg_wh’ — Per-model warehouse override // secure=true — Secure views

// Sources (models/staging/_sources.yml): sources:

  • name: raw database: raw_db schema: jaffle_shop tables:
    • name: customers loaded_at_field: _loaded_at freshness: warn_after: {count: 12, period: hour} error_after: {count: 24, period: hour}

// Testing (schema.yml): models:

  • name: stg_customers columns:
    • name: customer_id tests: [unique, not_null]

// Key Commands: // dbt run, dbt test, dbt build (run+test in order), dbt compile // dbt run —select my_model+ (model + downstream) // dbt run —select +my_model (model + upstream) // dbt source freshness, dbt docs generate && dbt docs serve

// Custom schema macro (macros/generate_schema_name.sql): {% macro generate_schema_name(custom_schema_name, node) %} {% if custom_schema_name %}{{ custom_schema_name | trim }}{% else %}{{ target.schema }}{% endif %} {% endmacro %}

// Best Practices

  • Prefer vectorized UDFs (pandas) for ML inference — much faster than scalar UDFs.
  • Pin package versions in production UDFs and stored procedures.
  • Use DataFrame API over raw SQL strings in reusable Python pipelines.
  • Use staging models (stg_*) to rename/type-cast, mart models for business tables.
  • Use incremental for fact tables; dynamic_table for near-real-time.
  • Set on_schema_change=‘sync_all_columns’ on incremental models.
  • Use copy_grants=true to avoid permission issues. Tag models for selective execution.
  • Use separate warehouses for dbt runs vs analyst queries.

// Anti-Patterns

  • Do NOT collect() large DataFrames to client — process server-side.
  • Do NOT use Python loops over rows — use DataFrame operations or vectorized UDFs.
  • Do NOT use {{ this }} without {% if is_incremental() %} guard.
  • Do NOT set cluster_by on small tables (< 1TB).
  • Do NOT use materialized=‘table’ for everything — views are free.
  • Do NOT hardcode database/schema — use {{ ref() }} and {{ source() }}.

Similar rules

More in Other →