
Jeff

01/13/2023, 10:08 PM
Hi, I’m having trouble using `materialize()` for testing assets. Details in 🧵
I’m getting this:
dagster._core.errors.DagsterUnknownResourceError: Unknown resource `pyspark`. Specify `pyspark` as a required resource on the compute / config function that accessed it.
but I don’t see a way to specify the required resource keys for `materialize()`. This works when I materialize it through dagit, but I can’t get it to work with `materialize()`.

owen

01/13/2023, 10:52 PM
hi @Jeff! you can pass a dictionary of resources into the `resources` argument of `materialize`. So I would expect `materialize(some_assets, resources={"pyspark": some_pyspark_resource})` to work for you.
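A minimal sketch of that pattern, using a hypothetical `my_asset` that declares `pyspark` in its `required_resource_keys` (here `ResourceDefinition.hardcoded_resource` stands in for a real pyspark resource):

from dagster import ResourceDefinition, asset, materialize


@asset(required_resource_keys={"pyspark"})
def my_asset(context):
    # The resource supplied to materialize() below shows up here.
    print(context.resources.pyspark)


result = materialize(
    [my_asset],
    resources={"pyspark": ResourceDefinition.hardcoded_resource("fake-spark")},
)
assert result.success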

Jeff

01/13/2023, 10:53 PM
I did provide that, which is why I’m confused. I didn’t dive into the source code too much, but it seems like it’s expecting it under required resources?

owen

01/13/2023, 10:55 PM
oh I see what you mean -- how are you providing this resource in the default case (i.e. when you materialize through dagit)?
and which asset are you expecting to use the `pyspark` resource?

Jeff

01/13/2023, 10:57 PM
I’m providing it in the asset definition (with the resource definition and required resource keys), and it’s being used in my IO manager, which I suspect may be the cause of the problem.

owen

01/13/2023, 10:59 PM
mind sharing a code snippet of the asset definition?

Jeff

01/13/2023, 11:01 PM
Let me check with compliance first.

owen

01/13/2023, 11:03 PM
sounds good, and feel free to fuzz out whatever you want (change the names around, etc.). Also, I'm not interested in the code inside the asset, just the definition part itself.
👍 1

Jeff

01/13/2023, 11:04 PM
resources = {"pyspark": pyspark_resource}

@asset(
    name="dummy_asset_upstream",
    metadata={"dagster_partition_cols": ["foo", "date"], "additional_partition_cols": ["test_col"]},
    partitions_def=foo_daily_partition,
    io_manager_def=mock_delta_io_manager,
    required_resource_keys={"pyspark"},
    resource_defs=resources
)
def dummy_asset_upstream():
    pass 

@asset(
    metadata={"dagster_partition_cols": ["foo", "date"], "additional_partition_cols": ["test_col"]},
    partitions_def=foo_daily_partition,
    io_manager_def=mock_delta_io_manager,
    ins={"dummy_asset_upstream": AssetIn("dummy_asset_upstream")},
    required_resource_keys={"pyspark"},
    resource_defs=resources
)
def dummy_asset_downstream(dummy_asset_upstream):
    pass


materialize([dummy_asset_upstream, dummy_asset_downstream], partition_key="abcd|2022-10-30", resources=resources)

owen

01/13/2023, 11:15 PM
hm, I'm not able to replicate this issue. For reference, here's my modified version of the code:
from dagster import ResourceDefinition, asset, AssetIn, materialize, io_manager, IOManager

resources = {"pyspark": ResourceDefinition.hardcoded_resource(1111111111)}


@io_manager(required_resource_keys={"pyspark"})
def mock_delta_io_manager(_):
    class TestIOManager(IOManager):
        def handle_output(self, context, obj):
            print(context.resources.pyspark)

        def load_input(self, context):
            pass

    return TestIOManager()


@asset(
    name="dummy_asset_upstream",
    metadata={"dagster_partition_cols": ["foo", "date"], "additional_partition_cols": ["test_col"]},
    io_manager_def=mock_delta_io_manager,
    required_resource_keys={"pyspark"},
    resource_defs=resources,
)
def dummy_asset_upstream(context):
    print(context.resources.pyspark)


@asset(
    metadata={"dagster_partition_cols": ["foo", "date"], "additional_partition_cols": ["test_col"]},
    io_manager_def=mock_delta_io_manager,
    ins={"dummy_asset_upstream": AssetIn("dummy_asset_upstream")},
    required_resource_keys={"pyspark"},
    resource_defs=resources,
)
def dummy_asset_downstream(context, dummy_asset_upstream):
    print(context.resources.pyspark)


materialize(
    [dummy_asset_upstream, dummy_asset_downstream],
    # resources=resources,
)
Interestingly, it works whether or not that `resources=resources` line is commented out.
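That tracks, presumably because the assets carry their own `resource_defs`: with the `pyspark` resource already bound on each asset definition, `materialize` can resolve the key without the top-level `resources` argument. A sketch of the two equivalent calls, assuming the definitions from the snippet above:

# The pyspark key is already bound on the assets via resource_defs,
# so both invocations resolve it (definitions from the snippet above).
materialize([dummy_asset_upstream, dummy_asset_downstream])

# Passing resources= again is redundant here, but also works.
materialize(
    [dummy_asset_upstream, dummy_asset_downstream],
    resources=resources,
)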

Jeff

01/13/2023, 11:15 PM
I think the problem may be that my io_manager uses the resource?

owen

01/13/2023, 11:15 PM
are you able to tell which part of your code the error is coming from? I.e., does the stack trace indicate that the IOManager is erroring when trying to access the resource, or the asset?

Jeff

01/13/2023, 11:16 PM
def load_input(self, context: InputContext) -> pyspark.sql.DataFrame:
    spark = context.resources.pyspark.spark_session
It’s from here in the IO manager, sorry I didn’t mention that before.

owen

01/13/2023, 11:17 PM
got it -- does the IOManager have `required_resource_keys={"pyspark"}` specified on the `@io_manager` decorator?

Jeff

01/13/2023, 11:18 PM
Ah, that is probably it, let me try that. I’m not sure why this works in dagit though haha.

owen

01/13/2023, 11:18 PM
yeah I'm pretty surprised by that too 🤯

Jeff

01/13/2023, 11:19 PM
Ok, actually I did add it:
class DeltaIOManager(IOManager):
    def __init__(self, s3_bucket: str):
        self.bucket = s3_bucket

    def handle_output(self, context: OutputContext, obj: Union[pyspark.sql.DataFrame, None]) -> None:
        pass

    def load_input(self, context: InputContext):
        spark = context.resources.pyspark.spark_session

@io_manager(
    config_schema={"s3_bucket": Field(StringSource)}, required_resource_keys={"pyspark"},
)
def delta_io_manager(init_context: InitResourceContext) -> DeltaIOManager:
    s3_bucket = init_context.resource_config["s3_bucket"]

    return DeltaIOManager(s3_bucket)
This is what I have
I can confirm your code works on my environment.
👍 1

owen

01/13/2023, 11:31 PM
hm, I wonder what might be causing the issue then... I assume the `mock_delta_io_manager` passed into the `io_manager_def=mock_delta_io_manager` argument of the `@asset` decorators you shared is the same as the `delta_io_manager` above?

Jeff

01/13/2023, 11:32 PM
Hmm, I messed around with some stuff, and I think it might have to do with some mock calls I have. I have to go for now, but I'll look into it more next week and let you know if I can't figure it out. I appreciate your help!
🫡 1
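If the mock calls turn out to be the culprit: one pattern that keeps `required_resource_keys` resolution intact in tests is wrapping the mock in a real resource definition rather than patching it in. A minimal sketch, with `MagicMock` standing in for the pyspark resource (names are illustrative):

from unittest import mock

from dagster import ResourceDefinition

# Wrap the mock in a real ResourceDefinition so Dagster can still
# resolve the "pyspark" key declared in required_resource_keys.
# MagicMock auto-creates attributes like .spark_session on access.
resources = {"pyspark": ResourceDefinition.hardcoded_resource(mock.MagicMock())}

That dict can then be passed as `resource_defs` on the assets or as `resources=` to `materialize`, as in the snippets above.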

owen

01/13/2023, 11:33 PM
sounds good!