# ask-community

Riccardo Tesselli

02/08/2023, 4:29 PM
I would like to load assets that share the same resource keys but with different configurations, like:
return [
        with_resources(
            some_assets1,
            resource_defs=config1
        ),
        with_resources(
            some_assets2,
            resource_defs=config2
        )
    ]
Both `some_assets1` and `some_assets2` share a resource key like `my_resource`. In `config1` it’s something like:
{
  "my_resource": some_stuff.configured({"some_data": 1})
}
and in `config2` it’s like:
{
  "my_resource": some_stuff.configured({"some_data": 2})
}
but then I get:
Conflicting versions of resource with key 'my_resource' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
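The "reference equality" wording in that error can be illustrated with a small, simplified model. This is only a sketch and not Dagster's actual implementation: `ResourceDef`, its `configured()` method, and `merge_resource_defs()` are hypothetical stand-ins written for this example. It assumes, as the error message suggests, that two definitions under the same key are accepted only when they are the very same object, and that each `.configured(...)` call produces a new object.

```python
# Simplified model of a "match by reference equality" check.
# ResourceDef, configured(), and merge_resource_defs() are hypothetical
# stand-ins for illustration only; they are not Dagster APIs.


class ResourceDef:
    def __init__(self, config=None):
        self.config = config

    def configured(self, config):
        # Assumption: each call returns a NEW definition object.
        return ResourceDef(config)


def merge_resource_defs(*resource_defs_per_group):
    """Merge per-group resource dicts, rejecting a key bound to two different objects."""
    merged = {}
    for resource_defs in resource_defs_per_group:
        for key, definition in resource_defs.items():
            # Identity comparison ("is"), not value comparison ("==").
            if key in merged and merged[key] is not definition:
                raise ValueError(f"Conflicting versions of resource with key '{key}'")
            merged[key] = definition
    return merged


base = ResourceDef()

# Same key, two separately configured objects: rejected.
try:
    merge_resource_defs(
        {"my_resource": base.configured({"some_arg": 1})},
        {"my_resource": base.configured({"some_arg": 2})},
    )
    conflict_detected = False
except ValueError:
    conflict_detected = True

# Same key, one shared object: accepted.
shared = base.configured({"some_arg": 1})
merged_shared = merge_resource_defs({"my_resource": shared}, {"my_resource": shared})

# Different keys, different configurations: accepted.
merged_distinct = merge_resource_defs(
    {"my_resource_1": base.configured({"some_arg": 1})},
    {"my_resource_2": base.configured({"some_arg": 2})},
)
```

Under this model, reusing one configured object across asset groups passes the check, but two different configurations can only coexist under two different keys, which is why distinct resource keys are the usual suggestion.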
I’ve seen here that it’s suggested to have different resource keys, but that sounds like an overcomplication to me when we have something like `with_resources`. Also, here in the docs it’s stated:
If you need to apply different resources to different assets, use legacy @repository and use with_resources as before.
Currently I’m running Dagster 1.1.13. Thanks!
Here is a minimal full example with the issue:
from typing import Any

from dagster import (
    InitResourceContext,
    InputContext,
    IOManager,
    OutputContext,
    asset,
    io_manager,
    repository,
    resource,
    with_resources,
)


@asset
def some_asset1():
    return 1


@asset
def some_asset2():
    return 2


class MyIOManager(IOManager):
    def __init__(self, res):
        self.resource = res

    def handle_output(self, context: OutputContext, obj: Any) -> None:
        ...

    def load_input(self, context: InputContext) -> Any:
        ...


@io_manager(required_resource_keys={'my_resource'})
def my_io_manager(init_context: InitResourceContext):
    return MyIOManager(init_context.resources.my_resource)


class MyResource:
    def __init__(self, arg):
        self.arg = arg


@resource(config_schema={'some_arg': int})
def my_resource(init_context):
    return MyResource(init_context.resource_config['some_arg'])


@repository
def my_repo():
    defs = {
        "my_resource": my_resource.configured({'some_arg': 1}),
        "io_manager": my_io_manager
    }
    defs2 = {
        "my_resource": my_resource.configured({'some_arg': 2}),
        "io_manager": my_io_manager
    }
    return [
        with_resources(
            [some_asset1],
            resource_defs=defs
        ),
        with_resources(
            [some_asset2],
            resource_defs=defs2
        )
    ]

Sarit Tiras

03/15/2023, 12:58 AM
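from dagster import (
    AssetSelection,
    ScheduleDefinition,
    define_asset_job,
    load_assets_from_modules,
    repository,
    with_resources,
)

# metadata_ingestion, dynamic_tasks, and mysql_scraper are my own modules,
# imported elsewhere in the project.

metadata_ingestion_assets = load_assets_from_modules(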
    [metadata_ingestion],
    group_name="METADATA_INGESTION",
    key_prefix="METADATA_INGESTION"
)

metadata_ingestion_job = define_asset_job(
    name="INGEST_METADATA",
    selection=AssetSelection.groups("METADATA_INGESTION")
)

metadata_ingestion_schedule = ScheduleDefinition(
    job=metadata_ingestion_job,
    cron_schedule="@daily"
)

dynamic_tasks_assets = load_assets_from_modules(
    [dynamic_tasks],
    group_name="DYNAMIC_PROCESSING",
    key_prefix="DYNAMIC_PROCESSING"
)

dynamic_tasks_job = define_asset_job(
    name="DYNAMIC_TASK_GENERATION",
    selection=AssetSelection.groups("DYNAMIC_PROCESSING")
)

@repository
def metadata_ingestion_pipeline():
    return [
        metadata_ingestion_job,
        metadata_ingestion_schedule,
        dynamic_tasks_job,
        with_resources(
            [*metadata_ingestion_assets, *dynamic_tasks_assets],
            resource_defs={
                    "scraper_db": mysql_scraper.scraper_db.configured(
                        {
                            "host": {"env": "SCRAPER_DB_HOST"},
                            "username": {"env": "SCRAPER_DB_USERNAME"},
                            "password": {"env": "SCRAPER_DB_PASSWORD"},
                            "port": "3306"
                        }
                    )
            },
            resource_config_by_key={
                "scraper_db": {
                    "config": "scraper_db"
                }
            }
        )
    ]
This is from one of my test snippets. The two jobs, metadata_ingestion_job and dynamic_tasks_job, share the same resource, scraper_db, via the resource_config_by_key option. Hope this helps you.