Hello, So far we have dagster+bigquery+dbt working...
# ask-community
s
Hello, So far we have dagster+bigquery+dbt working ok. Now we are trying to read and write files into google cloud storage, any idea how to do it? do I need to create a custom io manager? in the example below I was able to read and write using pandas • daily_order_summary = pd.read_json('gs://data-analytics/test-data/test.json') • daily_order_summary.to_json('gs://data-analytics/test-data/test.json') but I guess Dagster must have a different way to separate assets from IO?
Copy code
@asset(
    ins={
        "daily_order_summary": AssetIn(key_prefix=["analytics"]),
        "order_forecast_model": AssetIn(),
    },
    compute_kind="ml_tool",
    key_prefix=["forecasting"],
)
def predicted_orders(
    daily_order_summary: pd.DataFrame, order_forecast_model: Tuple[float, float]
) -> pd.DataFrame:
    #Predicted orders for the next 30 days based on the fit paramters
    a, b = order_forecast_model
    daily_order_summary = pd.read_json('<gs://data-analytics/test-data/test.json>')
    start_date = daily_order_summary.order_date.max()
    print(pd.to_datetime(start_date, unit='s'))
    future_dates = pd.date_range(
        start=pd.to_datetime(start_date, unit='s'), end=pd.to_datetime(start_date, unit='s') + pd.DateOffset(days=30)
    )
    #daily_order_summary.to_json('<gs://data-analytics/test-data/test.json>')
    predicted_data = model_func(x=future_dates.astype(np.int64), a=a, b=b)
    return pd.DataFrame({"order_date": future_dates, "num_orders": predicted_data})
Hi @Sean Lopp We figure out how to use io_manager for GCS. we just don't know how to pass the credentials, the below code is working if we use OAuth, but any idea of how to configure to use a service account passing a json file?
Copy code
"io_manager_gcs": gcs_pickle_io_manager.configured(
        {
            "gcs_bucket": "data-analytics", 
            "gcs_prefix": "garys-prefix"
        }
    ),
    "gcs": gcs_resource,
s
Google auth is tricky. In this project I got things to work nicely by having the service account JSON file encoded as an env var and then decoded to a temp file on disk when the dagster code is loaded. Then I point a known google environment variable at the temp file. With this approach most clients and google related packages just work without further config. The project readme has details and code https://github.com/slopp/dagster-conditional-etl-gcp-demo
s
Here's one that worked for me:
Copy code
service_account_creds = os.environ['GCP_SERVICE_ACCOUNT_CREDS']
gcs_credentials = service_account.Credentials.from_service_account_info(json.loads(service_account_creds))
storage_client = storage.Client(credentials=gcs_credentials)
resource_defs = {
    "data_lake_io_manager": gcs_pickle_io_manager.configured(
        {
            "gcs_bucket": "my_bucket",
            "gcs_prefix": "my_prefix"
        }
    ),
    "gcs": ResourceDefinition.hardcoded_resource(storage_client)
}
The
gcs
param at the end was the key to getting it working
👍 1