Hi, We are creating a job based on software define...
# ask-community
m
Hi, we are creating one job per client based on software-defined assets. Each of these jobs has its own configuration, which is defined as a resource, and its io_manager is also custom per client because it uses different credentials. Redefining the same resource per client (and thus per job) did not work: Dagster complained that a resource name must refer to the same resource definition, so I assume that a resource is unique per code location. Having unique resource names per job, e.g.,
configuration_client_1
, and
io_manager_client_1
solved this problem, but in Dagit we now see the resources defined for all clients when we open the details page of a job that belongs to a single client. In other words, it seems that Dagit always shows all resources in the code location rather than only those which are used by the job. Is this intended, or are we doing something wrong? Would you recommend a better approach for this? P.S. I have already tried to use configured resources, but then the configuration is visible and editable in Dagit, which is not a desired behaviour for us, and using the "Configured API" leads to the same problem described above. Thank you!
t
Hmm, yeah, you should only see the resources that are used by the assets in the job. Some debugging questions: 1. How are you generating the jobs? 2. How are you defining which asset belongs to a client?
m
Thank you @Tim Castillo for your reply. Here is a self-contained, simplified version of our code, and in the attached screenshot you can see what the resources look like in Dagit:
Copy code
import datetime
import re
from dataclasses import dataclass
from typing import (
    Any,
    List,
    Optional,
    Dict,
    Sequence
)

import pandas as pd
from dagster import (
    asset,
    AssetIn,
    AssetsDefinition,
    ScheduleDefinition,
    SensorDefinition,
    with_resources,
    io_manager,
    build_schedule_from_partitioned_job,
    define_asset_job,
    AssetSelection,
    AssetKey,
    TimeWindowPartitionsDefinition,
    DefaultScheduleStatus,
    UPathIOManager,
    OpExecutionContext,
    resource,
    InitResourceContext,
    RepositoryData,
    repository
)
from dagster._core.storage.fs_io_manager import PickledObjectFilesystemIOManager
from pandas import DataFrame

# strftime-style format handed to the partitions definition as its `fmt`.
DEFAULT_DATE_FORMAT = "%Y-%m-%d"

# Name of the Dagster repository; also passed to DPPipeline as `repository_id`,
# where it becomes the first component of every asset key prefix.
DAGSTER_REPOSITORY_ID = "dp_v1"
# Base names of the resource keys; a per-configuration suffix is appended to
# each of them when the assets for a configuration are built.
PIPELINE_CONTEXT_RESOURCE_KEY = 'execution_context'
IO_MANAGER_RESOURCE_KEY = 'io_manager'


@dataclass
class Configuration:
    """Settings of a single client/account from which one set of assets, a job and a schedule is generated."""
    # Client identifier; part of the asset key prefix and used as the asset group name (after normalization).
    client_id: str
    # Account identifier; part of the asset key prefix and of the resource key suffix.
    account_id: str
    # Copied into the PipelineContext resource.
    # NOTE(review): presumably names the secret/service account used for client storage — confirm.
    secret_name: str
    # Cron expression used as the `cron_schedule` of the partitions definition.
    running_schedule: str
    # Opaque value forwarded to the `data_a` asset through the job's run config.
    further_config: Any


class ConfigurationService:
    """Source of the client configurations that the pipeline is generated from."""

    def load_all_configurations(self) -> List[Configuration]:
        """Return one ``Configuration`` per client.

        This sample returns three hard-coded entries in place of the remote
        lookup mentioned in the comment below.
        """
        # hits an endpoint to load the client configs
        rows = (
            ("client-1", "account-1", "service-account-1", "0 0 * * *", "further-config-1"),
            ("client-2", "account-2", "service-account-2", "0 0 * * *", "further-config-2"),
            ("client-3", "account-3", "service-account-3", "0 0 * * *", "further-config-3"),
        )
        return [Configuration(*row) for row in rows]


class DPService:
    """Sample data service that produces the frames materialized by the assets."""

    def read_data_a(
            self,
            further_config: Any
    ) -> DataFrame:
        """Return the fixed demo frame with columns ``a`` and ``b``.

        ``further_config`` is accepted but not used by this sample.
        """
        columns = {
            "a": [1, 2, 3],
            "b": [11, 22, 33]
        }
        return pd.DataFrame(columns)

    def get_data_b(
            self,
            data_a: DataFrame
    ) -> DataFrame:
        """Return the rows of ``data_a`` whose ``a`` value is at most 2."""
        at_most_two = data_a['a'] <= 2
        return data_a[at_most_two]


@dataclass
class PipelineContext:
    """Per-client values exposed to the assets through the pipeline-context resource."""
    # The three fields are copied one-to-one from the Configuration the assets were built for.
    client_id: str
    account_id: str
    secret_name: str


