# ask-ai

Slackbot

02/21/2024, 11:24 PM
This message was deleted.

Scout

02/21/2024, 11:25 PM
The error message you're encountering indicates that there was an issue during the execution of an op named `imf_metadata`. Specifically, the error is a `DagsterExecutionStepExecutionError`, which is a type of error that occurs when there is a problem executing an op within a Dagster pipeline. The root cause of the error seems to be a `TypeError` that was raised because an unexpected keyword argument, `name`, was passed to the `__init__` method of some object. This is indicated by the line in the stack trace:
```
TypeError: __init__() got an unexpected keyword argument 'name'
```
This error occurred within the `fetch_imf_metadata` function, which is called by the `imf_metadata` op. The `fetch_imf_metadata` function appears to be attempting to return an `Output` object, but it seems that the `Output` constructor was called with an incorrect argument.

To resolve this issue, check how the `Output` object returned by `fetch_imf_metadata` is constructed, and make sure you pass only arguments the `Output` constructor expects. If `name` is not a valid argument for the `Output` constructor, remove it or replace it with the correct argument.

If you need further assistance with debugging, you can refer to the documentation for the `Output` class to understand the expected arguments, or you can provide more context or code snippets for a more detailed analysis.
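
The failure mode can be reproduced without Dagster. The sketch below uses a stand-in class, `FakeOutput` (hypothetical, not Dagster's `Output`), whose constructor is assumed to take `value`, `metadata`, and `output_name`. Passing `name=` raises the same kind of `TypeError`, and `inspect.signature` lists the keywords the constructor does accept:

```python
import inspect


class FakeOutput:
    """Hypothetical stand-in for an output wrapper; not Dagster's Output class."""

    def __init__(self, value, metadata=None, output_name="result"):
        self.value = value
        self.metadata = metadata or {}
        self.output_name = output_name


def accepted_keywords(cls):
    """Return the parameter names a class constructor accepts, excluding self."""
    params = inspect.signature(cls.__init__).parameters
    return [name for name in params if name != "self"]


def try_construct(**kwargs):
    """Build a FakeOutput, returning the error message if the call is rejected."""
    try:
        return FakeOutput(**kwargs)
    except TypeError as exc:
        return str(exc)


# 'name' is not in the signature, so the call is rejected before __init__ runs.
error = try_construct(value=[1, 2, 3], name="cpi_metadata")
# A keyword that is in the signature is accepted.
ok = try_construct(value=[1, 2, 3], output_name="cpi_metadata")
```

Running the same `inspect.signature` check against the real class you are constructing is a quick way to see which keyword it expects before changing the call site.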

Peter Mburu

02/21/2024, 11:27 PM
@Scout here is the code:

```python
def fetch_imf_metadata(config: IMFConfig, group_indicator_code:str="CL_INDICATOR_CPI") -> Output[pd.DataFrame]:
    """
    Get a list of required metadata with their codes.

    We will be using the code to send request to the API for a specific product later.
    """
    url = f"http://dataservices.imf.org/REST/SDMX_JSON.svc/CodeList/{group_indicator_code}"
    with bulk_limiter.ratelimit(delay=True):
        logger.debug("Fetching IMF metadata list from %s" % url)
        response = http.get(url)
        response.raise_for_status()

    code = response.json()["Structure"]["CodeLists"]["CodeList"]["Code"]
    products = [
        (c["@value"], c["Description"]["#text"].replace("Primary Commodity Prices, ", "").strip()) for c in code
    ]
    df = pd.DataFrame(products, columns=["Code", "Commodity"])

    if group_indicator_code == "CL_INDICATOR_PCPS":

        def get_unit(product):
            for key in PRODUCT_UNITS:
                if product in PRODUCT_UNITS[key]:
                    return key
            return None

        df["Unit"] = df["Commodity"].apply(get_unit)
        # Exclude the products Aluminum, Cocoa, and Cotton because they match
        # multiple products in FDW.
        exclude = ["Aluminum", "Cocoa", "Cotton"]
        df = df[~df["Commodity"].isin(exclude)]
        df = ClassifiedProductLookup().do_lookup(df, "Commodity", "product")
        df = UnitOfMeasureLookup().do_lookup(df, "Unit", "unit")
    else:
        required_indicators = (
            CPI_indicators if group_indicator_code == "CL_INDICATOR_CPI" else GDP_and_labor_indicators
        )
        df = df.loc[df["Code"].isin(required_indicators)]

    df = df.replace({np.nan: None})
    code_to_asset_name_map = {
        "CL_INDICATOR_CPI": "cpi",
        "CL_INDICATOR_IFS": "gpd_labor",
        "CL_INDICATOR_PCPS": "price"
    }
    return Output(
        df,
        metadata={
            "num_of_metadata": len(df),
            "preview": MetadataValue.md(df.head(config.preview_rows).to_markdown()),
        },
        name = f"{code_to_asset_name_map.get(group_indicator_code)}_metadata"
    )


@multi_asset(outs={
        "cpi_metadata": AssetOut(is_required=False),
        "price_metadata": AssetOut(is_required=False),
        "gpd_labor_metadata": AssetOut(is_required=False),
    },
    can_subset=True,)
def imf_metadata(config: IMFConfig) -> Output[pd.DataFrame]:
    """
    Get a list of required metadata with their codes.

    We will be using the code to send request to the API for a specific product later.
    """
    for code in ["CL_INDICATOR_CPI", "CL_INDICATOR_IFS", "CL_INDICATOR_PCPS"]:
        yield fetch_imf_metadata(config, code)
```

Scout

02/21/2024, 11:27 PM
The error you're encountering is due to the `Output` constructor being called with an unexpected keyword argument, `name`. In Dagster, the `Output` object does not accept a `name` argument in its constructor; the keyword it takes for selecting an output is `output_name`. The set of available output names is declared by the `out` parameter in the `@op` decorator or the `outs` parameter in the `@multi_asset` decorator.

In your `fetch_imf_metadata` function, you are trying to return an `Output` object with a `name` argument, which is causing the error. To fix this, remove the `name` argument from the `Output` constructor. Here's how you can modify the `fetch_imf_metadata` function to fix the error:
```python
def fetch_imf_metadata(config: IMFConfig, group_indicator_code: str = "CL_INDICATOR_CPI") -> Output[pd.DataFrame]:
    # ... existing code ...

    code_to_asset_name_map = {
        "CL_INDICATOR_CPI": "cpi",
        "CL_INDICATOR_IFS": "gpd_labor",
        "CL_INDICATOR_PCPS": "price"
    }
    asset_name = f"{code_to_asset_name_map.get(group_indicator_code)}_metadata"
    return Output(
        df,
        metadata={
            "num_of_metadata": len(df),
            "preview": MetadataValue.md(df.head(config.preview_rows).to_markdown()),
        }
    )
```
Notice that I've removed the `name` argument from the `Output` constructor and instead created a variable `asset_name` that you can use elsewhere if needed.

For the `imf_metadata` function, you are using the `@multi_asset` decorator to define multiple outputs. If you want to yield outputs with specific names, yield each one with the corresponding output name that you have defined in the `outs` parameter of the `@multi_asset` decorator. Here's an example of how you might yield named outputs in the `imf_metadata` function:
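```python
# Defined at module level so that imf_metadata can use it as well.
code_to_asset_name_map = {
    "CL_INDICATOR_CPI": "cpi",
    "CL_INDICATOR_IFS": "gpd_labor",
    "CL_INDICATOR_PCPS": "price",
}


@multi_asset(
    outs={
        "cpi_metadata": AssetOut(is_required=False),
        "price_metadata": AssetOut(is_required=False),
        "gpd_labor_metadata": AssetOut(is_required=False),
    },
    can_subset=True,
)
def imf_metadata(config: IMFConfig):
    for code in ["CL_INDICATOR_CPI", "CL_INDICATOR_IFS", "CL_INDICATOR_PCPS"]:
        result = fetch_imf_metadata(config, code)
        asset_name = f"{code_to_asset_name_map[code]}_metadata"
        # Re-wrap the value under the declared output name, carrying the metadata over
        # (assumes a Dagster 1.x Output, which exposes .value and .metadata).
        yield Output(result.value, metadata=result.metadata, output_name=asset_name)
```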
In this example, `output_name` should match one of the keys defined in the `outs` parameter of the `@multi_asset` decorator. Make sure that the `output_name` you yield corresponds to the names you've defined for your assets.
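
A mismatch between the yielded `output_name` and the `outs` keys is easy to introduce when the name is built from a lookup table. Below is a minimal plain-Python sketch of a check that could run before yielding. It assumes the mapping and output names from the code above; `asset_name_for` and `DECLARED_OUTS` are illustrative names, not Dagster APIs:

```python
# Mapping and declared output names copied from the thread's code.
CODE_TO_ASSET_NAME = {
    "CL_INDICATOR_CPI": "cpi",
    "CL_INDICATOR_IFS": "gpd_labor",
    "CL_INDICATOR_PCPS": "price",
}
DECLARED_OUTS = {"cpi_metadata", "price_metadata", "gpd_labor_metadata"}


def asset_name_for(code: str) -> str:
    """Translate an indicator code into a declared output name, failing loudly."""
    try:
        prefix = CODE_TO_ASSET_NAME[code]
    except KeyError:
        raise ValueError(f"Unknown indicator code: {code!r}") from None
    name = f"{prefix}_metadata"
    if name not in DECLARED_OUTS:
        raise ValueError(f"{name!r} is not one of the declared outputs")
    return name


# One output name per indicator code, in the mapping's insertion order.
names = [asset_name_for(code) for code in CODE_TO_ASSET_NAME]
```

Indexing the dictionary instead of calling `.get()` matters here: with `.get()`, an unknown code silently produces the string `"None_metadata"`, which only fails later when the output name is looked up.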

Peter Mburu

02/21/2024, 11:35 PM
@Scout refactor the code below:

```python
@asset
def remote_imf_data(
    config: IMFConfig, imf_metadata, get_imf_datasource_documents, get_imf_data_series
) -> Output[pd.DataFrame]:
    """
    Get a list of all data matching metadata fetched in FetchMetadata task.
    """
    document_type = config.document_type
    domain = config.domain
    frequency = config.frequency

    if document_type == "PriceIndex" or document_type == "MarketProduct":
        get_imf_datasource_documents = get_imf_datasource_documents.filter(document_type=document_type)
        get_imf_data_series = get_imf_data_series.select_related("_country").filter(indicator=document_type)

    if document_type == "Labor Statistics" or document_type == "Economic Statistics":
        get_imf_datasource_documents = get_imf_datasource_documents.filter(document_type__name=document_type)
        indicators = (
            GDP_indicator_names_map.keys()
            if document_type == "Economic Statistics"
            else Labor_indicator_names_map.keys()
        )
        get_imf_data_series = get_imf_data_series.select_related("_country").filter(indicator__in=indicators)

    def get_url(series: QuerySet, imf_cpi_metadata: pd.DataFrame, last_collection_date: datetime.date):
        if document_type == "PriceIndex":
            indicatorname = cpi_indicator_names_remap(series)
        elif document_type == "Labor Statistics":
            indicatorname = Labor_indicator_names_map.get(str(series.indicator_id))
        elif document_type == "Economic Statistics":
            indicatorname = GDP_indicator_names_map.get(str(series.indicator_id))
        elif document_type == "MarketProduct":
            indicatorname = series.product_id
        rows = imf_cpi_metadata[imf_cpi_metadata["Commodity"] == indicatorname]
        if len(rows) == 1:
            code = rows.iloc[0]["Code"]
            if document_type == "MarketProduct":
                global unit
                unit = rows.iloc[0]["unit"]
            period_date = last_collection_date or datetime.date(2000, 1, 1)
            start_date = period_date + datetime.timedelta(days=1)
            month = "{:02d}".format(start_date.month - 3)  # Start fetch requests 3 month back from last_collection_date
            # Price and Priceindex are monthly while Labor and economic data are annual
            # Hence need to strip month addition to startPeriod Parameter value
            start_date_str = (
                str(start_date.year) + "-" + month
                if document_type in ["MarketProduct", "PriceIndex"]
                else start_date.year - 3  # Start fetch requests 3 years back from last_collection_date
            )
            url = f"{GetBaseUrl()}CompactData/{domain}/{frequency}.{series._country_id}.{code}?startPeriod={start_date_str}"
            return url

    def fetch_data(url: str):
        # Throttle requests to avoid hitting rate limit https://datahelp.imf.org/knowledgebase/articles/630877-data-services
        with bulk_limiter.ratelimit(delay=True):
            logger.debug("Fetching IMF dataset list from %s" % url)
            result = http.get(url)
            result.raise_for_status()
        return result

    def extract_data(result: requests.Response, series: QuerySet, datasourcedocument: QuerySet):
        content_type = result.headers.get("content-type")
        df = pd.DataFrame()
        if "application/json" not in content_type:
            return df
        else:
            data = result.json()["CompactData"]["DataSet"]
            if "Series" in data and "Obs" in data["Series"]:
                data_list = data["Series"]["Obs"]
                # Make sure data_list is a list of dictionaries or tuples
                num_records = len(data_list)
                # pd.DataFrame.from_records() requires an index to be passed when extracting df from json, hence we
                # generate a sequence of values starting from 1 to be passed as index
                df_index = np.arange(1, num_records + 1)
                df = pd.DataFrame.from_records(data_list, index=df_index)
                if "@OBS_VALUE" not in df.columns:
                    df["@OBS_VALUE"] = None
                df.rename(
                    columns={
                        "@TIME_PERIOD": "start_date",
                        "@REFERENCE_PERIOD": "reference_period",
                        "@OBS_VALUE": "value",
                    },
                    inplace=True,
                )
                df["value"] = df["value"].astype(float).round(decimals=8)
                if document_type == "PriceIndex":
                    df["start_date"] = df["start_date"] + "-01"
                    df["index_name"] = series.index_name
                    df["period_date"] = df["start_date"].apply(lambda d: pd.Period(d, freq="M").end_time.date())
                    df["datasourceorganization"] = datasourcedocument.datasourceorganization.id
                elif document_type == "Labor Statistics":
                    df["start_date"] = df["start_date"] + "-01" + "-01"
                    df["indicator"] = series.indicator_id
                    df["geographic_unit"] = series.geographic_unit_id
                    df["period_date"] = df["start_date"].apply(lambda d: pd.Period(d, freq="A").end_time.date())
                    df["datasourceorganization"] = datasourcedocument.datasourceorganization.name
                elif document_type == "Economic Statistics":
                    df["start_date"] = df["start_date"] + "-01" + "-01"
                    df["indicator"] = series.indicator_id
                    df["geographic_unit"] = series.geographic_unit_id
                    df["value"] = df["value"].multiply(1000)
                    df["period_date"] = df["start_date"].apply(lambda d: pd.Period(d, freq="A").end_time.date())
                    df["datasourceorganization"] = datasourcedocument.datasourceorganization.name
                elif document_type == "MarkteProduct":
                    df["start_date"] = df["start_date"] + "-01"
                    df["product"] = series.product_id
                    df["unit"] = series.unit_id if unit is None else unit
                    df["market"] = series.market_id
                    df["country"] = series._country_id
                    df["currency"] = "USD"
                    df["period_date"] = df["start_date"].apply(lambda d: pd.Period(d, freq="M").end_time.date())
                    df["datasourceorganization"] = datasourcedocument.datasourceorganization.id
                df["dataseries"] = series.id
                df["country"] = series._country_id
                df["datasourcedocument"] = datasourcedocument.pk
                df = df.drop_duplicates()
                # Drop rows where 'value' column is NaN or None
                df.dropna(subset=["value"], inplace=True)
            return df

    def fetch_remote_data(
        get_imf_data_series: QuerySet, imf_metadata: pd.DataFrame, get_imf_datasource_documents: QuerySet
    ):
        dataframe = pd.DataFrame()
        if document_type in ["PriceIndex", "MarketProduct"]:
            datasourcedocument = get_imf_datasource_documents.filter(document_type=document_type).first()
        else:
            datasourcedocument = get_imf_datasource_documents.filter(document_type__name=document_type).first()
        last_collection_date = datasourcedocument.last_collection_date
        get_imf_data_series = get_imf_data_series.filter(datasourcedocument_id=datasourcedocument.pk)
        for series in get_imf_data_series:
            url = get_url(series, imf_metadata, last_collection_date)
            print(url)
            result = fetch_data(url)
            dataframe = pd.concat([dataframe, extract_data(result, series, datasourcedocument)], ignore_index=True)
        return dataframe

    df = fetch_remote_data(get_imf_data_series, imf_metadata, get_imf_datasource_documents)
    return Output(
        df,
        metadata={
            f"num_f_{config.document_type}_data": len(df),
            "preview": MetadataValue.md(df.head(config.preview_rows).to_markdown()),
        },
    )
```

Scout

02/21/2024, 11:35 PM
To refactor the `remote_imf_data` asset function, we can follow a few steps to improve the readability and maintainability of the code. We'll break down the large function into smaller, more focused functions, and pass the values the helpers need as arguments instead of relying on closures and global state. Here's a refactored version of the code:

```python
from typing import Optional, Tuple
import datetime

import pandas as pd
import requests
from dagster import asset, Output, MetadataValue

# Assuming the existence of the following functions, types and variables:
# - bulk_limiter
# - logger
# - http
# - cpi_indicator_names_remap
# - Labor_indicator_names_map
# - GDP_indicator_names_map
# - GetBaseUrl()
# - QuerySet
# - IMFConfig


def filter_documents_by_type(
    documents: QuerySet, data_series: QuerySet, document_type: str
) -> Tuple[QuerySet, QuerySet]:
    if document_type in ["PriceIndex", "MarketProduct"]:
        documents = documents.filter(document_type=document_type)
        data_series = data_series.select_related("_country").filter(indicator=document_type)
    elif document_type in ["Labor Statistics", "Economic Statistics"]:
        documents = documents.filter(document_type__name=document_type)
        indicators = (
            GDP_indicator_names_map.keys()
            if document_type == "Economic Statistics"
            else Labor_indicator_names_map.keys()
        )
        data_series = data_series.select_related("_country").filter(indicator__in=indicators)
    return documents, data_series


def get_url(
    series: QuerySet,
    imf_metadata: pd.DataFrame,
    last_collection_date: Optional[datetime.date],
    document_type: str,
    domain: str,
    frequency: str
) -> Optional[str]:
    # ... existing get_url function logic ...
    pass


def fetch_data(url: str) -> requests.Response:
    # ... existing fetch_data function logic ...
    pass


def extract_data(
    result: requests.Response,
    series: QuerySet,
    datasourcedocument: QuerySet,
    document_type: str
) -> pd.DataFrame:
    # ... existing extract_data function logic ...
    pass


def fetch_remote_data(
    get_imf_data_series: QuerySet,
    imf_metadata: pd.DataFrame,
    get_imf_datasource_documents: QuerySet,
    document_type: str,
    domain: str,
    frequency: str
) -> pd.DataFrame:
    # ... existing fetch_remote_data function logic ...
    pass


@asset
def remote_imf_data(
    config: IMFConfig, imf_metadata, get_imf_datasource_documents, get_imf_data_series
) -> Output[pd.DataFrame]:
    """
    Get a list of all data matching metadata fetched in FetchMetadata task.
    """
    document_type = config.document_type
    domain = config.domain
    frequency = config.frequency

    get_imf_datasource_documents, get_imf_data_series = filter_documents_by_type(
        get_imf_datasource_documents, get_imf_data_series, document_type
    )

    df = fetch_remote_data(
        get_imf_data_series, imf_metadata, get_imf_datasource_documents, document_type, domain, frequency
    )

    return Output(
        df,
        metadata={
            f"num_f_{document_type}_data": len(df),
            "preview": MetadataValue.md(df.head(config.preview_rows).to_markdown()),
        },
    )
```

In this refactoring:

1. We've extracted the logic for filtering documents and data series into a separate function, `filter_documents_by_type`.
2. The `get_url`, `fetch_data`, `extract_data`, and `fetch_remote_data` functions have been defined outside of the `remote_imf_data` function to separate concerns and improve testability.
3. The helpers now receive `document_type`, `domain`, and `frequency` as arguments instead of reading them from the enclosing scope. The `global unit` assignment in the original `get_url` is not yet addressed by the placeholders; when you move the logic over, pass the unit along explicitly (for example, by returning it together with the URL) so the helpers depend only on their inputs.
4. The `remote_imf_data` function now focuses on orchestrating the workflow by calling the helper functions with the appropriate arguments.

Please note that this refactoring assumes that the `QuerySet` type and other domain-specific functions and variables (like `bulk_limiter`, `logger`, `http`, etc.) are defined elsewhere in your codebase. You'll need to ensure that these are correctly imported or defined for the refactored code to work. Additionally, the `get_url`, `fetch_data`, `extract_data`, and `fetch_remote_data` functions will need to be filled in with the corresponding logic from the original code, as indicated by the placeholder comments.
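
One detail worth fixing while the `get_url` logic is moved over: the original builds the month with `start_date.month - 3`, which yields a zero or negative month whenever the start date falls in January through March. The sketch below shows one way the `startPeriod` value could be computed so that the subtraction rolls back into the previous year. It follows the defaults in the original code (2000-01-01 fallback, one day added, three months or three years back); the helper name `start_period` and the constant `MONTHLY_DOCUMENT_TYPES` are illustrative, not part of the original code:

```python
import datetime
from typing import Optional

# Document types the original code treats as monthly; the others are annual.
MONTHLY_DOCUMENT_TYPES = {"MarketProduct", "PriceIndex"}


def start_period(last_collection_date: Optional[datetime.date], document_type: str) -> str:
    """Return a startPeriod value three months (or three years) before the start date."""
    period_date = last_collection_date or datetime.date(2000, 1, 1)
    start_date = period_date + datetime.timedelta(days=1)
    if document_type in MONTHLY_DOCUMENT_TYPES:
        # Count whole months so that subtracting 3 borrows from the year when needed.
        months = start_date.year * 12 + (start_date.month - 1) - 3
        year, month_index = divmod(months, 12)
        return f"{year}-{month_index + 1:02d}"
    return str(start_date.year - 3)


january = start_period(datetime.date(2024, 1, 15), "PriceIndex")
midyear = start_period(datetime.date(2024, 6, 30), "MarketProduct")
annual = start_period(None, "Labor Statistics")
```

Returning a string in both branches also avoids mixing `str` and `int` values in the URL template, which the original conditional expression does.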

Peter Mburu

02/21/2024, 11:40 PM
@Scout Is there a way to access Dagster output metadata?

Scout

02/21/2024, 11:40 PM
Yes, in Dagster, you can access output metadata. Metadata in Dagster can be attached to various objects, including assets, ops, and outputs. When you yield an `Output` object from an op or asset, you can attach metadata to it, which can then be accessed in different ways depending on your needs. Here's an example of how you might attach metadata to an `Output`:
```python
from dagster import op, Output, MetadataValue

@op
def my_op(context):
    # ... perform some computation ...
    result = compute_something()  # placeholder for your own logic
    yield Output(
        result,
        metadata={
            "my_key": "my_value",
            "dashboard_url": MetadataValue.url("http://mycoolsite.com/my_dashboard"),
            "num_rows": len(result),
        },
    )
```
In this example, `my_key`, `dashboard_url`, and `num_rows` are metadata entries attached to the output of `my_op`.

You might expect to read this metadata downstream through the `context` object within an op or asset. However, as of my knowledge cutoff date in early 2023, there is no direct way to access the metadata of an upstream op's output from within the body of a downstream op. Metadata is typically used for display purposes in Dagster's web interface (formerly called Dagit), where it can be viewed as part of the run logs or asset catalog.

If you need to pass information from one op to another, you would typically do so by passing the data itself as an output of the upstream op and as an input to the downstream op. If you need to pass metadata-like information, include it as part of the data structure being passed between ops.

For more advanced use cases, such as accessing metadata for asset materializations or events, you might need to interact with Dagster's instance APIs to query the event log or asset catalog (for example, `DagsterInstance.get_latest_materialization_event` returns the most recent materialization event for an asset key, and that materialization carries its metadata). This would typically be done outside of the execution of a pipeline, such as in a monitoring script or a custom tool built on top of Dagster.

Please note that Dagster is an actively developed project, and new features or APIs may have been introduced after my knowledge cutoff date. For the most up-to-date information, consult the latest Dagster documentation or reach out to the Dagster community for assistance.
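
The suggestion to carry metadata-like information inside the data structure itself can be sketched in plain Python, independent of Dagster. The container `TableWithInfo`, the two step functions, and the row values below are hypothetical and only illustrate the shape of the idea:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class TableWithInfo:
    """Hypothetical container: the payload plus the facts downstream steps need."""

    rows: List[Dict[str, Any]]
    info: Dict[str, Any] = field(default_factory=dict)


def upstream_step() -> TableWithInfo:
    """Produce data and record descriptive facts alongside it."""
    rows = [
        {"Code": "A1", "Commodity": "Example one"},
        {"Code": "B2", "Commodity": "Example two"},
    ]
    return TableWithInfo(rows=rows, info={"num_rows": len(rows), "source": "CL_INDICATOR_CPI"})


def downstream_step(table: TableWithInfo) -> str:
    """Read the bundled facts directly from the input, with no event-log lookup."""
    return f"{table.info['source']}: {table.info['num_rows']} rows"


summary = downstream_step(upstream_step())
```

The trade-off is that the bundled facts are part of the stored value rather than the UI-visible metadata, so anything that should also appear in the run logs still needs to be attached to the `Output` separately.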

Peter Mburu

02/22/2024, 12:24 AM
@Scout how do I access the output of the upstream assets?

```python
@multi_asset(
    outs={
        "cpi_remote_data": AssetOut(is_required=False),
        "price_remote_data": AssetOut(is_required=False),
        "gpd_remote_data": AssetOut(is_required=False),
        "labor_remote_data": AssetOut(is_required=False),
    },
    internal_asset_deps={
        "cpi_remote_data": {AssetKey("cpi_metadata")},
        "price_remote_data": {AssetKey("price_metadata")},
        "gpd_remote_data": {AssetKey("gpd_labor_metadata")},
        "labor_remote_data": {AssetKey("gpd_labor_metadata")},
    },
    can_subset=True,
)
def remote_imf_data(config: IMFConfig)
```

Scout

02/22/2024, 12:24 AM
In Dagster, when you define a `@multi_asset`, you can describe how its outputs depend on other assets with the `internal_asset_deps` parameter, which maps each output name to the set of `AssetKey`s that output depends on. This records per-output lineage; it does not by itself hand the upstream values to your function.

To access the output of upstream assets within the body of a `@multi_asset` function, define parameters in the function whose names correspond to the upstream assets. Dagster will load the values of these upstream assets and pass them into the function when it is executed. Here's an example of how you might define a `@multi_asset` function that depends on and accesses the outputs of upstream assets:
```python
from dagster import multi_asset, AssetOut, AssetKey, Output

@multi_asset(
    outs={
        "cpi_remote_data": AssetOut(is_required=False),
        "price_remote_data": AssetOut(is_required=False),
        "gpd_remote_data": AssetOut(is_required=False),
        "labor_remote_data": AssetOut(is_required=False),
    },
    internal_asset_deps={
        "cpi_remote_data": {AssetKey("cpi_metadata")},
        "price_remote_data": {AssetKey("price_metadata")},
        "gpd_remote_data": {AssetKey("gpd_labor_metadata")},
        "labor_remote_data": {AssetKey("gpd_labor_metadata")},
    },
    can_subset=True,
)
def remote_imf_data(config: IMFConfig, cpi_metadata, price_metadata, gpd_labor_metadata):
    # cpi_metadata, price_metadata, and gpd_labor_metadata hold the values
    # produced by the upstream assets of the same names.

    # Perform some computation with the upstream data
    # (the process_* helpers are placeholders for your own logic).
    cpi_data = process_cpi_data(cpi_metadata)
    price_data = process_price_data(price_metadata)
    gpd_data = process_gpd_data(gpd_labor_metadata)
    labor_data = process_labor_data(gpd_labor_metadata)

    # Yield each result wrapped in an Output whose output_name matches a key in outs.
    # With can_subset=True, a real implementation would yield only the outputs
    # selected for the current run.
    yield Output(cpi_data, output_name="cpi_remote_data")
    yield Output(price_data, output_name="price_remote_data")
    yield Output(gpd_data, output_name="gpd_remote_data")
    yield Output(labor_data, output_name="labor_remote_data")
```
In this example, `cpi_metadata`, `price_metadata`, and `gpd_labor_metadata` are the parameters that correspond to the upstream assets. The function `remote_imf_data` can use these parameters to access the data produced by the upstream assets. The `yield` statements then output the processed data, associating each output with the corresponding asset name defined in the `outs` parameter.

Please note that the actual processing functions (`process_cpi_data`, `process_price_data`, etc.) are not defined in this example and would need to be implemented according to your specific logic.
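
Because the asset is declared with `can_subset=True`, a given run may request only some of the four outputs, and the per-output dependency map then tells you which upstream values are actually needed. Below is a plain-Python sketch of that lookup, using strings in place of `AssetKey` objects. `INTERNAL_DEPS` mirrors the `internal_asset_deps` argument above, and `upstream_needed` is an illustrative helper, not a Dagster API:

```python
from typing import Dict, Iterable, Set

# Mirrors internal_asset_deps from the thread, with plain strings as keys.
INTERNAL_DEPS: Dict[str, Set[str]] = {
    "cpi_remote_data": {"cpi_metadata"},
    "price_remote_data": {"price_metadata"},
    "gpd_remote_data": {"gpd_labor_metadata"},
    "labor_remote_data": {"gpd_labor_metadata"},
}


def upstream_needed(selected_outputs: Iterable[str]) -> Set[str]:
    """Return the upstream asset names required to produce the selected outputs."""
    selected = set(selected_outputs)
    unknown = selected - INTERNAL_DEPS.keys()
    if unknown:
        raise ValueError(f"Unknown output names: {sorted(unknown)}")
    needed: Set[str] = set()
    for name in selected:
        needed |= INTERNAL_DEPS[name]
    return needed


# Two outputs that share one upstream asset need only that asset.
shared = upstream_needed(["gpd_remote_data", "labor_remote_data"])
everything = upstream_needed(INTERNAL_DEPS)
```

Inside the real asset, the set of selected output names would come from the execution context rather than from a hard-coded list.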