kyle
06/30/2023, 5:33 PM
dagster/_core/workspace/context.py:610: UserWarning: Error loading repository location api.py:
dagster._core.errors.DagsterInvalidDefinitionError: resource with key 'feather' required by op 'api_get_list' was not provided. Please provide a <class 'dagster._core.definitions.resource_definition.ResourceDefinition'> to key 'feather', or change the required key to one of the following keys which points to an <class 'dagster._core.definitions.resource_definition.ResourceDefinition'>: ['io_manager']
resources.py:
class FeatherManager:
    """Read and write DataFrames as Feather files under a single root directory.

    Every file lives at ``<directory>/<api>/<assetname>.feather``.
    """

    def __init__(self, directory):
        """Store the root directory from which all Feather paths are built."""
        self.directory = directory

    def _feather_path(self, api, assetname):
        """Return the Feather file path for the given api / asset name pair."""
        return f"{self.directory}/{api}/{assetname}.feather"

    def get(self, api, assetname):
        """Load and return the DataFrame stored for ``assetname`` under ``api``."""
        source = self._feather_path(api, assetname)
        frame = pd.read_feather(source)
        return frame

    def set(self, api, assetname, df: DataFrame):
        """Write ``df`` to the Feather file for ``assetname`` under ``api``."""
        target = self._feather_path(api, assetname)
        df.to_feather(target)
@resource
def feather_resource(_):
    """Dagster resource that provides a FeatherManager rooted at DATA_FEATHER_ROOT.

    The init context argument is unused.

    Returns:
        A FeatherManager whose root directory is the value of the
        ``DATA_FEATHER_ROOT`` environment variable.

    Raises:
        ValueError: if ``DATA_FEATHER_ROOT`` is unset or empty. Without this
            check the manager would build paths such as
            ``"None/<api>/<asset>.feather"`` and fail later with a confusing
            file-system error.
    """
    directory = os.getenv("DATA_FEATHER_ROOT")
    if not directory:
        raise ValueError(
            "DATA_FEATHER_ROOT environment variable must be set to the "
            "root directory for feather files"
        )
    return FeatherManager(directory)
# Definitions for this module: four assets plus the resources they can
# request under the keys "mongo", "redis", "feather" and "parquet".
defs = Definitions(
    assets=[
        test_mongo_connection,
        test_redis_connection,
        test_feather_connection,
        test_parquet_connection,
    ],
    resources=dict(
        mongo=mongo_resource,
        redis=redis_resource,
        feather=feather_resource,
        parquet=parquet_resource,
    ),
)
api.py:
# other imports ...
from resources import feather_resource
from resources import mongo_resource
@asset(required_resource_keys={"feather"})
def api_get_list(context) -> Output[None]:
    """Store the API result as a Feather file and report summary metadata.

    Requires the ``feather`` resource. The asset carries no in-memory value:
    the returned Output wraps ``None`` and only attaches metadata (timestamp,
    row count, and the first five rows as JSON records).
    """
    feather_manager = context.resources.feather
    # code to make api call and convert to dataframe ...
    # write df to feather file
    feather_manager.set("api", "api_get_list", df)
    # set metadata on asset
    asset_metadata = {
        "computed_at": datetime.now().isoformat(),
        "count": len(df),
        "tickers_first5": df.head(5).to_json(orient="records"),
    }
    return Output(None, metadata=asset_metadata)
@repository
def wbrepo():
    """Repository exposing ``api_get_list`` with its required resource bound.

    ``api_get_list`` declares ``required_resource_keys={"feather"}``. A
    repository does not pick up resources registered on a ``Definitions``
    object in another module, so returning the bare asset fails at load time
    with ``DagsterInvalidDefinitionError: resource with key 'feather' ... was
    not provided``. Binding the resource here with ``with_resources`` makes
    the asset loadable.

    Returns:
        A list containing ``api_get_list`` with ``feather_resource`` bound to
        the ``"feather"`` key.
    """
    # Local import keeps this block self-contained.
    from dagster import with_resources

    return list(
        with_resources(
            [api_get_list],
            {"feather": feather_resource},
        )
    )
Tim Castillo
06/30/2023, 5:52 PM
defs
.
You might benefit from looking at our recommended project structure or an example project.
kyle
06/30/2023, 8:07 PM
sean
06/30/2023, 9:46 PM
kyle
06/30/2023, 9:48 PM
# DOCS: <https://docs.dagster.io/concepts/code-locations#defining-code-locations>
# Only assets are registered; the resources, schedules and sensors arguments
# are disabled.
# NOTE(review): assets that declare required_resource_keys presumably still
# need a matching `resources=` mapping here -- confirm before loading.
defs = Definitions(assets=defs_assets)
# Disabled options, kept for reference:
#   resources=resources_by_deployment_name[deployment_name],
#   schedules=[core_assets_schedule],
#   sensors=defs_sensors,