
saravan kumar

10/28/2022, 6:40 PM
Does anyone have examples of using SQLAlchemy in Dagster user code? In our jobs, we would like to do our DB operations via SQLAlchemy if possible...

James Hale

10/28/2022, 6:50 PM
Saravan - we use SQLAlchemy for connections to Redshift and Snowflake. We've built a resource that provides a connection, and a handful of IO managers to read and write. Here are a few snippets: https://gist.github.com/jayhale/c5f08dcd1656db1b82e3177425911091 All based on what's in the dagster-snowflake package.

saravan kumar

11/04/2022, 4:51 AM
Can you share where you use the resource in a job/op?

James Hale

11/04/2022, 2:11 PM
@saravan kumar here's how we use with_resources to attach the IO manager:
from dagster import (
    AssetSelection,
    build_schedule_from_partitioned_job,
    define_asset_job,
    with_resources,
)

from tasks.resources.shared_config import SNOWFLAKE_BRONZE_CONFIG
from tasks.storage import snowflake_merge_json_io_manager
from tasks.utils import assets_with_group_name

from . import assets
from .partitions import cdrs_partition

all_assets = with_resources(
    assets_with_group_name(
        [
            assets.call_data_records,
        ],
        "bronze_atscall",
    ),
    resource_defs={
        # ...
        "atscall_snowflake_io_manager": snowflake_merge_json_io_manager.configured(
            SNOWFLAKE_BRONZE_CONFIG
        ),
    },
)
@saravan kumar the asset has an IO manager expectation like this:
@asset(
    name="call_data_records",
    key_prefix=PREFIX,
    required_resource_keys={"atscall"},
    io_manager_key="atscall_snowflake_io_manager",
    partitions_def=cdrs_partition,
)
def call_data_records(context):
    ...  # body truncated in the original message
@saravan kumar then just calling the asset in a job.

saravan kumar

11/04/2022, 2:35 PM
Thanks @James Hale