# ask-ai

Paul Herzog

02/24/2024, 7:15 PM
@Scout Question about best practice for structuring an asset. Let's say I have an asset group for a specific data source (call it "accuranker"). For this source I defined a resource that takes many different API keys and endpoints, and for each project I have a different endpoint ID, so naturally I use a different asset per project. The response is always the same (a list). What is best practice after generating n response lists? Send them all to a single "list to dataframe" asset that takes n inputs defined as AssetIns? Or should I create one asset for each API request? Or is there another solution / best practice I'm not thinking of?

Scout

02/24/2024, 7:15 PM
When structuring assets in Dagster, especially when dealing with multiple data sources or projects that require similar processing steps, there are a few best practices you can consider. The documentation provides guidance on how to handle such scenarios, and I'll outline a couple of approaches that you might find useful for your "accuranker" data source.

1. Use of Partitioned Assets: If your data source "accuranker" has multiple projects with different API keys and endpoints, but the structure of the response is the same, you could consider using partitioned assets. Partitioned assets represent a collection of objects that are all computed the same way, but with different data. This is particularly useful when you have identical structures but different data for each partition. By default, Dagster assumes that if asset A and B have the same partitioning, and B depends on A, then partition X of B is derived from partition X of A. This can be customized using a non-default PartitionMapping.

2. Separate Assets with Shared Logic: Another approach is to create separate assets for each API request, but abstract the shared logic into a common function. This way, you maintain the individuality of each asset while avoiding code duplication. For example, you could have a function that converts a list to a DataFrame, and each asset would call this function with its own data.

3. Multi-Asset with Dynamic Outputs: If the number of projects or endpoints is dynamic and you want to avoid defining a fixed number of inputs, you could use a multi_asset with dynamic outputs. This allows you to yield multiple outputs from a single asset function, where each output corresponds to a different project or endpoint. The documentation provides an example of using multi_asset with dynamic keys generated from a source, which could be adapted to your use case.

4. Combining Lists into a Single DataFrame Asset: If you prefer to have a single asset that combines all the lists into one DataFrame, you could create an asset that takes multiple inputs, each defined as an AssetIn. This asset would then be responsible for combining the lists into a single DataFrame. However, this approach might become less manageable as the number of projects grows, and it could introduce complexity in terms of asset dependencies and execution.

In summary, the best practice depends on the specifics of your use case, such as the number of projects, the dynamism of the endpoints, and how you want to manage dependencies between assets. Partitioned assets offer a condensed and performant representation for collections of data with identical structures, while separate assets with shared logic provide individuality with reduced code duplication. Multi-assets with dynamic outputs offer flexibility for a varying number of outputs, and a single DataFrame asset can centralize the combination process but may become complex to manage. It's important to consider the trade-offs of each approach and choose the one that best fits your requirements for maintainability, scalability, and clarity in your Dagster project.
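For illustration, here is a minimal sketch of option 2 (separate assets sharing one helper). The asset names, endpoint IDs, and fetch helper below are assumptions for the example, not part of your project:

from typing import List

import pandas as pd
from dagster import asset


def accuranker_list_to_df(records: List[dict]) -> pd.DataFrame:
    # Shared transformation used by every accuranker asset.
    return pd.DataFrame(records)


def fetch_accuranker(endpoint_id: int) -> List[dict]:
    # Placeholder for the real API call made through your accuranker resource.
    return []


@asset(group_name="accuranker")
def project_a_rankings() -> pd.DataFrame:
    # Hypothetical endpoint id for project A.
    return accuranker_list_to_df(fetch_accuranker(101))


@asset(group_name="accuranker")
def project_b_rankings() -> pd.DataFrame:
    # Hypothetical endpoint id for project B.
    return accuranker_list_to_df(fetch_accuranker(102))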

Paul Herzog

02/24/2024, 7:16 PM
@Scout Can you give me a best-practice example of an asset with dynamic outputs, as well as the dataframe asset that takes n inputs?

Scout

02/24/2024, 7:16 PM
The documentation does not provide a direct example of an asset with dynamic outputs or an asset that takes an arbitrary number of inputs (n inputs). However, I can guide you through the concepts based on the documentation's guidance on similar topics. For dynamic outputs, the documentation discusses the use of DynamicOutput within ops, which can be used to dynamically generate a set of outputs during runtime. While this is not directly related to assets, the concept can be applied when you want to create multiple assets based on dynamic conditions. Here's a conceptual example based on ops that you might adapt to assets:
from dagster import op, DynamicOut, DynamicOutput


@op(out=DynamicOut())
def generate_dynamic_outputs(context):
    # Imagine this list is dynamically generated based on some condition
    dynamic_list = ["project1", "project2", "project3"]
    for item in dynamic_list:
        yield DynamicOutput(value=item, mapping_key=item)
For an asset that takes n inputs, you would typically define each input explicitly. However, if you have a dynamic number of inputs, you might consider using a pattern where you pass a list or dictionary of inputs to a single AssetIn. Here's a conceptual example:
import pandas as pd
from dagster import asset, AssetIn


@asset(ins={"input_data": AssetIn()})
def combine_dataframes_to_single_dataframe(input_data):
    # Assuming input_data is a dictionary of DataFrames
    combined_dataframe = pd.concat(input_data.values())
    return combined_dataframe
In this example, input_data is expected to be a dictionary where the keys are the identifiers for each project's DataFrame and the values are the actual DataFrames. You would need to ensure that the dictionary is constructed before passing it to the asset. Please note that these examples are conceptual and based on the principles discussed in the documentation. Dagster's asset model may require more specific handling, especially when dealing with dynamic inputs and outputs. The actual implementation may vary and would need to be adapted to fit the capabilities and constraints of the Dagster framework. If dynamic behavior for assets is a requirement for your use case, you might need to explore more advanced patterns or await future features that could support such dynamic asset generation and handling directly.
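One pattern often used for a variable number of similar assets is an asset "factory" that builds one asset per project in a loop. The sketch below assumes a hypothetical mapping of project names to endpoint IDs and a hypothetical fetch helper:

from typing import List

import pandas as pd
from dagster import asset

# Hypothetical mapping of project names to accuranker endpoint ids.
PROJECT_ENDPOINTS = {"project_a": 101, "project_b": 102}


def fetch_accuranker(endpoint_id: int) -> List[dict]:
    # Placeholder for the real API call made through your accuranker resource.
    return []


def make_rankings_asset(project: str, endpoint_id: int):
    @asset(name=f"{project}_rankings", group_name="accuranker")
    def _rankings() -> pd.DataFrame:
        return pd.DataFrame(fetch_accuranker(endpoint_id))

    return _rankings


# One asset per project; pass this list to your Definitions object.
accuranker_assets = [
    make_rankings_asset(project, endpoint_id)
    for project, endpoint_id in PROJECT_ENDPOINTS.items()
]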

Paul Herzog

02/24/2024, 7:25 PM
@Scout I now have this result - it takes two API requests as input, transforms them to dataframes, and then returns two separate dataframes. Do I need to go multi-asset with two outs here?

@asset()
def google_for_job_data_to_df(context: AssetExecutionContext, jt_api_google_for_jobs: List, hj_api_google_for_jobs: List) -> pd.DataFrame:
    df_jt = accuranker_response_to_df(jt_api_google_for_jobs)
    df_hj = accuranker_response_to_df(hj_api_google_for_jobs)
    # add context
    context.add_output_metadata(
        metadata={
            "jt_num_records": len(df_jt),
            "jt_preview": MetadataValue.md(df_jt.head().to_markdown()),
            "hj_num_records": len(df_hj),
            "hj_preview": MetadataValue.md(df_hj.head().to_markdown())
        }
    )
    return df_jt, df_hj

Scout

02/24/2024, 7:25 PM
Based on the documentation, if you have an asset function that needs to return two separate dataframes, you should indeed use multiple outputs. In Dagster, you can define an asset with multiple outputs by using the Out class to specify each output. Here's how you can modify your asset to return two separate dataframes as distinct outputs:
from typing import List

import pandas as pd
from dagster import asset, Out, Output, MetadataValue


@asset(
    outs={
        "df_jt": Out(pd.DataFrame),
        "df_hj": Out(pd.DataFrame),
    }
)
def google_for_job_data_to_df(context, jt_api_google_for_jobs: List, hj_api_google_for_jobs: List):
    df_jt = accuranker_response_to_df(jt_api_google_for_jobs)
    df_hj = accuranker_response_to_df(hj_api_google_for_jobs)

    # add metadata to each output
    context.add_output_metadata(
        metadata={
            "num_records": len(df_jt),
            "preview": MetadataValue.md(df_jt.head().to_markdown()),
        },
        output_name="df_jt",
    )
    context.add_output_metadata(
        metadata={
            "num_records": len(df_hj),
            "preview": MetadataValue.md(df_hj.head().to_markdown()),
        },
        output_name="df_hj",
    )

    yield Output(df_jt, "df_jt")
    yield Output(df_hj, "df_hj")
In this example, Out is used to define two outputs, df_jt and df_hj, each of which is a pd.DataFrame. The context.add_output_metadata method is used to add metadata to each output separately. The yield statement is used to return the dataframes as outputs from the asset. Please note that the actual implementation may vary depending on the version of Dagster you are using and the specific requirements of your project. Always refer to the latest Dagster documentation for the most accurate and up-to-date information on defining assets with multiple outputs.

Paul Herzog

02/24/2024, 7:27 PM
@Scout TypeError: asset() got an unexpected keyword argument 'outs'

Scout

02/24/2024, 7:27 PM
I apologize for the confusion. The outs argument is not available for the @asset decorator in Dagster. Instead, you can use the @multi_asset decorator to define multiple outputs for an asset. Here's an example of how you can use @multi_asset to return two separate dataframes:
from typing import List

import pandas as pd
from dagster import multi_asset, AssetOut, MetadataValue


@multi_asset(
    outs={
        "df_jt": AssetOut(),
        "df_hj": AssetOut(),
    }
)
def google_for_job_data_to_df(context, jt_api_google_for_jobs: List, hj_api_google_for_jobs: List):
    df_jt = accuranker_response_to_df(jt_api_google_for_jobs)
    df_hj = accuranker_response_to_df(hj_api_google_for_jobs)

    # add metadata to each output
    context.add_output_metadata(
        metadata={
            "num_records": len(df_jt),
            "preview": MetadataValue.md(df_jt.head().to_markdown()),
        },
        output_name="df_jt",
    )
    context.add_output_metadata(
        metadata={
            "num_records": len(df_hj),
            "preview": MetadataValue.md(df_hj.head().to_markdown()),
        },
        output_name="df_hj",
    )

    return {"df_jt": df_jt, "df_hj": df_hj}
In this example, @multi_asset is used to define an asset with multiple outputs. The outs argument is a dictionary where the keys are the names of the outputs and the values are instances of AssetOut. The function returns a dictionary mapping the output names to the corresponding dataframes. Please ensure that you are using a version of Dagster that supports @multi_asset. If you encounter any issues or if @multi_asset is not available in your version, please refer to the Dagster documentation or consider updating to a newer version of Dagster that includes this feature.

Paul Herzog

02/24/2024, 8:54 PM
@Scout Can I just use two different assets for API fetching and then provide both assets as an ins dictionary for a downstream asset, which converts the lists to dataframes and then provides two dataframes as outs?

Scout

02/24/2024, 8:54 PM
Yes, you can certainly structure your Dagster assets in that way. You would have separate assets for fetching data from the API, and then a downstream asset that takes these lists as inputs and converts them to dataframes. Here's a conceptual example of how you might structure this:
import pandas as pd
from dagster import asset, AssetIn


# Assets for fetching data from the API
@asset
def fetch_data_from_api_1():
    # Your code to fetch data from API 1
    data_list_1 = ...
    return data_list_1


@asset
def fetch_data_from_api_2():
    # Your code to fetch data from API 2
    data_list_2 = ...
    return data_list_2


# Downstream asset that takes both lists as inputs and converts them to dataframes
@asset(ins={
    "api_1_data": AssetIn(key="fetch_data_from_api_1"),
    "api_2_data": AssetIn(key="fetch_data_from_api_2"),
})
def convert_lists_to_dataframes(api_1_data, api_2_data):
    df_1 = pd.DataFrame(api_1_data)
    df_2 = pd.DataFrame(api_2_data)
    return df_1, df_2
In this example, fetch_data_from_api_1 and fetch_data_from_api_2 are assets that fetch data from two different APIs and return the data as lists. The convert_lists_to_dataframes asset takes these lists as inputs and converts them into two separate Pandas dataframes. However, please note that the @asset decorator does not directly support multiple outputs. If you need to create two separate assets from the convert_lists_to_dataframes function, you would need to define two separate assets, each depending on the API fetching assets. Here's how you could do that:
import pandas as pd
from dagster import asset, AssetIn


# Assets for fetching data from the API
@asset
def fetch_data_from_api_1():
    # Your code to fetch data from API 1
    data_list_1 = ...
    return data_list_1


@asset
def fetch_data_from_api_2():
    # Your code to fetch data from API 2
    data_list_2 = ...
    return data_list_2


# Downstream assets that take the lists as inputs and convert them to dataframes
@asset(ins={"api_1_data": AssetIn(key="fetch_data_from_api_1")})
def convert_list_to_dataframe_1(api_1_data):
    df_1 = pd.DataFrame(api_1_data)
    return df_1


@asset(ins={"api_2_data": AssetIn(key="fetch_data_from_api_2")})
def convert_list_to_dataframe_2(api_2_data):
    df_2 = pd.DataFrame(api_2_data)
    return df_2
In this revised example, convert_list_to_dataframe_1 and convert_list_to_dataframe_2 are separate assets that each take one list as input and convert it to a dataframe. This way, you maintain a clear one-to-one mapping between inputs and outputs for each asset.

Paul Herzog

02/24/2024, 8:58 PM
@Scout I only want to have one "list to dataframe" asset in the UI for better structure, which also passes two dataframes downstream again. How do I do this?

Scout

02/24/2024, 8:58 PM
To achieve a single asset in the Dagster UI that takes multiple lists as inputs and produces multiple dataframes as outputs, you can use the @multi_asset decorator. This decorator allows you to define an asset with multiple inputs and outputs. Here's an example of how you might define such an asset:
import pandas as pd
from dagster import multi_asset, AssetIn, AssetOut


# Define a multi_asset with two inputs and two outputs
@multi_asset(
    ins={
        "api_1_data": AssetIn(key="fetch_data_from_api_1"),  # data list from API 1
        "api_2_data": AssetIn(key="fetch_data_from_api_2"),  # data list from API 2
    },
    outs={
        "df_1": AssetOut(description="Dataframe for API 1 data"),
        "df_2": AssetOut(description="Dataframe for API 2 data"),
    },
)
def convert_lists_to_dataframes(api_1_data, api_2_data):
    df_1 = pd.DataFrame(api_1_data)
    df_2 = pd.DataFrame(api_2_data)
    return {"df_1": df_1, "df_2": df_2}
In this example, convert_lists_to_dataframes is defined as a multi_asset with two inputs (api_1_data and api_2_data) and two outputs (df_1 and df_2). When you run this asset, it will appear as a single asset in the Dagster UI, but it will internally manage the two separate dataframes. This asset will take the lists provided by the upstream assets (which fetch the data from the APIs) and convert each list into a separate dataframe. The resulting dataframes are then returned as a dictionary, where the keys correspond to the names of the outputs defined in the outs parameter of the @multi_asset decorator. By using @multi_asset, you can keep your UI clean and structured, with a single asset representing the transformation from lists to dataframes, while still producing multiple outputs that can be consumed by downstream assets.

Paul Herzog

02/24/2024, 9:06 PM
@Scout I use this now:

@multi_asset(
    ins={
        "jt_api_all_ranking_data": AssetIn(key=AssetKey("jt_api_all_ranking_data")),
        "hj_api_all_ranking_data": AssetIn(key=AssetKey("hj_api_all_ranking_data"))
    },
    outs={
        "jt_all_ranking_data_df": AssetOut(description="Accuranker: All Ranking Data for Job Traveler"),
        "hj_all_ranking_data_df": AssetOut(description="Accuranker: All Ranking Data for Hello Jobs")
    }
)
def all_ranking_data_to_df(context: AssetExecutionContext, jt_api_all_ranking_data: List, hj_api_all_ranking_data: List) -> pd.DataFrame:
    df = accuranker_response_to_df(jt_api_all_ranking_data)
    df2 = accuranker_response_to_df(hj_api_all_ranking_data)
    # add context
    context.add_output_metadata(
        metadata={
            "num_records": len(df),
            "preview": MetadataValue.md(df.head().to_markdown())
        }
    )
    return {
        "jt_all_ranking_data_df": df,
        "hj_all_ranking_data_df": df2
    }

I get this error: dagster._core.errors.DagsterInvariantViolationError: Expected Tuple annotation for multiple outputs, but received non-tuple annotation.

Scout

02/24/2024, 9:06 PM
The error you're encountering suggests that Dagster is expecting a tuple return type annotation for multiple outputs, but it's receiving a non-tuple annotation instead. This is likely because the return type hint in your function signature is pd.DataFrame, which indicates a single output, while you are actually returning a dictionary with multiple outputs. To fix this error, you should remove the return type hint from the function signature, since you're using the @multi_asset decorator, which already specifies the outputs in the outs parameter. Here's the corrected version of your asset definition:
from typing import List

import pandas as pd
from dagster import multi_asset, AssetIn, AssetOut, AssetExecutionContext, AssetKey, MetadataValue


@multi_asset(
    ins={
        "jt_api_all_ranking_data": AssetIn(key=AssetKey("jt_api_all_ranking_data")),
        "hj_api_all_ranking_data": AssetIn(key=AssetKey("hj_api_all_ranking_data")),
    },
    outs={
        "jt_all_ranking_data_df": AssetOut(description="Accuranker: All Ranking Data for Job Traveler"),
        "hj_all_ranking_data_df": AssetOut(description="Accuranker: All Ranking Data for Hello Jobs"),
    },
)
def all_ranking_data_to_df(context: AssetExecutionContext,
                           jt_api_all_ranking_data: List,
                           hj_api_all_ranking_data: List):
    df = accuranker_response_to_df(jt_api_all_ranking_data)
    df2 = accuranker_response_to_df(hj_api_all_ranking_data)

    # add metadata, targeting each output by name
    context.add_output_metadata(
        metadata={
            "num_records": len(df),
            "preview": MetadataValue.md(df.head().to_markdown()),
        },
        output_name="jt_all_ranking_data_df",
    )
    context.add_output_metadata(
        metadata={
            "num_records": len(df2),
            "preview": MetadataValue.md(df2.head().to_markdown()),
        },
        output_name="hj_all_ranking_data_df",
    )

    return {
        "jt_all_ranking_data_df": df,
        "hj_all_ranking_data_df": df2,
    }
In this corrected version, I've removed the -> pd.DataFrame return type annotation from the function signature. Additionally, I've updated the context.add_output_metadata calls to pass the output_name argument, which is necessary when adding metadata to specific outputs in a multi-output asset. With these changes, your asset should correctly return multiple outputs without encountering the type hint error.
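As a side note, if you would rather keep a return type annotation, the annotation for a multi-output asset should be a Tuple, with the values returned in the same order as the keys in outs. A sketch reusing your asset and your accuranker_response_to_df helper (descriptions omitted for brevity):

from typing import List, Tuple

import pandas as pd
from dagster import multi_asset, AssetExecutionContext, AssetIn, AssetOut


@multi_asset(
    ins={
        "jt_api_all_ranking_data": AssetIn(),
        "hj_api_all_ranking_data": AssetIn(),
    },
    outs={
        "jt_all_ranking_data_df": AssetOut(),
        "hj_all_ranking_data_df": AssetOut(),
    },
)
def all_ranking_data_to_df(
    context: AssetExecutionContext,
    jt_api_all_ranking_data: List,
    hj_api_all_ranking_data: List,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    df = accuranker_response_to_df(jt_api_all_ranking_data)   # your helper
    df2 = accuranker_response_to_df(hj_api_all_ranking_data)  # your helper
    # Outputs are matched positionally to the order of the keys in `outs`.
    return df, df2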

Paul Herzog

02/24/2024, 9:13 PM
How do I load this here

metadata={
    "num_records": len(df2),
    "preview": MetadataValue.md(df2.head().to_markdown())
}

in a downstream asset? @Scout

Scout

02/24/2024, 9:13 PM
In Dagster, metadata attached to an asset's materialization is primarily for informational purposes and is displayed in the Dagit UI. It is not intended to be passed directly to downstream assets as data. Instead, the data itself (e.g., the dataframe) should be passed between assets, and each asset can compute its own metadata if needed. If you want to use the metadata (like the number of records or a preview of the dataframe) in a downstream asset, you would typically recompute it within the downstream asset based on the data passed to it. Here's an example of how you might do this:
import pandas as pd
from dagster import asset, AssetIn, MetadataValue


# Upstream asset that produces a dataframe and records metadata
@asset
def upstream_asset():
    df = pd.DataFrame(...)  # Replace with actual data fetching logic
    metadata = {
        "num_records": len(df),
        "preview": MetadataValue.md(df.head().to_markdown())
    }
    # The metadata annotates this asset's materialization; it is not passed to downstream assets
    return df


# Downstream asset that consumes the dataframe and computes its own metadata
@asset(ins={"input_df": AssetIn(key="upstream_asset")})
def downstream_asset(input_df: pd.DataFrame):
    # Perform some operations on the dataframe
    transformed_df = input_df  # Replace with actual transformation logic

    # Compute metadata again if needed
    metadata = {
        "num_records": len(transformed_df),
        "preview": MetadataValue.md(transformed_df.head().to_markdown())
    }
    # You can log the metadata or use it within this asset as needed
    return transformed_df
In this example, upstream_asset computes metadata, but only the dataframe (df) is returned. The downstream_asset then takes this dataframe as input and computes its own metadata based on the transformed dataframe. If you truly need to pass metadata between assets, you would have to include it in the data structure that you pass along. However, this is not a common pattern in Dagster, as metadata is typically not part of the computation graph's data flow. Instead, metadata is used to annotate materializations for observability purposes. If you have a specific use case that requires passing metadata between assets, you might need to reconsider your design or include the necessary information within the data payloads that your assets produce and consume.

Paul Herzog

02/24/2024, 9:14 PM
@Scout Sorry, I meant this here:
return {
        "jt_all_ranking_data_df": df,
        "hj_all_ranking_data_df": df2
    }
How do I load this in a downstream asset?

Scout

02/24/2024, 9:14 PM
To load the outputs from a multi_asset in downstream assets, you would define the downstream assets with AssetIn annotations that reference the keys of the outputs you want to consume. Here's an example of how you can define downstream assets that depend on the outputs from your multi_asset:
from dagster import asset, AssetIn

# Assume your multi_asset is defined as follows:
# @multi_asset(...)
# def all_ranking_data_to_df(...):
#     ...
#     return {
#         "jt_all_ranking_data_df": df,
#         "hj_all_ranking_data_df": df2
#     }


# Downstream asset that consumes the "jt_all_ranking_data_df" dataframe
@asset(ins={"jt_df": AssetIn(key="jt_all_ranking_data_df")})
def process_jt_data(jt_df):
    # Perform operations on jt_df
    processed_jt_df = jt_df  # Replace with actual processing logic
    return processed_jt_df


# Downstream asset that consumes the "hj_all_ranking_data_df" dataframe
@asset(ins={"hj_df": AssetIn(key="hj_all_ranking_data_df")})
def process_hj_data(hj_df):
    # Perform operations on hj_df
    processed_hj_df = hj_df  # Replace with actual processing logic
    return processed_hj_df
In this example, process_jt_data and process_hj_data are downstream assets that take the dataframes produced by all_ranking_data_to_df as inputs. The AssetIn annotations specify the dependency on the outputs of the multi_asset. Because the local parameter names (jt_df and hj_df) differ from the output names, the key argument of AssetIn points each input at the corresponding key in the multi_asset's outs dictionary (jt_all_ranking_data_df and hj_all_ranking_data_df); if the input names matched the output names exactly, the key argument could be omitted. When you run your Dagster pipeline, the framework will automatically pass the outputs from all_ranking_data_to_df to the corresponding downstream assets based on these dependencies.

Paul Herzog

02/24/2024, 9:15 PM
Can one asset consume both AssetIns? @Scout

Scout

02/24/2024, 9:15 PM
Yes, a single asset can consume multiple AssetIn dependencies. You can define an asset that takes multiple upstream assets as inputs by specifying each one in the ins parameter of the @asset decorator. Here's an example of how you can define an asset that depends on both jt_all_ranking_data_df and hj_all_ranking_data_df:
from dagster import asset, AssetIn


# Downstream asset that consumes both "jt_all_ranking_data_df" and "hj_all_ranking_data_df" dataframes
@asset(ins={
    "jt_df": AssetIn(key="jt_all_ranking_data_df"),  # dataframe for Job Traveler data
    "hj_df": AssetIn(key="hj_all_ranking_data_df"),  # dataframe for Hello Jobs data
})
def process_both_dataframes(jt_df, hj_df):
    # Perform operations on both jt_df and hj_df
    # For example, you might want to merge them, compare them, etc.
    # Replace the following lines with your actual processing logic
    processed_jt_df = jt_df
    processed_hj_df = hj_df

    # Return a result or results based on your processing
    # For example, you could return a tuple, a dictionary, or a single combined dataframe
    return processed_jt_df, processed_hj_df
In this example, process_both_dataframes is an asset that takes two dataframes as input: one from jt_all_ranking_data_df and another from hj_all_ranking_data_df. The ins parameter of the @asset decorator is used to define these inputs, and the function parameters jt_df and hj_df correspond to these inputs. When you run this asset within a Dagster pipeline, it will automatically consume the outputs from the upstream assets that produce jt_all_ranking_data_df and hj_all_ranking_data_df, and you can perform any necessary processing on these inputs within the body of the asset function.

Paul Herzog

02/24/2024, 9:21 PM
This is upstream:

@asset(ins={
    "jt_api_google_for_jobs": AssetIn(key=AssetKey("jt_api_google_for_jobs")),
    "hj_api_google_for_jobs": AssetIn(key=AssetKey("hj_api_google_for_jobs"))
})
def google_for_job_data_to_df(context: AssetExecutionContext, jt_api_google_for_jobs: List, hj_api_google_for_jobs: List):
    df = accuranker_response_to_df(jt_api_google_for_jobs)
    df2 = accuranker_response_to_df(hj_api_google_for_jobs)
    # add context
    context.add_output_metadata(
        metadata={
            "num_records": len(df),
            "preview": MetadataValue.md(df.head().to_markdown())
        }
    )
    return {
        "jt_google_for_jobs_data_df": df,
        "hj_google_for_jobs_data_df": df2
    }

This is downstream:

@asset(ins={
    "jt_google_for_jobs_data_df": AssetIn(),
    "hj_google_for_jobs_data_df": AssetIn()
})
def google_for_jobs_data_to_gcs(context: AssetExecutionContext, jt_google_for_jobs_data_df: pd.DataFrame, hj_google_for_jobs_data_df: pd.DataFrame, google_cloud_storage: GCSResource) -> tuple:
    client = google_cloud_storage.get_client()
    bucket_name = f"{project_id}-accuranker-orchestration-test"
    blob_name = f"dagster/accuranker/google_for_jobs_data/{today_date}-accuranker.csv"
    print(jt_google_for_jobs_data_df.head())
    print(hj_google_for_jobs_data_df.head())

@Scout I get this error: dagster._core.errors.DagsterInvalidDefinitionError: Input asset '["hj_google_for_jobs_data_df"]' for asset '["google_for_jobs_data_to_gcs"]' is not produced by any of the provided asset ops and is not one of the provided sources

Scout

02/24/2024, 9:21 PM
The error message you're encountering indicates that Dagster cannot find an asset that produces the input hj_google_for_jobs_data_df for the google_for_jobs_data_to_gcs asset. This usually means there's a mismatch between the asset keys, or there's a missing asset definition that should produce the specified output. To resolve this issue, ensure that:

1. The asset keys you're using to define inputs in google_for_jobs_data_to_gcs match the keys of the outputs produced by google_for_job_data_to_df.

2. The google_for_job_data_to_df asset is correctly producing outputs with the keys jt_google_for_jobs_data_df and hj_google_for_jobs_data_df. Because it is currently decorated with a plain @asset, it produces only a single output named after the function; to produce two separately addressable asset keys it needs to be a @multi_asset with two AssetOut entries.

3. Both assets are included in the repository definition so that Dagster is aware of their existence and relationship.

Here's a simplified example of how the upstream and downstream assets should be defined:
from typing import List

import pandas as pd
from dagster import multi_asset, asset, AssetIn, AssetOut, AssetExecutionContext, MetadataValue


# Upstream asset: a multi_asset so that each dataframe gets its own asset key
@multi_asset(
    outs={
        "jt_google_for_jobs_data_df": AssetOut(),
        "hj_google_for_jobs_data_df": AssetOut(),
    }
)
def google_for_job_data_to_df(context: AssetExecutionContext,
                              jt_api_google_for_jobs: List,
                              hj_api_google_for_jobs: List):
    df = accuranker_response_to_df(jt_api_google_for_jobs)
    df2 = accuranker_response_to_df(hj_api_google_for_jobs)
    # add metadata, targeting each output by name
    context.add_output_metadata(
        metadata={
            "num_records": len(df),
            "preview": MetadataValue.md(df.head().to_markdown()),
        },
        output_name="jt_google_for_jobs_data_df",
    )
    context.add_output_metadata(
        metadata={
            "num_records": len(df2),
            "preview": MetadataValue.md(df2.head().to_markdown()),
        },
        output_name="hj_google_for_jobs_data_df",
    )
    return {
        "jt_google_for_jobs_data_df": df,
        "hj_google_for_jobs_data_df": df2,
    }


# Downstream asset
@asset(ins={
    "jt_google_for_jobs_data_df": AssetIn(),
    "hj_google_for_jobs_data_df": AssetIn(),
})
def google_for_jobs_data_to_gcs(context: AssetExecutionContext,
                                jt_google_for_jobs_data_df: pd.DataFrame,
                                hj_google_for_jobs_data_df: pd.DataFrame):
    # Your code to upload dataframes to GCS
    pass
Make sure that the google_for_job_data_to_df asset is correctly registered in your repository and that the keys for the outputs are exactly as specified in the outs dictionary. If there's any discrepancy in the keys, or if the asset is not registered, Dagster will not be able to resolve the inputs for the downstream asset. If you continue to face issues, double-check the asset keys and ensure that the repository definition includes all necessary assets. If the problem persists, it may be helpful to review the repository and asset definitions to ensure that everything is correctly wired up.
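For completeness, a minimal sketch of registering these assets in a Definitions object; the module path, resource configuration, and project id below are assumptions:

from dagster import Definitions
from dagster_gcp import GCSResource

# Hypothetical import of the assets from your own module:
# from my_project.assets import (
#     jt_api_google_for_jobs, hj_api_google_for_jobs,
#     google_for_job_data_to_df, google_for_jobs_data_to_gcs,
# )

defs = Definitions(
    assets=[
        jt_api_google_for_jobs,        # upstream API-fetching assets from your code
        hj_api_google_for_jobs,
        google_for_job_data_to_df,
        google_for_jobs_data_to_gcs,
    ],
    resources={
        "google_cloud_storage": GCSResource(project="your-gcp-project"),  # placeholder project id
    },
)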

Paul Herzog

02/24/2024, 10:40 PM
@Scout Can you give me best practice on how to do it with partitioned assets? Keep in mind I want to fetch the API for many different endpoints/params and push each to the same "list to dataframe" asset (and onwards to "dataframe to cloud storage", "cloud storage to BigQuery").

Scout

02/24/2024, 10:40 PM
When dealing with multiple endpoints or parameters and you want to process each through the same "list to dataframe" asset, partitioned assets can be a good solution. Partitioned assets allow you to represent a collection of objects that are all computed in the same way but with different data for each partition. Here's a high-level approach to using partitioned assets for your use case:

1. Define Partitions: Determine how you want to partition your data. In your case, each partition could represent data from a different API endpoint or a set of parameters.

2. Create Partitioned Assets for API Fetching: Define a partitioned asset for fetching data from the API. Each partition of this asset will correspond to a different endpoint or parameter set.

3. Create a "List to Dataframe" Asset: Define a downstream partitioned asset that takes the list from the API fetching asset and converts it to a dataframe. This asset will have the same partitioning scheme as the API fetching asset.

4. Create Further Downstream Assets: Define additional downstream assets for uploading the dataframe to cloud storage and then to BigQuery. These assets will also follow the same partitioning scheme.

Here's a conceptual example:
from dagster import asset, DailyPartitionsDefinition, AssetIn, AssetOut
import pandas as pd

# Define your partitions
partitions_def = DailyPartitionsDefinition(start_date="2021-01-01")

# Asset for fetching data from the API, partitioned by endpoint or parameters
@asset(partitions_def=partitions_def)
def fetch_data_from_api(context):
    endpoint = context.partition_key
    # Fetch data from the API using the endpoint or parameters
    data_list = api_fetch_function(endpoint)
    return data_list

# Asset for converting the list to a dataframe
@asset(partitions_def=partitions_def, ins={"api_data": AssetIn(key="fetch_data_from_api")})
def list_to_dataframe(api_data):
    df = pd.DataFrame(api_data)
    return df

# Asset for uploading the dataframe to cloud storage
@asset(partitions_def=partitions_def, ins={"dataframe": AssetIn(key="list_to_dataframe")})
def dataframe_to_cloud_storage(dataframe):
    # Upload dataframe to cloud storage
    pass

# Asset for loading the data from cloud storage to BigQuery
@asset(partitions_def=partitions_def, ins={"cloud_storage_data": AssetIn(key="dataframe_to_cloud_storage")})
def cloud_storage_to_bigquery(cloud_storage_data):
    # Load data to BigQuery
    pass
In this example, each asset is partitioned using the same DailyPartitionsDefinition. The context.partition_key is used to determine which endpoint or set of parameters to use when fetching data from the API. Each downstream asset takes the output of the previous asset as an input, and they all follow the same partitioning scheme. When you materialize these assets, you will select a partition to materialize, and Dagster will execute the computation for that specific partition across all assets in the pipeline. Keep in mind that this is a high-level example, and you will need to adapt the code to fit your specific API fetching logic, data transformation, and storage mechanisms. Additionally, you may need to configure your resources and IO managers to handle the storage and retrieval of data across these assets.

Paul Herzog

02/24/2024, 10:44 PM
@Scout Provide me an example of partitioning on "project-id", which is an integer sent via params to the endpoint and returns a list with data for each project.

Scout

02/24/2024, 10:44 PM
To partition your assets based on a "project-id" that is sent via params to an endpoint, you would first define a PartitionsDefinition that represents the range of project IDs you want to work with. Since project IDs are integers, you might use a StaticPartitionsDefinition if you have a known set of project IDs, or a DynamicPartitionsDefinition if project IDs are discovered dynamically. Here's a conceptual example using StaticPartitionsDefinition for a known set of project IDs:
from dagster import asset, StaticPartitionsDefinition, AssetOut, AssetIn
import pandas as pd
import requests

# Define your static partitions for known project IDs
project_ids = [1, 2, 3]  # Replace with your actual project IDs
projects_partitions_def = StaticPartitionsDefinition(
    [str(project_id) for project_id in project_ids]
)

# Asset for fetching data from the API, partitioned by project ID
@asset(partitions_def=projects_partitions_def)
def fetch_data_for_project(context):
    project_id = context.partition_key
    # Fetch data from the API using the project ID
    response = requests.get(f"https://api.example.com/data?project-id={project_id}")
    data_list = response.json()
    return data_list

# Asset for converting the list to a dataframe, also partitioned by project ID
@asset(partitions_def=projects_partitions_def, ins={"api_data": AssetIn(key="fetch_data_for_project")})
def list_to_dataframe(api_data):
    df = pd.DataFrame(api_data)
    return df

# Further downstream assets would follow a similar pattern...
In this example, fetch_data_for_project is a partitioned asset where each partition corresponds to a different project ID. When you materialize this asset, you will specify the partition key (project ID) to fetch data for that specific project. If you have a dynamic set of project IDs that you cannot know ahead of time, you would use a DynamicPartitionsDefinition instead. Here's how you might set that up:

from dagster import asset, DynamicPartitionsDefinition, AssetOut, AssetIn
import pandas as pd
import requests

# Define your dynamic partitions for project IDs
projects_partitions_def = DynamicPartitionsDefinition(name="projects")

# Asset for fetching data from the API, partitioned by project ID
@asset(partitions_def=projects_partitions_def)
def fetch_data_for_project(context):
    project_id = context.partition_key
    # Fetch data from the API using the project ID
    response = requests.get(f"https://api.example.com/data?project-id={project_id}")
    data_list = response.json()
    return data_list

# Asset for converting the list to a dataframe, also partitioned by project ID
@asset(partitions_def=projects_partitions_def, ins={"api_data": AssetIn(key="fetch_data_for_project")})
def list_to_dataframe(api_data):
    df = pd.DataFrame(api_data)
    return df

# Further downstream assets would follow a similar pattern...

With DynamicPartitionsDefinition, you would typically use a sensor or some other mechanism to dynamically add partitions as new project IDs are discovered. Please note that these examples are conceptual and would need to be adapted to your specific API endpoints, data structures, and business logic. Additionally, you would need to handle authentication, error checking, and other API interaction details as required by your endpoint.