https://dagster.io/ logo
#ask-community
Title
# ask-community
g

George Pearse

06/22/2022, 1:40 PM
Trying to use software defined assets for my new pipelines, currently receiving this error for my SourceAsset definition. My code:
Copy code
ml_cxr_datalake = SourceAsset(
    key=AssetKey('ml_cxr_datalake'),
    io_manager_key='ml_mongo_warehouse',
    description='Mongo collection with ready for CXR model training'
)
error:
Copy code
docker-dev-dagster-pipelines     |   File "/usr/local/lib/python3.9/site-packages/dagster/core/definitions/target.py", line 35, in __new__
docker-dev-dagster-pipelines     |     check.inst_param(
docker-dev-dagster-pipelines     |   File "/usr/local/lib/python3.9/site-packages/dagster/_check/__init__.py", line 633, in inst_param
docker-dev-dagster-pipelines     |     raise _param_type_mismatch_exception(
docker-dev-dagster-pipelines     | dagster._check.ParameterCheckError: Param "target" is not one of ['GraphDefinition', 'PipelineDefinition', 'UnresolvedAssetJobDefinition']. Got SourceAsset(key=AssetKey(['ml_cxr_datalake']), metadata_entries=[], io_manager_key='ml_mongo_warehouse', description='Mongo collection with ready for CXR model training', partitions_def=None, group_name='default', resource_defs={}) which is type <class 'dagster.core.asset_defs.source_asset.SourceAsset'>.
🤖 1
s

sandy

06/22/2022, 2:23 PM
Hey @George Pearse - it looks like this error is occurring at the time you're trying to instantiate a schedule or sensor. Is that something your code is doing?
g

George Pearse

06/22/2022, 2:28 PM
I just load the asset into the repo as is. Purely trying to test out wiring up some assets, tried with the name of the asset directly specified and just via the function name.
I'll try one of your simple example scripts and tweak the script 1 step at a time until I break something
s

sandy

06/22/2022, 3:05 PM
Are you using
@repository
? If you share more of your code, or how you're launching Dagit, I'd likely be able to help you fix it.
g

George Pearse

06/22/2022, 3:13 PM
@repository, just loading via docker-compose for local dev work for the minute. Version 0.15.0 will send you the script. DM'd all the relevant code
Actually still hitting problems. Copied the weather example as closely as I can, and running via dagit just on this file.
Copy code
from dagster import load_assets_from_modules, with_resources, AssetKey, SourceAsset, asset, IOManager, IOManagerDefinition, with_resources, define_asset_job, repository

import pandas as pd
from pandas import DataFrame

import os

sfo_q2_weather_sample = SourceAsset(
    key=AssetKey("sfo_q2_weather_sample"),
    description="Weather samples, taken every five minutes at SFO",
    metadata={"format": "csv"},
)

class LocalFileSystemIOManager(IOManager):
    """Translates between Pandas DataFrames and CSVs on the local filesystem."""

    def _get_fs_path(self, asset_key: AssetKey) -> str:
        rpath = os.path.join(*asset_key.path) + ".csv"
        return os.path.abspath(rpath)

    def handle_output(self, context, obj: DataFrame):
        """This saves the dataframe as a CSV."""
        fpath = self._get_fs_path(context.asset_key)
        obj.to_csv(fpath)

    def load_input(self, context):
        """This reads a dataframe from a CSV."""
        fpath = self._get_fs_path(context.asset_key)
        return pd.read_csv(fpath)


@asset
def daily_temperature_highs(sfo_q2_weather_sample: DataFrame) -> DataFrame:
    """Computes the temperature high for each day"""
    sfo_q2_weather_sample["valid_date"] = pd.to_datetime(sfo_q2_weather_sample["valid"])
    return sfo_q2_weather_sample.groupby("valid_date").max().rename(columns={"tmpf": "max_tmpf"})


@asset
def hottest_dates(daily_temperature_highs: DataFrame) -> DataFrame:
    """Computes the 10 hottest dates"""
    return daily_temperature_highs.nlargest(10, "max_tmpf")


@repository
def software_defined_assets():
    return [
        *with_resources(
        [hottest_dates, daily_temperature_highs],
        resource_defs={
            "io_manager": IOManagerDefinition.hardcoded_io_manager(LocalFileSystemIOManager())
        }),
        define_asset_job("weather_assets_job"),
]
Copy code
Using temporary directory /opt/dagster/pipelines/research_and_development/tmpbv49k8pj for storage. This will be removed when dagit exits.
To persist information across sessions, set the environment variable DAGSTER_HOME to a directory to use.

0it [00:00, ?it/s]
0it [00:00, ?it/s]
/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/workspace/context.py:556: UserWarning: Error loading repository location software_defined_assets.py:dagster.core.errors.DagsterInvalidDefinitionError: Input asset '["sfo_q2_weather_sample"]' for asset '["daily_temperature_highs"]' is not produced by any of the provided asset ops and is not one of the provided sources

Stack Trace:
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/grpc/server.py", line 227, in __init__
    self._container_image,
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/grpc/server.py", line 103, in __init__
    loadable_target_origin.attribute,
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/grpc/utils.py", line 33, in get_loadable_targets
    else loadable_targets_from_python_file(python_file, working_directory)
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/workspace/autodiscovery.py", line 26, in loadable_targets_from_python_file
    loaded_module = load_python_file(python_file, working_directory)
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/code_pointer.py", line 86, in load_python_file
    return import_module_from_path(module_name, python_file)
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/seven/__init__.py", line 50, in import_module_from_path
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "software_defined_assets.py", line 48, in <module>
    @repository
And literally copying the example file for file, for the spark weather assets demo I get
Copy code
ld-pipelines-env/lib/python3.7/site-packages/dagster/core/workspace/context.py:556: UserWarning: Error loading repository location repo.py:dagster.core.errors.DagsterInvalidDefinitionError: Bad return value from repository construction function: all elements of list must be of type JobDefinition, GraphDefinition, PipelineDefinition, PartitionSetDefinition, ScheduleDefinition, SensorDefinition, AssetsDefinition, or SourceAsset.Got value of type <class 'list'> at index 0.

Stack Trace:
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/grpc/server.py", line 227, in __init__
    self._container_image,
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/grpc/server.py", line 103, in __init__
    loadable_target_origin.attribute,
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/grpc/utils.py", line 33, in get_loadable_targets
    else loadable_targets_from_python_file(python_file, working_directory)
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/workspace/autodiscovery.py", line 26, in loadable_targets_from_python_file
    loaded_module = load_python_file(python_file, working_directory)
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/code_pointer.py", line 86, in load_python_file
    return import_module_from_path(module_name, python_file)
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/seven/__init__.py", line 50, in import_module_from_path
    spec.loader.exec_module(module)
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "software_defined_assets/repo.py", line 6, in <module>
    @repository
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/definitions/decorators/repository_decorator.py", line 258, in repository
    return _Repository()(name)
  File "/opt/dagster/behold-pipelines-env/lib/python3.7/site-packages/dagster/core/definitions/decorators/repository_decorator.py", line 69, in __call__
    "Bad return value from repository construction function: all elements of list "

  location_name=location_name, error_string=error.to_string()
2022-06-22 16:21:49 +0000 - dagit - INFO - Serving dagit on <http://127.0.0.1:4000> in process 1024118
s

sandy

06/22/2022, 5:03 PM
Yikes - Iooks like there's an error there. This should fix it (add the
*
):
Copy code
@repository
def software_defined_assets():
    return [*spark_weather_assets]
❤️ 1
5 Views