Cursor rules for Snowpark Python (DataFrames, UDFs, stored procedures) and dbt with the Snowflake adapter.
.cursorrules or .cursor/rules/snowflake-snowpark-dbt.mdc // Snowflake Snowpark Python & dbt
// Expert guidance for Snowpark Python development and dbt with the Snowflake adapter
You are an expert in Snowpark Python (Snowflake's server-side Python API) and dbt with the dbt-snowflake adapter. You build production-grade data transformation pipelines using both tools.
// ═══════════════════════════════════════════
// SNOWPARK PYTHON
// ═══════════════════════════════════════════
// Snowpark runs Python server-side in Snowflake warehouses. Data never leaves Snowflake.
// Core abstractions: Session, DataFrame, UDF, UDTF, UDAF, Stored Procedure.
// Session
from snowflake.snowpark import Session
import os
session = Session.builder.configs({
"account": os.environ["SNOWFLAKE_ACCOUNT"],
"user": os.environ["SNOWFLAKE_USER"],
"password": os.environ["SNOWFLAKE_PASSWORD"],
"role": "my_role", "warehouse": "my_wh", "database": "my_db", "schema": "my_schema"
}).create()
// DataFrame API — Lazy evaluation, builds query plan executed on collect()/show().
df = session.table("customers")
df_filtered = df.filter(df["region"] == "US").select("name", "email", "revenue")
df_agg = df.group_by("region").agg(sum("revenue").alias("total_revenue"))
df_agg.show()
// Key operations: .filter(), .select(), .group_by().agg(), .join(), .sort(),
// .with_column(), .drop(), .distinct(), .limit(), .union_all(), .flatten(),
// .write.save_as_table()
// Scalar UDFs
from snowflake.snowpark.functions import udf
@udf(name="normalize_email", replace=True)
def normalize_email(email: str) -> str:
return email.strip().lower() if email else None
// Vectorized UDFs (10-100x faster for ML inference):
import pandas as pd
@udf(name="predict_score", packages=["scikit-learn", "pandas"], replace=True)
def predict_score(features: pd.Series) -> pd.Series:
import pickle, sys
model = pickle.load(open(sys.path[0] + "/model.pkl", "rb"))
return pd.Series(model.predict(features.values.reshape(-1, 1)))
// UDTFs (return multiple rows per input):
class Tokenizer:
def process(self, text: str):
for token in text.split():
yield (token,)
tokenize = session.udtf.register(Tokenizer,
output_schema=StructType([StructField("token", StringType())]),
input_types=[StringType()], name="tokenize", replace=True)
// Stored Procedures (server-side multi-step logic):
from snowflake.snowpark.functions import sproc
@sproc(name="daily_etl", replace=True, packages=["snowflake-snowpark-python"])
def daily_etl(session: Session) -> str:
raw = session.table("raw_events")
cleaned = raw.filter(raw["event_type"].is_not_null())
cleaned.write.mode("overwrite").save_as_table("cleaned_events")
return f"Processed {cleaned.count()} rows"
// Third-Party Packages: session.add_packages("pandas", "scikit-learn==1.3.0", "xgboost")
// File Access: session.add_import("@my_stage/model.pkl") for static files.
// pandas on Snowflake (no data movement):
// import modin.pandas as pd; import snowflake.snowpark.modin.plugin
// df = pd.read_snowflake("my_table")
// ═══════════════════════════════════════════
// DBT WITH SNOWFLAKE ADAPTER
// ═══════════════════════════════════════════
// Install: pip install dbt-snowflake
// profiles.yml:
my_project:
target: dev
outputs:
dev:
type: snowflake
account: myaccount
user: myuser
password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
role: transformer
database: analytics
warehouse: transforming
schema: public
threads: 4
// Materializations: view, table, incremental, ephemeral, dynamic_table
// Dynamic Tables in dbt:
// {{ config(materialized='dynamic_table', snowflake_warehouse='transforming', target_lag='1 hour') }}
// SELECT customer_id, SUM(amount) AS lifetime_value FROM {{ ref('stg_orders') }} GROUP BY 1
// Incremental Models:
{{
config(
materialized='incremental',
unique_key='event_id',
incremental_strategy='merge',
on_schema_change='sync_all_columns'
)
}}
SELECT * FROM {{ ref('stg_events') }}
{% if is_incremental() %}
WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }})
{% endif %}
// Snowflake-Specific Configs:
// cluster_by=['col1', 'col2'] — Clustering (large tables only)
// transient=true — No Fail-safe (lower storage cost)
// query_tag='finance_daily' — Workload attribution
// copy_grants=true — Preserve access on replace
// snowflake_warehouse='lg_wh' — Per-model warehouse override
// secure=true — Secure views
// Sources (models/staging/_sources.yml):
sources:
- name: raw
database: raw_db
schema: jaffle_shop
tables:
- name: customers
loaded_at_field: _loaded_at
freshness:
warn_after: {count: 12, period: hour}
error_after: {count: 24, period: hour}
// Testing (schema.yml):
models:
- name: stg_customers
columns:
- name: customer_id
tests: [unique, not_null]
// Key Commands:
// dbt run, dbt test, dbt build (run+test in order), dbt compile
// dbt run --select my_model+ (model + downstream)
// dbt run --select +my_model (model + upstream)
// dbt source freshness, dbt docs generate && dbt docs serve
// Custom schema macro (macros/generate_schema_name.sql):
{% macro generate_schema_name(custom_schema_name, node) %}
{% if custom_schema_name %}{{ custom_schema_name | trim }}{% else %}{{ target.schema }}{% endif %}
{% endmacro %}
// Best Practices
- Prefer vectorized UDFs (pandas) for ML inference — much faster than scalar UDFs.
- Pin package versions in production UDFs and stored procedures.
- Use DataFrame API over raw SQL strings in reusable Python pipelines.
- Use staging models (stg_*) to rename/type-cast, mart models for business tables.
- Use incremental for fact tables; dynamic_table for near-real-time.
- Set on_schema_change='sync_all_columns' on incremental models.
- Use copy_grants=true to avoid permission issues. Tag models for selective execution.
- Use separate warehouses for dbt runs vs analyst queries.
// Anti-Patterns
- Do NOT collect() large DataFrames to client — process server-side.
- Do NOT use Python loops over rows — use DataFrame operations or vectorized UDFs.
- Do NOT use {{ this }} without {% if is_incremental() %} guard.
- Do NOT set cluster_by on small tables (< 1TB).
- Do NOT use materialized='table' for everything — views are free.
- Do NOT hardcode database/schema — use {{ ref() }} and {{ source() }}. // Snowflake Snowpark Python & dbt // Expert guidance for Snowpark Python development and dbt with the Snowflake adapter
You are an expert in Snowpark Python (Snowflake’s server-side Python API) and dbt with the dbt-snowflake adapter. You build production-grade data transformation pipelines using both tools.
// ═══════════════════════════════════════════ // SNOWPARK PYTHON // ═══════════════════════════════════════════
// Snowpark runs Python server-side in Snowflake warehouses. Data never leaves Snowflake. // Core abstractions: Session, DataFrame, UDF, UDTF, UDAF, Stored Procedure.
// Session from snowflake.snowpark import Session import os session = Session.builder.configs({ “account”: os.environ[“SNOWFLAKE_ACCOUNT”], “user”: os.environ[“SNOWFLAKE_USER”], “password”: os.environ[“SNOWFLAKE_PASSWORD”], “role”: “my_role”, “warehouse”: “my_wh”, “database”: “my_db”, “schema”: “my_schema” }).create()
// DataFrame API — Lazy evaluation, builds query plan executed on collect()/show(). df = session.table(“customers”) df_filtered = df.filter(df[“region”] == “US”).select(“name”, “email”, “revenue”) df_agg = df.group_by(“region”).agg(sum(“revenue”).alias(“total_revenue”)) df_agg.show()
// Key operations: .filter(), .select(), .group_by().agg(), .join(), .sort(), // .with_column(), .drop(), .distinct(), .limit(), .union_all(), .flatten(), // .write.save_as_table()
// Scalar UDFs from snowflake.snowpark.functions import udf @udf(name=“normalize_email”, replace=True) def normalize_email(email: str) -> str: return email.strip().lower() if email else None
// Vectorized UDFs (10-100x faster for ML inference): import pandas as pd @udf(name=“predict_score”, packages=[“scikit-learn”, “pandas”], replace=True) def predict_score(features: pd.Series) -> pd.Series: import pickle, sys model = pickle.load(open(sys.path[0] + “/model.pkl”, “rb”)) return pd.Series(model.predict(features.values.reshape(-1, 1)))
// UDTFs (return multiple rows per input): class Tokenizer: def process(self, text: str): for token in text.split(): yield (token,)
tokenize = session.udtf.register(Tokenizer, output_schema=StructType([StructField(“token”, StringType())]), input_types=[StringType()], name=“tokenize”, replace=True)
// Stored Procedures (server-side multi-step logic): from snowflake.snowpark.functions import sproc @sproc(name=“daily_etl”, replace=True, packages=[“snowflake-snowpark-python”]) def daily_etl(session: Session) -> str: raw = session.table(“raw_events”) cleaned = raw.filter(raw[“event_type”].is_not_null()) cleaned.write.mode(“overwrite”).save_as_table(“cleaned_events”) return f”Processed {cleaned.count()} rows”
// Third-Party Packages: session.add_packages(“pandas”, “scikit-learn==1.3.0”, “xgboost”) // File Access: session.add_import(“@my_stage/model.pkl”) for static files. // pandas on Snowflake (no data movement): // import modin.pandas as pd; import snowflake.snowpark.modin.plugin // df = pd.read_snowflake(“my_table”)
// ═══════════════════════════════════════════ // DBT WITH SNOWFLAKE ADAPTER // ═══════════════════════════════════════════
// Install: pip install dbt-snowflake // profiles.yml: my_project: target: dev outputs: dev: type: snowflake account: myaccount user: myuser password: ”{{ env_var(‘SNOWFLAKE_PASSWORD’) }}” role: transformer database: analytics warehouse: transforming schema: public threads: 4
// Materializations: view, table, incremental, ephemeral, dynamic_table
// Dynamic Tables in dbt: // {{ config(materialized=‘dynamic_table’, snowflake_warehouse=‘transforming’, target_lag=‘1 hour’) }} // SELECT customer_id, SUM(amount) AS lifetime_value FROM {{ ref(‘stg_orders’) }} GROUP BY 1
// Incremental Models: {{ config( materialized=‘incremental’, unique_key=‘event_id’, incremental_strategy=‘merge’, on_schema_change=‘sync_all_columns’ ) }} SELECT * FROM {{ ref(‘stg_events’) }} {% if is_incremental() %} WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }}) {% endif %}
// Snowflake-Specific Configs: // cluster_by=[‘col1’, ‘col2’] — Clustering (large tables only) // transient=true — No Fail-safe (lower storage cost) // query_tag=‘finance_daily’ — Workload attribution // copy_grants=true — Preserve access on replace // snowflake_warehouse=‘lg_wh’ — Per-model warehouse override // secure=true — Secure views
// Sources (models/staging/_sources.yml): sources:
// Testing (schema.yml): models:
// Key Commands: // dbt run, dbt test, dbt build (run+test in order), dbt compile // dbt run —select my_model+ (model + downstream) // dbt run —select +my_model (model + upstream) // dbt source freshness, dbt docs generate && dbt docs serve
// Custom schema macro (macros/generate_schema_name.sql): {% macro generate_schema_name(custom_schema_name, node) %} {% if custom_schema_name %}{{ custom_schema_name | trim }}{% else %}{{ target.schema }}{% endif %} {% endmacro %}
// Best Practices
// Anti-Patterns
Quantitative factor research skills for Cursor. Evaluate factors, run backtests, mine new alpha through natural language.
Prevent AI over-engineering by keeping changes scoped, simple, and directly tied to the user's request
Anti-sycophancy directives for code review and generation. Blocks hallucinated APIs, false confidence, authority-driven validation, and softening of real risk.
Cursor rules for Aspnet Abp.
Guidelines and best practices for building applications with [Beefree SDK](https://docs.beefree.io/beefree-sdk), including installation, authentication, configuration, customization, and template management
Cursor rules for embedding Beefree SDK's no-code content editors (for emails, pages, and popups) into a web application.