#ask-ai

Axel Mendoza

03/13/2024, 1:22 PM
@Scout Suppose this Dagster code:
defs = Definitions(
    assets=[iris_data],
    resources={
        "io_manager": BigQueryPandasIOManager(
            project="my-gcp-project",  # required
            location="us-east5",  # optional, defaults to the default location for the project - see <https://cloud.google.com/bigquery/docs/locations> for a list of locations
            dataset="IRIS",  # optional, defaults to PUBLIC
            timeout=15.0,  # optional, defaults to None
        )
    },
)
How can I make certain assets use the BigQueryPandasIOManager and others not?

Scout

03/13/2024, 1:22 PM
In Dagster, you can configure different IO managers for different assets by setting the `io_manager_key` parameter on the asset definition. This binds a specific IO manager to an asset. If you want certain assets to use the `BigQueryPandasIOManager` and others not to, you can define multiple IO managers and assign them to assets accordingly. Here's an example based on the documentation:
import pandas as pd
from dagster import Definitions, asset
from dagster_gcp_pandas import BigQueryPandasIOManager

# This asset is stored in BigQuery by the IO manager registered as "bigquery_io_manager"
@asset(io_manager_key="bigquery_io_manager")
def asset_using_bigquery() -> pd.DataFrame:
    # Your asset code here
    ...

# No io_manager_key: this asset uses whatever is registered as "io_manager",
# which is Dagster's default filesystem IO manager unless you override it
@asset
def asset_not_using_bigquery() -> pd.DataFrame:
    # Your asset code here
    ...

resources = {
    # Registered under its own key, NOT under "io_manager", so it does not
    # become the default for every asset
    "bigquery_io_manager": BigQueryPandasIOManager(
        project="my-gcp-project",
        location="us-east5",
        dataset="IRIS",
        timeout=15.0,
    ),
    # You can define other IO managers here
}

# Create your Definitions object with the assets and resources
defs = Definitions(
    assets=[asset_using_bigquery, asset_not_using_bigquery],
    resources=resources,
)
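In this example, `asset_using_bigquery` is explicitly bound to the `bigquery_io_manager` key, which is registered as an instance of `BigQueryPandasIOManager`. `asset_not_using_bigquery` does not specify an `io_manager_key`, so it uses the resource registered as `io_manager`; if you don't provide one, Dagster falls back to its default filesystem IO manager. This is why the `BigQueryPandasIOManager` should be registered under its own key: registered as `io_manager`, it would become the default for every asset without an `io_manager_key`, including `asset_not_using_bigquery`. If you want yet another IO manager for `asset_not_using_bigquery`, define it in the `resources` dictionary under a new key and reference that key with the `io_manager_key` parameter.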
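To see why the key matters, here is a toy model in plain Python of how an asset gets routed to an IO manager. It is not Dagster's code: `resolve_io_manager` is a hypothetical function, and plain strings stand in for IO manager objects. It assumes that assets without an `io_manager_key` use whatever is registered as `io_manager`, and that a built-in default is used when nothing is registered there.

```python
from typing import Dict, Optional

# Key that assets fall back to when they don't set io_manager_key
DEFAULT_IO_MANAGER_KEY = "io_manager"


def resolve_io_manager(resources: Dict[str, str], io_manager_key: Optional[str] = None) -> str:
    """Toy model: return the IO manager an asset would be routed to."""
    key = io_manager_key or DEFAULT_IO_MANAGER_KEY
    if key in resources:
        return resources[key]
    if key == DEFAULT_IO_MANAGER_KEY:
        # Nothing registered under "io_manager": use the built-in default
        return "default filesystem IO manager"
    # An explicit key with no matching resource is a configuration error
    raise KeyError(f"no IO manager registered under {key!r}")


# BigQuery IO manager registered under its own key: only assets that ask for it use it
separate = {"bigquery_io_manager": "BigQueryPandasIOManager"}
print(resolve_io_manager(separate, "bigquery_io_manager"))  # BigQueryPandasIOManager
print(resolve_io_manager(separate))  # default filesystem IO manager

# BigQuery IO manager registered as "io_manager": it becomes every asset's default
as_default = {"io_manager": "BigQueryPandasIOManager"}
print(resolve_io_manager(as_default))  # BigQueryPandasIOManager
```

The second configuration is the one to avoid when some assets should stay out of BigQuery.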

Axel Mendoza

03/13/2024, 1:32 PM
@Scout Does the dataset need to exist, or will the BigQueryPandasIOManager create it?

Scout

03/13/2024, 1:32 PM
The `BigQueryPandasIOManager` expects the dataset to already exist in BigQuery. It does not automatically create the dataset for you, so make sure the dataset you specify in the `BigQueryPandasIOManager` configuration has already been created in your Google Cloud Platform (GCP) project. If it does not exist, create it with the Google Cloud Console, the `bq` command-line tool, or the BigQuery client library before running the Dagster pipeline that uses the `BigQueryPandasIOManager`. Here's an example of how you might create a dataset using the BigQuery client library:
from google.cloud import bigquery

# Construct a BigQuery client object (uses your default credentials and project)
client = bigquery.Client()

# Set your dataset ID (the dataset must be within the same project as the client)
dataset_id = "{}.your_dataset".format(client.project)

# Construct a full Dataset object to send to the API
dataset = bigquery.Dataset(dataset_id)

# Specify the geographic location where the dataset should reside
dataset.location = "us-east5"

# Send the dataset to the API for creation (an API request), with an explicit timeout;
# exists_ok=True lets reruns succeed when the dataset already exists
dataset = client.create_dataset(dataset, timeout=30, exists_ok=True)
print("Dataset {}.{} is ready".format(client.project, dataset.dataset_id))
Replace `"your_dataset"` with the name of the dataset you want to create, and set the appropriate location, usually the same `location` you configure on the `BigQueryPandasIOManager`. After creating the dataset, you can use it in your `BigQueryPandasIOManager` configuration.

Axel Mendoza

03/13/2024, 1:58 PM
@Scout Can we use `BigQueryResource` and `BigQueryPandasIOManager` at the same time?

Scout

03/13/2024, 1:58 PM
Yes, you can use both `BigQueryResource` and `BigQueryPandasIOManager` at the same time in a Dagster pipeline. `BigQueryResource` is a resource that gives your assets a BigQuery client, which you can use to run queries or perform other operations directly against BigQuery. `BigQueryPandasIOManager`, on the other hand, is an IO manager that stores Pandas DataFrames in BigQuery tables and loads them back. Here's how you might configure both in your Dagster pipeline:
import pandas as pd
from dagster import Definitions, asset
from dagster_gcp import BigQueryResource
from dagster_gcp_pandas import BigQueryPandasIOManager

# Shared settings so the resource and the IO manager point at the same place
GCP_PROJECT = "my-gcp-project"
GCP_LOCATION = "us-east5"

@asset
def my_asset(bigquery: BigQueryResource) -> pd.DataFrame:
    # Use the BigQuery resource to run a query; get_client() yields a google.cloud.bigquery.Client
    with bigquery.get_client() as client:
        query_job = client.query("SELECT * FROM `my_dataset.my_table`")
        # Convert the query results to a DataFrame
        df = query_job.to_dataframe()
    # The returned DataFrame is stored by the "io_manager" resource (BigQueryPandasIOManager)
    return df

defs = Definitions(
    assets=[my_asset],
    resources={
        # Used explicitly inside assets for custom BigQuery operations
        "bigquery": BigQueryResource(project=GCP_PROJECT, location=GCP_LOCATION),
        # Stores DataFrames returned by assets and loads them for downstream assets
        "io_manager": BigQueryPandasIOManager(
            project=GCP_PROJECT,
            location=GCP_LOCATION,
            dataset="my_dataset",
        ),
    },
)
In this example, the `bigquery` resource (a `BigQueryResource`) is used inside `my_asset` to query BigQuery directly, while the `io_manager` resource (a `BigQueryPandasIOManager`) stores the DataFrame that `my_asset` returns as a BigQuery table. The two are configured independently: the IO manager does not read its settings from the resource, so the example shares the project and location through module-level constants. Include both in the `resources` of your `Definitions` object; your assets can then use the resource for custom BigQuery operations and rely on the IO manager to store and retrieve DataFrames.

Axel Mendoza

03/13/2024, 2:10 PM
@Scout Why does the BigQueryPandasIOManager produce all-caps column names? Can we change this?

Scout

03/13/2024, 2:10 PM
The `BigQueryPandasIOManager` converts all DataFrame column names to uppercase before loading the data into BigQuery. According to the documentation and GitHub issues, this was done because lowercase column names caused inconsistencies in Snowflake, and the behavior was implemented to stay consistent across systems. Snowflake treats unquoted identifiers (including column names) as case-insensitive and stores them in uppercase by default.

At the time of writing, there is an open feature request in the Dagster GitHub repository to let users opt out of this automatic conversion when using `BigQueryPandasIOManager`. Implementing it in a way that lets users choose their preferred casing while preserving backward compatibility has proven tricky. The Dagster team is tracking the issue and considering it for future work, but there is currently no built-in option to change this behavior in the `BigQueryPandasIOManager`. If this affects you, follow the issue on GitHub or add to the discussion there; the Dagster team factors community engagement into how it prioritizes feature requests.
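Here is a minimal pandas sketch of what the conversion amounts to, assuming the IO manager simply upper-cases every column name on write, as described above. `uppercase_columns` only imitates that behavior and is not the library's code; `lowercase_columns` is a hypothetical helper you could apply in your own asset code after loading a table.

```python
import pandas as pd


def uppercase_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Imitate the upper-casing applied on write (illustration only)."""
    # rename returns a new DataFrame; the input is left unchanged
    return df.rename(columns=str.upper)


def lowercase_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: normalize column names to lowercase in your own code."""
    return df.rename(columns=str.lower)


raw = pd.DataFrame({"sepal_length": [5.1], "Species": ["setosa"]})
stored = uppercase_columns(raw)
print(list(stored.columns))  # ['SEPAL_LENGTH', 'SPECIES']
restored = lowercase_columns(stored)
print(list(restored.columns))  # ['sepal_length', 'species']
```

The round trip is lossy: a mixed-case name such as `Species` comes back as `species`, so if you need the exact original names you would have to keep your own mapping.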

Axel Mendoza

03/13/2024, 2:25 PM
@Scout Suppose I have an asset:
@asset(
    name="labels",
    group_name=DB_NAME_RECO,
    deps=[reconciliation_labels, create_bq_dataset],
    io_manager_key="io_manager_bq_reco",
)
def reconciliation_labels_latest(
If I take this asset as a dependency, is it possible to retrieve the asset's name ("labels")?
@asset(
    name="queries",
    group_name=DB_NAME_RECO,
    deps=[reconciliation_labels_latest],
    io_manager_key="io_manager_bq_reco",
)
def reconciliation_queries(
    context: AssetExecutionContext,
    bigquery: BigQueryResource,
) -> pd.DataFrame:

Scout

03/13/2024, 2:25 PM
In Dagster, an asset receives an upstream asset's output only when that upstream asset is declared as a function parameter (an input); the IO manager then loads the value and passes it in. Assets listed in `deps` are different: they only declare that the upstream asset must be materialized first, and nothing is passed to the function. In your example, `reconciliation_queries` lists `reconciliation_labels_latest` in `deps`, so it receives neither the output nor the name (`"labels"`) of the upstream asset. If you want the data, declare a parameter named after the upstream asset key instead, for example `labels: pd.DataFrame`, and remove the asset from `deps`.

The upstream asset's name is not passed to the function either way. Because `@asset` returns an `AssetsDefinition`, you can read the key from that object: `reconciliation_labels_latest.key` is `AssetKey(["labels"])`, and `reconciliation_labels_latest.key.path[-1]` is `"labels"`. Needing the name inside an asset's logic is not a common pattern in Dagster, though, since asset names are mainly used for organization and selection rather than as data. If you do need it, you could define it as a constant and use it both in the asset definition and within the function body:
import pandas as pd
from dagster import AssetExecutionContext, asset
from dagster_gcp import BigQueryResource

# DB_NAME_RECO, reconciliation_labels and create_bq_dataset come from your own project
LABELS_ASSET_NAME = "labels"

@asset(
    name=LABELS_ASSET_NAME,
    group_name=DB_NAME_RECO,
    deps=[reconciliation_labels, create_bq_dataset],
    io_manager_key="io_manager_bq_reco",
)
def reconciliation_labels_latest() -> pd.DataFrame:
    ...

@asset(
    name="queries",
    group_name=DB_NAME_RECO,
    deps=[reconciliation_labels_latest],
    io_manager_key="io_manager_bq_reco",
)
def reconciliation_queries(
    context: AssetExecutionContext,
    bigquery: BigQueryResource,
) -> pd.DataFrame:
    # You can use LABELS_ASSET_NAME here if needed
    ...
This way, you have a single source of truth for the asset name and can use it wherever necessary in your code. However, this is just a workaround, and the actual asset name is not dynamically passed between assets during execution.
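To make the difference between `deps` and function parameters concrete, here is a toy model in plain Python. It is not Dagster's implementation: `ToyAsset` and `materialize_all` are hypothetical names. It only mirrors the rule that an upstream asset listed in `deps` affects execution order, while an upstream asset declared as an input has its stored value passed to the function.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Set, Tuple


@dataclass
class ToyAsset:
    name: str
    fn: Callable[..., Any]
    ins: Tuple[str, ...] = ()   # upstream values passed to fn as keyword arguments
    deps: Tuple[str, ...] = ()  # upstream assets that must run first; no value is passed


def materialize_all(assets: List[ToyAsset]) -> Dict[str, Any]:
    """Run assets once all their upstreams are done; return the stored outputs."""
    storage: Dict[str, Any] = {}  # stands in for what an IO manager would persist
    done: Set[str] = set()
    pending = list(assets)
    while pending:
        progressed = False
        for asset in list(pending):
            if set(asset.ins) | set(asset.deps) <= done:
                # Only inputs are loaded and passed; deps are used for ordering alone
                kwargs = {name: storage[name] for name in asset.ins}
                storage[asset.name] = asset.fn(**kwargs)
                done.add(asset.name)
                pending.remove(asset)
                progressed = True
        if not progressed:
            raise ValueError("cycle or missing upstream asset")
    return storage


labels = ToyAsset("labels", lambda: ["cat", "dog"])
queries_via_deps = ToyAsset("queries_deps", lambda: "no data received", deps=("labels",))
queries_via_ins = ToyAsset("queries_ins", lambda labels: f"received {len(labels)} labels", ins=("labels",))

results = materialize_all([queries_via_deps, queries_via_ins, labels])
print(results["queries_deps"])  # no data received
print(results["queries_ins"])  # received 2 labels
```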