https://dagster.io/ logo
#ask-community
Title
# ask-community
c

Charles Lariviere

08/08/2022, 2:27 PM
Hey 👋 I’m trying to write a test for a partitioned asset and running into errors fetching the partition key from the context. When attempting to run the following test, I get an
AttributeError
for
_step_execution_context
(see thread for stack trace):
Copy code
@asset
def my_asset(context):
   time_window = context.asset_partitions_time_window_for_output()
   ...

def test_my_asset():
  with build_op_context(partition_key="2022-08-03") as context:
    result = my_asset(context)
    ...
Since
build_op_context()
allows defining a
partition_key
, I would have expected my asset to work in the context of this simple test. Though, the error makes it seem like this could only run within a “full” execution. Am I doing something wrong, or is this not yet supported?
Copy code
self = <dagster._core.execution.context.invocation.BoundSolidExecutionContext object at 0x11ec3c580>, output_name = 'result'

    def asset_partitions_time_window_for_output(self, output_name: str = "result") -> TimeWindow:
        """The time window for the partitions of the output asset.
    
        Raises an error if either of the following are true:
        - The output asset has no partitioning.
        - The output asset is not partitioned with a TimeWindowPartitionsDefinition.
        """
>       return self._step_execution_context.asset_partitions_time_window_for_output(output_name)
E       AttributeError: _step_execution_context
This seems similar to another issue I had previously with building dagster context in tests that was resolved in #8774.
o

owen

08/08/2022, 6:24 PM
hi @Charles Lariviere! thanks for the report -- I dug into this a little and it looks like this is not currently supported. A quick explanation is that the computation behind that
asset_partitions_time_window_for_output
call requires looking up information that only exists on the JobDefinition object at the moment (and no JobDefinition exists when invoking in the test). Another option which would avoid this issue would be to use
materialize
, i.e.
Copy code
from dagster import materialize

@asset
def my_asset(context):
   time_window = context.asset_partitions_time_window_for_output()
   ...

def test_my_asset():
   result = materialize([my_asset], partition_key="2022-08-03")
c

Charles Lariviere

08/08/2022, 7:11 PM
Thanks for looking into it @owen! 🙏 I was trying to not use
materialize
for at least these two reasons: 1.
my_asset
uses a database resource wrapped in a context manager that closes the connection upon release. To test my asset’s output, I need that same connection (i.e. I’m using Snowpark with an in-memory dataframe, but it’s no longer available when the connection is released). 2. Ability to easily pass multiple mocked inputs. I think this is a bit harder to do through
materialize
, vs. directly to the asset function. #2 seems still possible with
materialize
, but I will try to see if there’s a good workaround for #1!
8 Views