@dataclass
class RepositoryData:
    """Bundle of the Dagster definitions generated for one client configuration.

    NOTE(review): this class reuses the name ``RepositoryData`` that is also
    imported from ``dagster`` at the top of the file, so from this point on the
    name refers to this dataclass — confirm that the shadowing is intended.
    """
    # Asset definitions, already bound to their per-client resources.
    assets: List[AssetsDefinition]
    jobs: List[Any]  # The actual type is `UnresolvedAssetJobDefinition` which is still a private data type in Dagster
    # Optional; None is treated as "nothing to register" by the aggregating caller.
    schedules: Optional[List[ScheduleDefinition]] = None
    sensors: Optional[List[SensorDefinition]] = None


class DPPipeline:
    """Generates the Dagster definitions (assets, jobs, schedules, sensors) for all client configurations.

    Every configuration gets its own pair of assets (``data_a`` -> ``data_b``),
    its own resource keys (suffixed with the client/account identifier), and
    its own partitioned job plus schedule.
    """

    def __init__(
            self,
            configuration_service: ConfigurationService,
            dp_service: DPService,
            repository_id: str,
    ):
        """
        :param configuration_service: provides the list of client configurations
        :param dp_service: performs the actual data reading/processing inside the assets
        :param repository_id: first component of every generated asset key prefix
        """
        self.configuration_service = configuration_service
        self.dp_service = dp_service
        self.repository_id = repository_id

    @staticmethod
    def normalize_name(text: str) -> str:
        """
        Normalizes an artifact name to conform to the Dagster naming convention of artifacts: `^[A-Za-z0-9_]+$`

        Every character outside ``[A-Za-z0-9_]`` is replaced by an underscore.
        """
        return re.sub(r"[^A-Za-z0-9_]", "_", text)

    @staticmethod
    def config_id_affix(
            config: Configuration
    ) -> List[str]:
        """Return the path components that identify a configuration: client id, then account id."""
        return [
            config.client_id,
            config.account_id
        ]

    @staticmethod
    def assets_prefix(
            repository_id: str,
            config: Configuration
    ) -> List[str]:
        """Return the asset key prefix: the repository id followed by the configuration affix."""
        return [
            repository_id,
            *DPPipeline.config_id_affix(config)
        ]

    @staticmethod
    def assets_group(
            config: Configuration
    ) -> str:
        """Return the asset group name: the normalized client id."""
        return DPPipeline.normalize_name(f"{config.client_id}")

    @staticmethod
    def assets_metadata(
            config: Configuration
    ) -> Dict[str, Any]:
        """Return the metadata attached to the assets (and reused as job tags) of a configuration."""
        return {
            "client": config.client_id,
            "account": config.account_id
        }

    def get_repository_data_for_config(
            self,
            config: Configuration
    ) -> RepositoryData:
        """
        Build the assets, job and schedule for a single configuration.

        The resource keys used by the assets are made unique per configuration by
        appending the client/account identifier to the base key names.

        :param config: the client configuration to generate definitions for
        :return: the generated definitions; ``sensors`` is always an empty list
        """
        config_id_affix = AssetKey(
            DPPipeline.config_id_affix(config)
        ).to_python_identifier()

        assets_prefix = DPPipeline.assets_prefix(self.repository_id, config)
        assets_group = DPPipeline.assets_group(config)
        assets_metadata = DPPipeline.assets_metadata(config)
        # Partition windows follow the cron schedule of the configuration.
        assets_partitions = TimeWindowPartitionsDefinition(
            start=datetime.datetime(2023, 1, 1),
            fmt=DEFAULT_DATE_FORMAT,
            cron_schedule=config.running_schedule
        )

        # Per-configuration resource keys.
        pipeline_context_resource_key = f"{PIPELINE_CONTEXT_RESOURCE_KEY}__{config_id_affix}"
        io_manager_resource_key = f"{IO_MANAGER_RESOURCE_KEY}__{config_id_affix}"

        @io_manager()
        def scoring_io_manager() -> UPathIOManager:
            """
            This io_manager may later be tailored and configured per client/config.
            E.g., using the configured service account to access GCS.
            """
            # This io-manager is used for demonstration purposes
            # as our actual io-manager is more advanced and has a dependency on Spark.
            return PickledObjectFilesystemIOManager("dagster-data")

        @resource()
        def pipeline_context(_: InitResourceContext) -> PipelineContext:
            """Expose the client-specific values of ``config`` to the assets."""
            return PipelineContext(
                client_id=config.client_id,
                account_id=config.account_id,
                secret_name=config.secret_name
            )

        @asset(
            name="data_a",
            key_prefix=assets_prefix,
            group_name=assets_group,
            metadata=assets_metadata,
            compute_kind="data_read",
            io_manager_key=io_manager_resource_key,
            required_resource_keys={pipeline_context_resource_key},
            partitions_def=assets_partitions,
            config_schema={
                'further_config': str
            }
        )
        def data_a(
                context: OpExecutionContext,
        ) -> DataFrame:
            # The resource key is only known at build time, hence the dynamic attribute lookup.
            pipeline_context = getattr(context.resources, pipeline_context_resource_key)
            # Do something with the context

            further_config = context.op_config["further_config"]

            data = self.dp_service.read_data_a(further_config)
            return data

        @asset(
            name="data_b",
            key_prefix=assets_prefix,
            group_name=assets_group,
            metadata=assets_metadata,
            compute_kind="process",
            io_manager_key=io_manager_resource_key,
            required_resource_keys={pipeline_context_resource_key},
            partitions_def=assets_partitions,
            config_schema={},
            ins={
                "data_a": AssetIn(key_prefix=assets_prefix)
            }
        )
        def data_b(
                context: OpExecutionContext,
                data_a: DataFrame
        ) -> DataFrame:
            pipeline_context = getattr(context.resources, pipeline_context_resource_key)
            # Do something with the context

            data = self.dp_service.get_data_b(data_a)
            return data

        assets = [
            data_a,
            data_b
        ]

        resources = {
            io_manager_resource_key: scoring_io_manager,
            pipeline_context_resource_key: pipeline_context
        }

        # Bind the per-configuration resources directly to the assets.
        assets = with_resources(
            assets,
            resource_defs=resources
        )

        # Run config for `data_a`, keyed by the python identifier of its asset key.
        assets_run_config = {
            data_a.key.to_python_identifier(): {
                "config": {
                    "further_config": config.further_config
                }
            }
        }

        # Jobs

        dp_job_key = AssetKey(
            [*(assets_prefix or []), "dp_job_1"]
        ).to_python_identifier()
        dp_job = define_asset_job(
            name=dp_job_key,
            selection=AssetSelection.assets(*assets),
            partitions_def=assets_partitions,
            tags={
                **assets_metadata,  # Metadata for asset jobs is still not supported
                "dagster-k8s/config": {
                    "container_config": {
                        "resources": {
                            "limits": {
                                "cpu": "2000m",
                                "memory": "4Gi"
                            }
                        }
                    }
                }
            },
            config={
                "ops": assets_run_config
            }
        )

        jobs = [
            dp_job
        ]

        # Schedules

        dp_schedule = build_schedule_from_partitioned_job(
            job=dp_job,
            default_status=DefaultScheduleStatus.RUNNING
        )

        schedules = [
            dp_schedule
        ]

        sensors = []

        repo_data = RepositoryData(
            assets=assets,
            jobs=jobs,
            schedules=schedules,
            sensors=sensors
        )

        return repo_data

    def get_repository_data(
            self,
    ) -> List[Any]:
        """
        Build the definitions of all configurations and flatten them into one list.

        :return: all assets, then all jobs, then all schedules, then all sensors
        """
        configs = self.configuration_service.load_all_configurations()
        assets = []
        jobs = []
        schedules = []
        sensors = []
        for config in configs:
            data_for_config = self.get_repository_data_for_config(config)
            assets.extend(data_for_config.assets)
            # The original line contained a chat-client link artifact around this
            # expression, which made the file unparsable.
            jobs.extend(data_for_config.jobs)
            if data_for_config.schedules is not None:
                schedules.extend(data_for_config.schedules)
            if data_for_config.sensors is not None:
                sensors.extend(data_for_config.sensors)

        repository_data = [
            *assets,
            *jobs,
            *schedules,
            *sensors
        ]

        return repository_data


# @inject
def dagster_repository_data(
        # scoring_pipeline: ScoringPipeline = Provide[Application.scoring_pipeline]
) -> Sequence[Any]:
    """Assemble a ``DPPipeline`` from the sample services and return every definition it generates."""
    return DPPipeline(
        configuration_service=ConfigurationService(),
        dp_service=DPService(),
        repository_id=DAGSTER_REPOSITORY_ID,
    ).get_repository_data()


@repository(name=DAGSTER_REPOSITORY_ID)
def dagster():
    # Repository entry point: hands Dagster the flat list of generated definitions.
    definitions = dagster_repository_data()
    return definitions
Kindly note that: • Our io-manager is bound to a client, so it also needs to be configured based on the client configuration. Put simply, it is given a service account that is used to write to the client's storage. That is why it cannot be the same resource for all clients. In this sample I have used an arbitrary io-manager so that the demonstration is runnable code.