Snowflake Data Engineering

Cursor rules for Snowflake SQL, data pipelines (Dynamic Tables, Streams, Tasks, Snowpipe), semi-structured data, Snowflake Postgres, and cost optimization.

.cursorrules
// Snowflake Data Engineering
// Comprehensive guidance for SQL, data pipelines, and platform best practices on Snowflake

You are an expert Snowflake data engineer with deep knowledge of the entire platform: SQL, data pipelines (Dynamic Tables, Streams, Tasks, Snowpipe), semi-structured data, Snowflake Postgres, and cost optimization.

// Architecture
// Snowflake separates storage (columnar micro-partitions), compute (elastic virtual warehouses), and services (metadata, security, optimization).

// ═══════════════════════════════════════════
// SQL AND SEMI-STRUCTURED DATA
// ═══════════════════════════════════════════

// Use VARIANT, OBJECT, and ARRAY types for JSON, Avro, Parquet, ORC.
// Access nested fields with colon notation: src:customer.name::STRING
// Cast explicitly: src:price::NUMBER(10,2), src:created_at::TIMESTAMP_NTZ
// Flatten arrays:
//   SELECT f.value:name::STRING AS name
//   FROM my_table, LATERAL FLATTEN(input => src:items) f;
// Flatten semi-structured data into relational columns when it contains dates, numbers stored as strings, or arrays.
// Avoid mixed types in the same VARIANT field: they prevent subcolumnarization.
// VARIANT null vs SQL NULL: a JSON null is stored in VARIANT as the value "null", not as SQL NULL. Set the file format option STRIP_NULL_VALUES = TRUE on load to drop such fields.
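// A minimal sketch of null handling on load and at query time. It assumes a hypothetical stage @my_stage and a table raw_json with a single VARIANT column src:
//   COPY INTO raw_json FROM @my_stage
//     FILE_FORMAT = (TYPE = 'JSON' STRIP_NULL_VALUES = TRUE);
//   SELECT src:customer.name::STRING AS name,
//          IS_NULL_VALUE(src:customer.name) AS is_json_null
//   FROM raw_json;
// IS_NULL_VALUE returns TRUE for a JSON null and SQL NULL when the key is missing.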

// SQL Coding Standards
// - snake_case for all identifiers. Avoid quoted identifiers.
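// - CTEs over nested subqueries. CREATE OR REPLACE for idempotent DDL (it drops and recreates the object, so use CREATE TABLE IF NOT EXISTS for tables that already hold data).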
// - COPY INTO for bulk loading, not INSERT. MERGE for upserts:
//   MERGE INTO target t USING source s ON t.id = s.id
//   WHEN MATCHED THEN UPDATE SET t.name = s.name
//   WHEN NOT MATCHED THEN INSERT (id, name) VALUES (s.id, s.name);

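// Stored Procedures: prefix variables with a colon (:) when they appear inside SQL statements.
// Wrap the body in $$ delimiters so it also runs outside Snowsight (SnowSQL, drivers):
//   CREATE OR REPLACE PROCEDURE my_proc(p_id INT) RETURNS STRING LANGUAGE SQL AS
//   $$
//   BEGIN
//     LET result STRING;
//     SELECT name INTO :result FROM users WHERE id = :p_id;
//     RETURN result;
//   END;
//   $$;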

// ═══════════════════════════════════════════
// PERFORMANCE OPTIMIZATION
// ═══════════════════════════════════════════

// Cluster keys: for very large (multi-TB) tables, on the columns most often used in WHERE, JOIN, and GROUP BY clauses.
//   ALTER TABLE large_events CLUSTER BY (event_date, region);
// Search Optimization Service: point lookups on high-cardinality columns, substring/regex.
//   ALTER TABLE logs ADD SEARCH OPTIMIZATION ON EQUALITY(sender_ip), SUBSTRING(error_message);
// Materialized Views: pre-compute expensive aggregations (single table only).
// Reuse the previous query's result set with SELECT * FROM TABLE(RESULT_SCAN(LAST_QUERY_ID())). Use query tags for attribution:
//   ALTER SESSION SET QUERY_TAG = 'etl_daily_load';
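// To judge whether a cluster key is worth its maintenance cost, inspect the current clustering on the candidate columns first. A sketch reusing the large_events example:
//   SELECT SYSTEM$CLUSTERING_INFORMATION('large_events', '(event_date, region)');
// The JSON result includes average_depth and average_overlaps; lower values generally mean better partition pruning.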

// ═══════════════════════════════════════════
// DATA PIPELINES
// ═══════════════════════════════════════════

// Choose Your Approach:
// Dynamic Tables   — Declarative. Define the query, Snowflake handles refresh. Best for most pipelines.
// Streams + Tasks  — Imperative CDC + scheduling. Best for procedural logic, stored procedure calls.
// Snowpipe         — Continuous file loading from S3/GCS/Azure.
// Snowpipe Streaming — Low-latency row-level ingestion via SDK (Java, Python).

// Dynamic Tables
CREATE OR REPLACE DYNAMIC TABLE cleaned_events
  TARGET_LAG = '5 minutes'
  WAREHOUSE = transform_wh
  AS
  SELECT event_id, event_type, user_id, event_data:page::STRING AS page, event_timestamp
  FROM raw_events
  WHERE event_type IS NOT NULL;

// Chain for multi-step pipelines:
CREATE OR REPLACE DYNAMIC TABLE user_sessions
  TARGET_LAG = '10 minutes'
  WAREHOUSE = transform_wh
  AS
  SELECT user_id, MIN(event_timestamp) AS session_start, MAX(event_timestamp) AS session_end, COUNT(*) AS event_count
  FROM cleaned_events GROUP BY user_id;

// TARGET_LAG: freshness target. REFRESH_MODE: AUTO, FULL, or INCREMENTAL.
// Manage: ALTER DYNAMIC TABLE ... SET TARGET_LAG / REFRESH / SUSPEND / RESUME.

// Streams (CDC)
CREATE OR REPLACE STREAM raw_events_stream ON TABLE raw_events;
// Columns added: METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
// APPEND_ONLY = TRUE for insert-only sources (lower overhead).
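// A sketch of an append-only stream and a read of its pending changes; raw_events_append_stream is a hypothetical name.
// A plain SELECT does not advance the stream offset; only DML that reads the stream in a committed transaction consumes it.
CREATE OR REPLACE STREAM raw_events_append_stream ON TABLE raw_events APPEND_ONLY = TRUE;
SELECT event_id, METADATA$ACTION, METADATA$ISUPDATE
FROM raw_events_append_stream;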

// Tasks (Scheduled/Triggered)
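// The target must be a regular table: dynamic tables such as cleaned_events do not accept DML.
// processed_events is a hypothetical table name used for illustration.
CREATE OR REPLACE TASK process_events
  WAREHOUSE = transform_wh
  SCHEDULE = 'USING CRON 0 */1 * * * America/Los_Angeles'
  WHEN SYSTEM$STREAM_HAS_DATA('raw_events_stream')
  AS
  INSERT INTO processed_events (event_id, event_type, user_id, event_timestamp)
  SELECT event_id, event_type, user_id, event_timestamp
  FROM raw_events_stream WHERE event_type IS NOT NULL;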

// Task DAGs: CREATE TASK child_task ... AFTER parent_task ...
// Tasks start SUSPENDED — ALTER TASK ... RESUME to enable.
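// A sketch of a two-task graph built on process_events; aggregate_events and event_counts are hypothetical names.
// The root task must be suspended while children are added.
CREATE OR REPLACE TASK aggregate_events
  WAREHOUSE = transform_wh
  AFTER process_events
  AS
  INSERT OVERWRITE INTO event_counts
  SELECT event_type, COUNT(*) AS event_count FROM cleaned_events GROUP BY event_type;
// Resume child tasks before the root, or enable the whole graph with SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('process_events');
ALTER TASK aggregate_events RESUME;
ALTER TASK process_events RESUME;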

// Snowpipe
CREATE OR REPLACE PIPE my_pipe AUTO_INGEST = TRUE AS
  COPY INTO raw_events FROM @my_external_stage FILE_FORMAT = (TYPE = 'JSON');

// Common Pattern: Snowpipe → Dynamic Table chain (simplest end-to-end pipeline).

// ═══════════════════════════════════════════
// TIME TRAVEL AND DATA PROTECTION
// ═══════════════════════════════════════════

// Time Travel (default 1 day, up to 90 on Enterprise+):
//   SELECT * FROM my_table AT(TIMESTAMP => '2024-01-15 10:00:00'::TIMESTAMP);
//   SELECT * FROM my_table BEFORE(STATEMENT => '<query_id>');
// UNDROP TABLE/SCHEMA/DATABASE to recover dropped objects.
// Zero-copy cloning: CREATE TABLE clone CLONE source; CREATE SCHEMA dev CLONE prod;
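// Time Travel and cloning combine into a low-risk restore: clone the table as of a past point, verify it, then swap. A sketch; my_table_restored is a hypothetical name:
//   CREATE TABLE my_table_restored CLONE my_table
//     AT(TIMESTAMP => '2024-01-15 10:00:00'::TIMESTAMP);
//   ALTER TABLE my_table SWAP WITH my_table_restored;
// Retention is set per object, within the limit of the account's edition:
//   ALTER TABLE my_table SET DATA_RETENTION_TIME_IN_DAYS = 7;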

// ═══════════════════════════════════════════
// SNOWFLAKE POSTGRES
// ═══════════════════════════════════════════

// Managed PostgreSQL (v16/17/18) with full wire compatibility.
// CREATE POSTGRES INSTANCE my_instance COMPUTE_FAMILY='STANDARD_S' STORAGE_SIZE_GB=50;
// Bridge OLTP to analytics via pg_lake extension (Iceberg tables readable from both Postgres and Snowflake).
// FORK for point-in-time recovery. HIGH_AVAILABILITY = TRUE for production.

// ═══════════════════════════════════════════
// WAREHOUSE AND COST MANAGEMENT
// ═══════════════════════════════════════════

// Size by query complexity, not data volume. Start X-Small, scale up.
// AUTO_SUSPEND = 60 (seconds), AUTO_RESUME = TRUE. Separate warehouses per workload.
// Multi-cluster for concurrency scaling. Transient tables for staging (no Fail-safe cost).
// Monitor: SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY, WAREHOUSE_METERING_HISTORY.
// Resource Monitors for credit limits. Avoid SELECT * on wide tables.
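// A sketch of these settings together. The quota, thresholds, and the monitor name monthly_limit are illustrative assumptions; creating resource monitors typically requires ACCOUNTADMIN:
//   CREATE WAREHOUSE IF NOT EXISTS transform_wh
//     WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 60 AUTO_RESUME = TRUE INITIALLY_SUSPENDED = TRUE;
//   CREATE RESOURCE MONITOR monthly_limit WITH CREDIT_QUOTA = 100
//     FREQUENCY = MONTHLY START_TIMESTAMP = IMMEDIATELY
//     TRIGGERS ON 80 PERCENT DO NOTIFY
//              ON 100 PERCENT DO SUSPEND;
//   ALTER WAREHOUSE transform_wh SET RESOURCE_MONITOR = monthly_limit;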

// Access Control
// Least-privilege RBAC. Database roles for object grants.
// Masking policies for PII. Row access policies for multi-tenant isolation.
// Functional roles: loader (write raw), transformer (read raw, write analytics), analyst (read analytics).
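// A sketch of a column masking policy for PII. The policy, role, table, and column names are hypothetical, and masking policies require Enterprise Edition or higher:
//   CREATE MASKING POLICY email_mask AS (val STRING) RETURNS STRING ->
//     CASE WHEN CURRENT_ROLE() IN ('PII_READER') THEN val ELSE '***MASKED***' END;
//   ALTER TABLE customers MODIFY COLUMN email SET MASKING POLICY email_mask;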

// Data Sharing
// CREATE SHARE for zero-copy cross-account sharing. Snowflake Marketplace for exchange.

// Iceberg Tables
// CREATE ICEBERG TABLE ... CATALOG='SNOWFLAKE' EXTERNAL_VOLUME='vol' BASE_LOCATION='path/';
// Interoperable with Spark, Flink, Trino.

// Anti-Patterns
- Do NOT use streams+tasks for simple transformations that dynamic tables can handle.
- Do NOT set TARGET_LAG shorter than needed — directly impacts cost.
- Do NOT forget to RESUME tasks after creation.
- Do NOT use SELECT * on wide tables.
- Do NOT skip clustering analysis on multi-TB tables.
- Do NOT hardcode database/schema names in reusable code.

Source: awesome-cursorrules (CC0-1.0 license)