Riccardo Tesselli
02/08/2023, 4:29 PMreturn [
with_resources(
some_assets1,
resource_defs=config1
),
with_resources(
some_assets2,
resource_defs=config2
)
]
Both `some_assets1` and `some_assets2` share a resource key, e.g. `my_resource`. In `config1` it’s something like:
{
"my_resource": some_stuff.configured({some_data:1})
}
and in config2 it’s like:
{
"my_resource": some_stuff.configured({some_data:2})
}
but then I get:
Conflicting versions of resource with key 'my_resource' were provided to different assets. When constructing a job, all resource definitions provided to assets must match by reference equality for a given key.
I’ve seen here that it’s suggested to use different resource keys, but that sounds like an overcomplication to me when we have something like `with_resources`. Also, here in the docs it’s stated:
If you need to apply different resources to different assets, use legacy @repository and use with_resources as before.
Currently I’m running Dagster 1.1.13.
Thanks!
Riccardo Tesselli
02/09/2023, 9:43 AMfrom dagster import asset, repository, io_manager, IOManager, OutputContext, InputContext, Any, InitResourceContext, \
resource, with_resources
@asset
def some_asset1():
    """Asset that materializes the constant value ``1``."""
    value = 1
    return value
@asset
def some_asset2():
    """Asset that materializes the constant value ``2``."""
    value = 2
    return value
class MyIOManager(IOManager):
    """Stub IO manager that holds on to an injected resource object.

    ``handle_output`` and ``load_input`` are placeholders: they perform no
    I/O and both return ``None``.
    """

    def __init__(self, res):
        # Keep a reference to the resource object passed in by the caller.
        self.resource = res

    def handle_output(self, context: OutputContext, obj: Any) -> None:
        """Placeholder: accept ``obj`` without persisting it anywhere."""

    def load_input(self, context: InputContext) -> Any:
        """Placeholder: load nothing and return ``None``."""
@io_manager(required_resource_keys={'my_resource'})
def my_io_manager(init_context: InitResourceContext):
    """Build a ``MyIOManager`` backed by the ``my_resource`` resource.

    ``my_resource`` is declared in ``required_resource_keys`` so that it is
    available on ``init_context.resources`` when this factory runs.
    """
    shared = init_context.resources.my_resource
    return MyIOManager(shared)
class MyResource:
    """Plain holder for a single configuration value."""

    def __init__(self, arg):
        # Stored as given; no validation or conversion is done here.
        self.arg = arg
@resource(config_schema={'some_arg': int})
def my_resource(init_context):
    """Create a ``MyResource`` from the integer ``some_arg`` config value."""
    cfg = init_context.resource_config
    return MyResource(cfg['some_arg'])
@repository
def my_repo():
    """Legacy repository that binds each asset to its own resource set.

    ``some_asset1`` receives ``my_resource`` configured with ``some_arg=1``,
    ``some_asset2`` receives it configured with ``some_arg=2``; both sets use
    the same ``my_io_manager`` definition, which itself requires
    ``my_resource``.

    NOTE(review): this is the setup reported to raise "Conflicting versions
    of resource with key 'my_resource'" on Dagster 1.1.13.
    """

    def _resource_set(some_arg):
        # One resource mapping per asset; only the configured value differs.
        return {
            "my_resource": my_resource.configured({'some_arg': some_arg}),
            "io_manager": my_io_manager,
        }

    return [
        with_resources([some_asset1], resource_defs=_resource_set(1)),
        with_resources([some_asset2], resource_defs=_resource_set(2)),
    ]
Sarit Tiras
# 03/15/2023, 12:58 AM
# Metadata-ingestion assets, grouped and key-prefixed as METADATA_INGESTION,
# plus a job selecting that group and a daily schedule for the job.
metadata_ingestion_assets = load_assets_from_modules(
    [metadata_ingestion],
    group_name="METADATA_INGESTION",
    key_prefix="METADATA_INGESTION",
)
metadata_ingestion_job = define_asset_job(
    name="INGEST_METADATA",
    selection=AssetSelection.groups("METADATA_INGESTION"),
)
metadata_ingestion_schedule = ScheduleDefinition(
    job=metadata_ingestion_job,
    cron_schedule="@daily",
)

# Dynamic-processing assets, grouped and key-prefixed as DYNAMIC_PROCESSING,
# plus a job selecting that group.
dynamic_tasks_assets = load_assets_from_modules(
    [dynamic_tasks],
    group_name="DYNAMIC_PROCESSING",
    key_prefix="DYNAMIC_PROCESSING",
)
dynamic_tasks_job = define_asset_job(
    name="DYNAMIC_TASK_GENERATION",
    selection=AssetSelection.groups("DYNAMIC_PROCESSING"),
)
@repository
def metadata_ingestion_pipeline():
    """Bundle the metadata-ingestion and dynamic-task jobs, schedule and assets.

    Both asset lists are bound to one ``scraper_db`` resource. Its host,
    username and password are supplied through Dagster's ``{"env": ...}``
    config-source syntax; the port is a literal value.
    """
    scraper_db = mysql_scraper.scraper_db.configured(
        {
            "host": {"env": "SCRAPER_DB_HOST"},
            "username": {"env": "SCRAPER_DB_USERNAME"},
            "password": {"env": "SCRAPER_DB_PASSWORD"},
            # NOTE(review): port is the string "3306"; if the resource's
            # schema declares it as an int, this should be 3306 — confirm.
            "port": "3306",
        }
    )
    return [
        metadata_ingestion_job,
        metadata_ingestion_schedule,
        dynamic_tasks_job,
        # No ``resource_config_by_key`` here: ``scraper_db`` is already fully
        # configured above, and the former {"config": "scraper_db"} entry fed
        # a bare string as extra config to that already-configured resource.
        with_resources(
            [*metadata_ingestion_assets, *dynamic_tasks_assets],
            resource_defs={"scraper_db": scraper_db},
        ),
    ]
Sarit Tiras
03/15/2023, 12:59 AM