# ask-community

Daniel Galea

12/19/2022, 3:57 PM
Hi, I am trying to set up a multi-asset sensor but I am getting the following error: "All assets must be partitioned and share the same partitions definition". However, all my assets have partitions definitions, so I'm not sure why this happens. I have a hunch that it has to do with the `AssetKey` (see code below), but I'm not sure how I should solve it if that's the case. Has anyone run into this issue before? I create my assets and graphs inside another function so that I can re-use those functions and parameterize parts of the asset and graph.

PartitionsDefinition:
```python
partitions_definition = DailyPartitionsDefinition(
    start_date="2022-12-01", timezone="Europe/Amsterdam"
)
```
Asset:
```python
def create_asset(asset_name: str, partitions_definition: PartitionsDefinition):
    @asset(
        io_manager_key="s3_io_manager",
        required_resource_keys={"s3_io_manager"},
        name=asset_name,
        partitions_def=partitions_definition,
    )
    def customer_transactions():
        rows = list()
        fake = Faker()
        for _ in range(0, 10000):
            rows.append(
                {
                    "name": fake.name(),
                    "item": fake.ean(length=13),
                    "color": fake.color(),
                    "credit_card": fake.credit_card_number(card_type="visa"),
                }
            )
        return convert_to_json_lines_file(rows=rows)

    return customer_transactions
```
Graph:
```python
def ingestion_graph(asset_name: str, graph_name: str, partitions_definition: PartitionsDefinition):
    @graph(name=graph_name, partitions_definition=partitions_definition)
    def ingestion_pipeline():
        create_asset(asset_name=asset_name, partitions_definition=partitions_definition)

    return ingestion_pipeline
```
Job (used to materialize assets):
```python
my_job1 = ingestion_graph(asset_name="test_asset", graph_name="my_graph", partitions_definition=partitions_definition).to_job(...)
my_job2 = ingestion_graph(asset_name="test_asset2", graph_name="my_graph2", partitions_definition=partitions_definition).to_job(...)
```
Multi-Asset Sensor:
```python
def generate_multi_asset_sensor(
    sensor_name: str, asset_keys: List[AssetKey], downstream_job: JobDefinition
):
    @multi_asset_sensor(
        asset_keys=asset_keys,
        job=downstream_job,
        name=sensor_name,
        default_status=DefaultSensorStatus.RUNNING,
    )
    def trigger_daily_asset_if_both_upstream_partitions_materialized(
        context: MultiAssetSensorEvaluationContext,
    ):
        run_requests = []
        for (
            partition,
            materializations_by_asset,
        ) in context.latest_materialization_records_by_partition_and_asset().items():
            if set(materializations_by_asset.keys()) == set(context.asset_keys):
                run_requests.append(
                    downstream_job.run_request_for_partition(partition)
                )
                for asset_key, materialization in materializations_by_asset.items():
                    context.advance_cursor({asset_key: materialization})
        return run_requests

    return trigger_daily_asset_if_both_upstream_partitions_materialized


multi_sensor = generate_multi_asset_sensor(
    sensor_name="my_sensor",
    asset_keys=[AssetKey("test_asset"), AssetKey("test_asset2")],
    downstream_job=some_downstream_job,
)
```
Repository:
```python
@repository
def my_repo():
    return [my_job1, my_job2, multi_sensor]
```
Have I misunderstood something? I am partitioning my assets but I still get this error.

sandy

12/27/2022, 9:57 PM
@Daniel Galea are you still facing this issue? @claire might be able to figure out what's going on

claire

12/27/2022, 11:29 PM
Hi Daniel. The reason you are seeing this error is that invoking assets within graphs does not build an asset job, so Dagster fails to recognize that an asset exists in your jobs. In order to define an asset job, you should use the `define_asset_job` function. So I'd replace your `ingestion_graph` function with something like this:
```python
def ingestion_asset_and_job(
    asset_name: str, graph_name: str, partitions_definition: PartitionsDefinition
):
    return create_asset(
        asset_name=asset_name, partitions_definition=partitions_definition
    ), define_asset_job(
        graph_name, selection=AssetSelection.keys(asset_name), partitions_def=partitions_definition
    )
```
Then, just be sure to add all of the assets returned from `create_asset` to your repository, and then your multi-asset sensor should work.
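For reference, a minimal sketch of what the repository wiring might then look like, assuming the `partitions_definition`, `multi_sensor`, and asset names from earlier in the thread, and that `ingestion_asset_and_job` returns an `(asset, job)` tuple as above:

```python
# Build each (asset, job) pair from the helper suggested above.
asset1, job1 = ingestion_asset_and_job(
    asset_name="test_asset",
    graph_name="my_graph",
    partitions_definition=partitions_definition,
)
asset2, job2 = ingestion_asset_and_job(
    asset_name="test_asset2",
    graph_name="my_graph2",
    partitions_definition=partitions_definition,
)


@repository
def my_repo():
    # Include the asset definitions themselves (not just the jobs) so the
    # multi-asset sensor can see their shared partitions definition.
    return [asset1, asset2, job1, job2, multi_sensor]
```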