Giacomo Colombo
03/11/2024, 9:02 PM
```python
import os

from dagster import RunRequest, asset, define_asset_job, sensor


@asset
def my_asset_1():
    some_logic()


@asset
def my_asset_2():
    some_logic()


job_sample = define_asset_job(name="job_sample", selection=["my_asset_1", "my_asset_2"])


@sensor(job=job_sample)
def sensor_to_start_job(context):
    env_var_value = os.environ.get("ENV_VAR", None)
    if env_var_value == "trigger_job":
        os.environ['ENV_VAR'] = 'stop'
        yield RunRequest(run_key="unique_run_key", run_config={})
```
Now if I change my asset to:
```python
# Defined before the asset, since the decorator reads it at definition time
set_partitions_files = DynamicPartitionsDefinition(name="partition_according_to_sources")


@asset(
    required_resource_keys={'custom_year_resource'},
    group_name="group_1",
    partitions_def=set_partitions_files,
)
def ingested_conf_dest(context):
    curr_year = context.resources.custom_year_resource['curr_year']
    context.log.info(f'Current year >>>>>>>>>>>>>>> {curr_year}')
    some_logic()
```
What do I need to specify in the sensor? Where do I state which partition to consider? Usually I select the partition to materialize from the Dagit UI.
Scout
03/11/2024, 9:02 PM
For a partitioned asset, pass a `partition_key` to the `RunRequest` within your sensor. The `partition_key` tells Dagster which partition of the asset to materialize. Here's how you can modify your sensor to work with partitioned assets:

```python
import os

from dagster import RunRequest, define_asset_job, sensor

# Define your assets and partitions as you have done above

# Define your asset job
job_sample = define_asset_job(name="job_sample", selection=["ingested_conf_dest"])


@sensor(job=job_sample)
def sensor_to_start_job(context):
    env_var_value = os.environ.get("ENV_VAR", None)
    if env_var_value == "trigger_job":
        os.environ['ENV_VAR'] = 'stop'
        # Determine the partition key you want to run
        partition_key = "the_partition_key_you_want_to_materialize"
        yield RunRequest(
            run_key="unique_run_key",
            run_config={},  # Include any run configuration if needed
            partition_key=partition_key,  # Specify the partition key here
        )
```

In the above example, replace `"the_partition_key_you_want_to_materialize"` with the actual partition key you want to materialize. The `partition_key` should correspond to one of the partitions defined in your `DynamicPartitionsDefinition`. When the sensor fires, it will request a run for the specified partition of the asset. If you need to determine the partition key dynamically based on some logic, you can insert that logic into the sensor function before yielding the `RunRequest`.

Remember that the `partition_key` must match one of the partition keys defined by your `DynamicPartitionsDefinition`.
If you're using dynamic partitions, you might need to implement additional logic to determine the latest or relevant partition key based on your data sources or other criteria.
Giacomo Colombo
03/18/2024, 8:57 PM
Scout
03/18/2024, 8:57 PM
The `RunRequest` class in Dagster is used within sensors to request the launch of a job run. A sensor is a function that yields `RunRequest` objects based on some external state, such as a new file in a directory, a change in a database, or a condition met in an environment variable, as in your example.

Here are the key components of a `RunRequest`:
- `run_key` (Optional[str]): A string key that uniquely identifies the run request. This is used to deduplicate run requests, ensuring that the same request doesn't result in multiple runs. If a sensor yields a `RunRequest` with a `run_key` that the same sensor has already used, Dagster will not start a new run for that request.
- `run_config` (Optional[Dict]): A dictionary representing the run configuration for the job. This includes configuration for ops, resources, loggers, execution, and any other custom configuration that the job expects.
- `tags` (Optional[Dict[str, str]]): A dictionary of key-value pairs that will be attached to the run as tags. Tags can be used for filtering and organizing runs in the Dagster UI (Dagit).
- `partition_key` (Optional[str]): The key of the partition to request a run for. This is used with partitioned assets or partitioned jobs. It tells Dagster which partition to materialize or execute.

Here's an example of how a `RunRequest` might look in a sensor:

```python
from dagster import RunRequest, sensor


@sensor(job=my_job)
def my_sensor(context):
    # Logic to determine whether to trigger a run
    if should_trigger_run():
        yield RunRequest(
            run_key="unique_run_key",
            run_config={"ops": {"my_op": {"config": {"param": "value"}}}},
            tags={"key": "value"},
            partition_key="2021-01-01",
        )
```

In this example, `should_trigger_run()` is a placeholder for whatever logic you need to determine if a run should be triggered. The `run_config` is specific to the job being run, and the `tags` are arbitrary metadata. The `partition_key` is used if the job targets partitioned assets or is itself partitioned.

The `RunRequest` is a powerful tool for programmatically controlling the execution of Dagster jobs, allowing for dynamic and conditional run generation based on external or internal factors.
Giacomo Colombo
03/18/2024, 9:19 PM
```python
@asset(
    description="My asset output",
    group_name="Output_town",
    partitions_def=set_partitions_years,
    required_resource_keys={'partition_year_resource'},
)
def output_asset(context, input_asset_1):
    year = int(context.partition_key)
    context.log.info(f'Generating Output in year: {year}')
    var_1 = MyOutputClass(year=year, logger=context.log)
    df = var_1.create_town()
```
Scout
03/18/2024, 9:19 PM
You can use the `ins` parameter within the `@asset` decorator. This is similar to how you would define dependencies for `@multi_asset`. The `ins` parameter allows you to specify dependencies using `AssetIn` objects, which can reference other assets by their keys. Here's an example of how you can define the dependency for `output_asset` on `input_asset_1` using the `ins` parameter:

```python
from dagster import AssetIn, asset


@asset(
    description="My asset output",
    group_name="Output_town",
    partitions_def=set_partitions_years,
    required_resource_keys={'partition_year_resource'},
    ins={"input_asset_1": AssetIn(key="input_asset_1_key")},
)
def output_asset(context, input_asset_1):
    year = int(context.partition_key)
    context.log.info(f'Generating Output in year: {year}')
    var_1 = MyOutputClass(year=year, logger=context.log)
    df = var_1.create_town()
    return df
```

In this example, `input_asset_1_key` should be replaced with the actual asset key of the `input_asset_1` asset. The asset key is a unique identifier for an asset within a Dagster repository. It defaults to the name of the decorated function, plus any `key_prefix`, so if `input_asset_1` keeps that default, use `AssetIn(key="input_asset_1")`. If you want `input_asset_1` to have a different key, you can set one with the `name` parameter like this:

```python
from dagster import asset


@asset(
    name="input_asset_1_key",  # Overrides the default key taken from the function name
    # ... other parameters ...
)
def input_asset_1():
    # ... logic for input_asset_1 ...
    return some_data
```

By using the `ins` parameter with `AssetIn`, you can define the dependency relationship without having to import `input_asset_1` directly in the file where `output_asset` is defined. This can be particularly useful for managing dependencies in larger projects where assets are defined across multiple files or modules.