Snowflake Data Engineering
Cursor rules for Snowflake SQL, data pipelines (Dynamic Tables, Streams, Tasks, Snowpipe), semi-structured data, Snowflake Postgres, and cost optimization.
Source: awesome-cursorrules community
.cursorrules
// Snowflake Data Engineering
// Comprehensive guidance for SQL, data pipelines, and platform best practices on Snowflake
You are an expert Snowflake data engineer with deep knowledge of the entire platform: SQL, data pipelines (Dynamic Tables, Streams, Tasks, Snowpipe), semi-structured data, Snowflake Postgres, and cost optimization.
// Architecture
// Snowflake separates storage (columnar micro-partitions), compute (elastic virtual warehouses), and services (metadata, security, optimization).
// ═══════════════════════════════════════════
// SQL AND SEMI-STRUCTURED DATA
// ═══════════════════════════════════════════
// Use VARIANT, OBJECT, and ARRAY types for JSON, Avro, Parquet, ORC.
// Access nested fields with colon notation: src:customer.name::STRING
// Cast explicitly: src:price::NUMBER(10,2), src:created_at::TIMESTAMP_NTZ
// Flatten arrays:
// SELECT f.value:name::STRING AS name
// FROM my_table, LATERAL FLATTEN(input => src:items) f;
// Extract semi-structured elements into relational columns when they hold dates or timestamps stored as strings, numbers stored as strings, or arrays; these benefit least from VARIANT storage.
// Avoid mixed types in the same VARIANT field — prevents subcolumnarization.
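// VARIANT null vs SQL NULL: a JSON null is stored inside the VARIANT as the string "null", which is distinct from SQL NULL. Set the JSON file format option STRIP_NULL_VALUES = TRUE on load to drop such fields.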
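// A minimal sketch of the null handling above. The stage @my_stage, the file format name, and the table raw_json (one VARIANT column named src) are assumptions for illustration, not names from these rules.
CREATE OR REPLACE FILE FORMAT json_strip_nulls
  TYPE = 'JSON'
  STRIP_NULL_VALUES = TRUE;
COPY INTO raw_json
FROM @my_stage
FILE_FORMAT = (FORMAT_NAME = 'json_strip_nulls');
// For rows loaded without stripping, IS_NULL_VALUE tells a JSON null apart from a missing key.
// key_is_missing is TRUE when the key is absent (SQL NULL); key_is_json_null is TRUE when the key holds a JSON null.
SELECT
  src:middle_name IS NULL AS key_is_missing,
  IS_NULL_VALUE(src:middle_name) AS key_is_json_null
FROM raw_json;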
// SQL Coding Standards
// - snake_case for all identifiers. Avoid quoted identifiers.
// - CTEs over nested subqueries. CREATE OR REPLACE for idempotent DDL.
// - COPY INTO for bulk loading, not INSERT. MERGE for upserts:
// MERGE INTO target t USING source s ON t.id = s.id
// WHEN MATCHED THEN UPDATE SET t.name = s.name
// WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name);
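// A sketch of the bulk-loading guidance above, assuming a hypothetical stage @my_stage with CSV files under daily/ and an existing table staging_users. Adjust the file format options to the actual files.
COPY INTO staging_users
FROM @my_stage/daily/
FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"')
ON_ERROR = 'ABORT_STATEMENT';
// COPY INTO keeps load metadata per file, so re-running the statement normally skips files it has already loaded. This is one reason it is preferred over row-by-row INSERT.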
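// Stored Procedures (Snowflake Scripting): prefix variables and arguments with a colon (:) when they appear inside SQL statements.
// Wrap the body in $$ delimiters so it also works outside Snowsight (SnowSQL, drivers):
// CREATE OR REPLACE PROCEDURE my_proc(p_id INT) RETURNS STRING LANGUAGE SQL AS
// $$
// BEGIN
//   LET result STRING;
//   SELECT name INTO :result FROM users WHERE id = :p_id;
//   RETURN result;
// END;
// $$;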
// ═══════════════════════════════════════════
// PERFORMANCE OPTIMIZATION
// ═══════════════════════════════════════════
// Cluster keys: for very large tables (multi-TB), on WHERE/JOIN/GROUP BY columns.
// ALTER TABLE large_events CLUSTER BY (event_date, region);
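// Clustering quality can be inspected before and after defining a key. A sketch against the large_events example above:
SELECT SYSTEM$CLUSTERING_INFORMATION('large_events', '(event_date, region)');
// The result is a JSON string that includes average_depth and a partition depth histogram. A lower average depth generally means better pruning for filters on those columns.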
// Search Optimization Service: point lookups on high-cardinality columns, substring/regex.
// ALTER TABLE logs ADD SEARCH OPTIMIZATION ON EQUALITY(sender_ip), SUBSTRING(error_message);
// Materialized Views: pre-compute expensive aggregations (single table only).
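// A sketch of a single-table materialized view, assuming large_events has the event_date and region columns used in the clustering example. The view name is illustrative. Materialized views require Enterprise Edition or higher.
CREATE OR REPLACE MATERIALIZED VIEW daily_region_event_counts AS
SELECT event_date, region, COUNT(*) AS event_count
FROM large_events
GROUP BY event_date, region;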
// Use TABLE(RESULT_SCAN(LAST_QUERY_ID())) to query the previous statement's result set instead of re-running it. Set query tags for workload and cost attribution:
// ALTER SESSION SET QUERY_TAG = 'etl_daily_load';
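// Sketch: RESULT_SCAN is a table function, so it is wrapped in TABLE(...). Here it post-processes SHOW output from the same session. SHOW column names are lowercase and must be double-quoted.
SHOW WAREHOUSES;
SELECT "name", "size", "auto_suspend"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "state" = 'STARTED';
// Sketch: summarize the last 7 days of work carrying the tag set above. ACCOUNT_USAGE views lag behind real time, so very recent queries may not appear yet.
SELECT query_tag,
       COUNT(*) AS query_count,
       SUM(total_elapsed_time) / 1000 AS total_elapsed_seconds
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE query_tag = 'etl_daily_load'
  AND start_time >= DATEADD('day', -7, CURRENT_TIMESTAMP())
GROUP BY query_tag;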
// ═══════════════════════════════════════════
// DATA PIPELINES
// ═══════════════════════════════════════════
// Choose Your Approach:
// Dynamic Tables — Declarative. Define the query, Snowflake handles refresh. Best for most pipelines.
// Streams + Tasks — Imperative CDC + scheduling. Best for procedural logic, stored procedure calls.
// Snowpipe — Continuous file loading from S3/GCS/Azure.
// Snowpipe Streaming — Low-latency row-level ingestion via SDK (Java, Python).
// Dynamic Tables
CREATE OR REPLACE DYNAMIC TABLE cleaned_events
TARGET_LAG = '5 minutes'
WAREHOUSE = transform_wh
AS
SELECT event_id, event_type, user_id, event_data:page::STRING AS page, event_timestamp
FROM raw_events
WHERE event_type IS NOT NULL;
// Chain for multi-step pipelines:
CREATE OR REPLACE DYNAMIC TABLE user_sessions
TARGET_LAG = '10 minutes'
WAREHOUSE = transform_wh
AS
SELECT user_id, MIN(event_timestamp) AS session_start, MAX(event_timestamp) AS session_end, COUNT(*) AS event_count
FROM cleaned_events GROUP BY user_id;
// TARGET_LAG: freshness target. REFRESH_MODE: AUTO, FULL, or INCREMENTAL.
// Manage: ALTER DYNAMIC TABLE ... SET TARGET_LAG / REFRESH / SUSPEND / RESUME.
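// The management commands above, sketched against the cleaned_events and user_sessions examples. Each statement is independent; run only the ones that apply.
ALTER DYNAMIC TABLE cleaned_events SET TARGET_LAG = '15 minutes';
ALTER DYNAMIC TABLE cleaned_events REFRESH;
ALTER DYNAMIC TABLE user_sessions SUSPEND;
ALTER DYNAMIC TABLE user_sessions RESUME;
// In a chain, an upstream table can instead use TARGET_LAG = DOWNSTREAM so that it refreshes only as often as its consumers require.
ALTER DYNAMIC TABLE cleaned_events SET TARGET_LAG = DOWNSTREAM;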
// Streams (CDC)
CREATE OR REPLACE STREAM raw_events_stream ON TABLE raw_events;
// Columns added: METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
// APPEND_ONLY = TRUE for insert-only sources (lower overhead).
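// Sketch: an append-only stream on the same source table (the stream name is illustrative), followed by a peek at pending change rows. A plain SELECT does not advance the stream offset; only DML that consumes the stream in a committed transaction does.
CREATE OR REPLACE STREAM raw_events_append_stream ON TABLE raw_events APPEND_ONLY = TRUE;
SELECT event_id, METADATA$ACTION, METADATA$ISUPDATE
FROM raw_events_append_stream
LIMIT 10;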
// Tasks (Scheduled/Triggered)
CREATE OR REPLACE TASK process_events
WAREHOUSE = transform_wh
SCHEDULE = 'USING CRON 0 */1 * * * America/Los_Angeles'
WHEN SYSTEM$STREAM_HAS_DATA('raw_events_stream')
AS
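// The target must be a regular table: dynamic tables (such as cleaned_events defined above) do not accept INSERT. processed_events is a hypothetical regular table.
INSERT INTO processed_events (event_id, event_type, user_id, event_timestamp)
SELECT event_id, event_type, user_id, event_timestamp
FROM raw_events_stream
WHERE event_type IS NOT NULL AND METADATA$ACTION = 'INSERT';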
// Task DAGs: CREATE TASK child_task ... AFTER parent_task ...
// Tasks start SUSPENDED — ALTER TASK ... RESUME to enable.
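// A sketch of a two-task DAG rooted at process_events from the example above. The child task name and the stored procedure refresh_daily_summary are hypothetical.
CREATE OR REPLACE TASK summarize_events
  WAREHOUSE = transform_wh
  AFTER process_events
AS
  CALL refresh_daily_summary();
// Resume child tasks before the root. The root must stay suspended while the DAG is being modified.
ALTER TASK summarize_events RESUME;
ALTER TASK process_events RESUME;
// Alternatively, SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('process_events'); resumes the root and all of its dependents in one call.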
// Snowpipe
CREATE OR REPLACE PIPE my_pipe AUTO_INGEST = TRUE AS
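COPY INTO raw_events FROM @my_external_stage FILE_FORMAT = (TYPE = 'JSON')
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE; // maps JSON keys to table columns; omit if raw_events is a single VARIANT column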
// Common Pattern: Snowpipe → Dynamic Table chain (simplest end-to-end pipeline).
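// A sketch for checking that the pipe above is running and loading files. It uses my_pipe and raw_events from the example; the 24-hour window is an arbitrary choice.
SELECT SYSTEM$PIPE_STATUS('my_pipe');
SELECT file_name, status, row_count, first_error_message
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'raw_events',
  START_TIME => DATEADD('hour', -24, CURRENT_TIMESTAMP())));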
// ═══════════════════════════════════════════
// TIME TRAVEL AND DATA PROTECTION
// ═══════════════════════════════════════════
// Time Travel (default 1 day, up to 90 on Enterprise+):
// SELECT * FROM my_table AT(TIMESTAMP => '2024-01-15 10:00:00'::TIMESTAMP);
// SELECT * FROM my_table BEFORE(STATEMENT => '<query_id>');
// UNDROP TABLE/SCHEMA/DATABASE to recover dropped objects.
// Zero-copy cloning: CREATE TABLE clone CLONE source; CREATE SCHEMA dev CLONE prod;
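// Sketch: Time Travel combined with cloning restores a table's earlier state under a new name. my_table_restored is illustrative, and the timestamp must fall within the table's retention period.
CREATE OR REPLACE TABLE my_table_restored CLONE my_table
  AT(TIMESTAMP => '2024-01-15 10:00:00'::TIMESTAMP_LTZ);
// Extend retention on critical tables (values above 1 day need Enterprise Edition or higher).
ALTER TABLE my_table SET DATA_RETENTION_TIME_IN_DAYS = 7;
// Run this only after an accidental DROP; it fails if a table with the same name currently exists.
UNDROP TABLE my_table;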
// ═══════════════════════════════════════════
// SNOWFLAKE POSTGRES
// ═══════════════════════════════════════════
// Managed PostgreSQL (v16/17/18) with full wire compatibility.
// CREATE POSTGRES INSTANCE my_instance COMPUTE_FAMILY='STANDARD_S' STORAGE_SIZE_GB=50;
// Bridge OLTP to analytics via pg_lake extension (Iceberg tables readable from both Postgres and Snowflake).
// FORK for point-in-time recovery. HIGH_AVAILABILITY = TRUE for production.
// ═══════════════════════════════════════════
// WAREHOUSE AND COST MANAGEMENT
// ═══════════════════════════════════════════
// Size by query complexity, not data volume. Start X-Small, scale up.
// AUTO_SUSPEND = 60 (seconds), AUTO_RESUME = TRUE. Separate warehouses per workload.
// Multi-cluster for concurrency scaling. Transient tables for staging (no Fail-safe cost).
// Monitor: SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY, WAREHOUSE_METERING_HISTORY.
// Resource Monitors for credit limits. Avoid SELECT * on wide tables.
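// A sketch of the settings above. The monitor name and the 100-credit quota are placeholders, and creating resource monitors typically requires ACCOUNTADMIN.
CREATE WAREHOUSE IF NOT EXISTS transform_wh
  WAREHOUSE_SIZE = 'XSMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE
  INITIALLY_SUSPENDED = TRUE;
CREATE OR REPLACE RESOURCE MONITOR transform_monthly_limit
  WITH CREDIT_QUOTA = 100
  FREQUENCY = MONTHLY
  START_TIMESTAMP = IMMEDIATELY
  TRIGGERS ON 80 PERCENT DO NOTIFY
           ON 100 PERCENT DO SUSPEND;
ALTER WAREHOUSE transform_wh SET RESOURCE_MONITOR = transform_monthly_limit;
// Credits consumed per warehouse over the last 7 days:
SELECT warehouse_name, SUM(credits_used) AS credits_last_7_days
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
WHERE start_time >= DATEADD('day', -7, CURRENT_TIMESTAMP())
GROUP BY warehouse_name
ORDER BY credits_last_7_days DESC;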
// Access Control
// Least-privilege RBAC. Database roles for object grants.
// Masking policies for PII. Row access policies for multi-tenant isolation.
// Functional roles: loader (write raw), transformer (read raw, write analytics), analyst (read analytics).
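// A sketch of the two policy types above. The tables customers, orders, and tenant_role_map and the role PII_READER are hypothetical; both policy types require Enterprise Edition or higher.
CREATE MASKING POLICY IF NOT EXISTS email_mask AS (val STRING) RETURNS STRING ->
  CASE WHEN IS_ROLE_IN_SESSION('PII_READER') THEN val ELSE '***MASKED***' END;
ALTER TABLE customers MODIFY COLUMN email SET MASKING POLICY email_mask;
// Rows are visible only when the mapping table links the row's tenant to the current role.
CREATE ROW ACCESS POLICY IF NOT EXISTS tenant_isolation AS (row_tenant_id STRING) RETURNS BOOLEAN ->
  EXISTS (
    SELECT 1 FROM tenant_role_map m
    WHERE m.tenant_id = row_tenant_id AND m.role_name = CURRENT_ROLE()
  );
ALTER TABLE orders ADD ROW ACCESS POLICY tenant_isolation ON (tenant_id);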
// Data Sharing
// CREATE SHARE for zero-copy cross-account sharing. Snowflake Marketplace for exchange.
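// A sketch of a provider-side share. The database, schema, and table names are hypothetical, and <org_name>.<consumer_account> is a placeholder for the consumer's account identifier.
CREATE SHARE analytics_share;
GRANT USAGE ON DATABASE analytics TO SHARE analytics_share;
GRANT USAGE ON SCHEMA analytics.public TO SHARE analytics_share;
GRANT SELECT ON TABLE analytics.public.daily_metrics TO SHARE analytics_share;
ALTER SHARE analytics_share ADD ACCOUNTS = <org_name>.<consumer_account>;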
// Iceberg Tables
// CREATE ICEBERG TABLE ... CATALOG='SNOWFLAKE' EXTERNAL_VOLUME='vol' BASE_LOCATION='path/';
// Interoperable with Spark, Flink, Trino.
// Anti-Patterns
- Do NOT use streams+tasks for simple transformations that dynamic tables can handle.
- Do NOT set TARGET_LAG shorter than needed — directly impacts cost.
- Do NOT forget to RESUME tasks after creation.
- Do NOT use SELECT * on wide tables. Do NOT skip clustering analysis on multi-TB tables.
- Do NOT hardcode database/schema names in reusable code.
Content source: awesome-cursorrules (CC0-1.0 license)