George Pearse
06/22/2022, 1:40 PMml_cxr_datalake = SourceAsset(
# Declares the Mongo collection as a SourceAsset: downstream assets can take it as an
# input, and it is loaded via the resource registered under 'ml_mongo_warehouse'.
# NOTE(review): a SourceAsset is not a job/graph. Passing it where a job "target" is
# expected produces the ParameterCheckError shown below ("Param 'target' is not one of
# ['GraphDefinition', 'PipelineDefinition', 'UnresolvedAssetJobDefinition']").
# Include it in the @repository list (or in with_resources) instead.
key=AssetKey('ml_cxr_datalake'),
io_manager_key='ml_mongo_warehouse',
description='Mongo collection with ready for CXR model training'
)
error:
docker-dev-dagster-pipelines | File "/usr/local/lib/python3.9/site-packages/dagster/core/definitions/target.py", line 35, in __new__
docker-dev-dagster-pipelines | check.inst_param(
docker-dev-dagster-pipelines | File "/usr/local/lib/python3.9/site-packages/dagster/_check/__init__.py", line 633, in inst_param
docker-dev-dagster-pipelines | raise _param_type_mismatch_exception(
docker-dev-dagster-pipelines | dagster._check.ParameterCheckError: Param "target" is not one of ['GraphDefinition', 'PipelineDefinition', 'UnresolvedAssetJobDefinition']. Got SourceAsset(key=AssetKey(['ml_cxr_datalake']), metadata_entries=[], io_manager_key='ml_mongo_warehouse', description='Mongo collection with ready for CXR model training', partitions_def=None, group_name='default', resource_defs={}) which is type <class 'dagster.core.asset_defs.source_asset.SourceAsset'>.
sandy
06/22/2022, 2:23 PM
George Pearse
06/22/2022, 2:28 PM
sandy
06/22/2022, 3:05 PM
`@repository`? If you share more of your code, or how you're launching Dagit, I'd likely be able to help you fix it.
George Pearse
06/22/2022, 3:13 PMfrom dagster import load_assets_from_modules, with_resources, AssetKey, SourceAsset, asset, IOManager, IOManagerDefinition, with_resources, define_asset_job, repository
import pandas as pd
from pandas import DataFrame
import os
# Raw weather readings consumed by daily_temperature_highs. They are declared as a
# SourceAsset so they can be an asset input without being computed by an op here.
sfo_q2_weather_sample = SourceAsset(
    metadata={"format": "csv"},
    description="Weather samples, taken every five minutes at SFO",
    key=AssetKey("sfo_q2_weather_sample"),
)
class LocalFileSystemIOManager(IOManager):
    """Translates between Pandas DataFrames and CSVs on the local filesystem.

    An asset with key path ``[a, b, c]`` is stored at ``a/b/c.csv``, resolved to an
    absolute path against the current working directory.
    """

    def _get_fs_path(self, asset_key: AssetKey) -> str:
        """Return the absolute CSV path that stores ``asset_key``."""
        rpath = os.path.join(*asset_key.path) + ".csv"
        return os.path.abspath(rpath)

    def handle_output(self, context, obj: DataFrame):
        """Save ``obj`` as a CSV at the path derived from ``context.asset_key``."""
        fpath = self._get_fs_path(context.asset_key)
        # Multi-part asset keys map to nested directories; to_csv does not create
        # missing parent directories, so make sure they exist first.
        os.makedirs(os.path.dirname(fpath), exist_ok=True)
        # NOTE(review): the index is written here but load_input does not pass
        # index_col, so an unnamed default index comes back as an extra
        # "Unnamed: 0" column. Confirm whether index=False / index_col=0 is intended.
        obj.to_csv(fpath)

    def load_input(self, context):
        """Read the DataFrame stored for ``context.asset_key`` back from its CSV."""
        fpath = self._get_fs_path(context.asset_key)
        return pd.read_csv(fpath)
@asset
def daily_temperature_highs(sfo_q2_weather_sample: DataFrame) -> DataFrame:
    """Computes the temperature high for each day.

    ``valid`` is parsed with ``pd.to_datetime`` and truncated to midnight, so every
    sample from the same calendar day lands in the same group. Grouping on the raw
    timestamps would yield one row per sample rather than one per day. Each
    group's column-wise maximum is returned, with ``tmpf`` renamed to
    ``max_tmpf``, indexed by ``valid_date``.
    """
    # assign() returns a new frame, so the upstream asset's DataFrame is not
    # mutated as a side effect of computing this one.
    samples = sfo_q2_weather_sample.assign(
        valid_date=pd.to_datetime(sfo_q2_weather_sample["valid"]).dt.normalize()
    )
    return samples.groupby("valid_date").max().rename(columns={"tmpf": "max_tmpf"})
@asset
def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame:
    """Return the ten rows of ``daily_temperature_highs`` with the largest ``max_tmpf``."""
    top_n = 10
    ranked = daily_temperature_highs.nlargest(n=top_n, columns="max_tmpf")
    return ranked
@repository
def software_defined_assets():
    """Bundle the weather assets, their IO manager, and a job over them.

    ``daily_temperature_highs`` takes ``sfo_q2_weather_sample`` (a SourceAsset) as
    input, so the source asset must be part of the repository. Otherwise Dagster
    fails with "Input asset ... is not produced by any of the provided asset ops
    and is not one of the provided sources". It is passed through with_resources
    so that it is bound to the same IO manager used to load it.
    """
    return [
        *with_resources(
            [sfo_q2_weather_sample, hottest_dates, daily_temperature_highs],
            resource_defs={
                "io_manager": IOManagerDefinition.hardcoded_io_manager(
                    LocalFileSystemIOManager()
                ),
            },
        ),
        define_asset_job("weather_assets_job"),
    ]
Using temporary directory /opt/dagster/pipelines/research_and_development/tmpbv49k8pj for storage. This will be removed when dagit exits.
To persist information across sessions, set the environment variable DAGSTER_HOME to a directory to use.
0it [00:00, ?it/s]
0it [00:00, ?it/s]
/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/workspace/context.py:556: UserWarning: Error loading repository location software_defined_assets.py:dagster.core.errors.DagsterInvalidDefinitionError: Input asset '["sfo_q2_weather_sample"]' for asset '["daily_temperature_highs"]' is not produced by any of the provided asset ops and is not one of the provided sources
Stack Trace:
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/grpc/server.py", line 227, in __init__
self._container_image,
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/grpc/server.py", line 103, in __init__
loadable_target_origin.attribute,
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/grpc/utils.py", line 33, in get_loadable_targets
else loadable_targets_from_python_file(python_file, working_directory)
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/workspace/autodiscovery.py", line 26, in loadable_targets_from_python_file
loaded_module = load_python_file(python_file, working_directory)
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/code_pointer.py", line 86, in load_python_file
return import_module_from_path(module_name, python_file)
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/seven/__init__.py", line 50, in import_module_from_path
spec.loader.exec_module(module)
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "software_defined_assets.py", line 48, in <module>
@repository
/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/workspace/context.py:556: UserWarning: Error loading repository location repo.py:dagster.core.errors.DagsterInvalidDefinitionError: Bad return value from repository construction function: all elements of list must be of type JobDefinition, GraphDefinition, PipelineDefinition, PartitionSetDefinition, ScheduleDefinition, SensorDefinition, AssetsDefinition, or SourceAsset.Got value of type <class 'list'> at index 0.
Stack Trace:
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/grpc/server.py", line 227, in __init__
self._container_image,
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/grpc/server.py", line 103, in __init__
loadable_target_origin.attribute,
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/grpc/utils.py", line 33, in get_loadable_targets
else loadable_targets_from_python_file(python_file, working_directory)
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/workspace/autodiscovery.py", line 26, in loadable_targets_from_python_file
loaded_module = load_python_file(python_file, working_directory)
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/code_pointer.py", line 86, in load_python_file
return import_module_from_path(module_name, python_file)
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/seven/__init__.py", line 50, in import_module_from_path
spec.loader.exec_module(module)
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "software_defined_assets/repo.py", line 6, in <module>
@repository
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/definitions/decorators/repository_decorator.py", line 258, in repository
return _Repository()(name)
File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/definitions/decorators/repository_decorator.py", line 69, in __call__
"Bad return value from repository construction function: all elements of list "
location_name=location_name, error_string=error.to_string()
2022-06-22 16:21:49 +0000 - dagit - INFO - Serving dagit on <http://127.0.0.1:4000> in process 1024118
sandy
06/22/2022, 5:03 PM*
):
@repository
def software_defined_assets():
    """Expose every definition in ``spark_weather_assets`` as this repository's contents."""
    return list(spark_weather_assets)