Mhd Mousa Hamad
04/11/2023, 11:51 AM
configuration_client_1, and io_manager_client_1 solved this problem, but in Dagit we now see all the resources defined for all clients when opening the details page of a job for a single client. In other words, it seems that Dagit always shows every resource in the code location, not only the ones used by the job. Is this intended, or are we using something wrong? Would you recommend a better approach for this?
P.S. I have already tried to use configured resources, but then the configuration is visible and editable in Dagit, which is not desired behaviour for us, and using the "Configured API" leads to the same problem described above.
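For context, a minimal sketch of the "Configured API" approach mentioned above (the resource and the create_storage_client helper are hypothetical, not our real code); each client still contributes its own resource definition to the code location, so they all appear in Dagit:

from dagster import resource, InitResourceContext

@resource(config_schema={"service_account": str})
def storage_client(init_context: InitResourceContext):
    # create_storage_client is a hypothetical helper standing in for a real client factory.
    return create_storage_client(init_context.resource_config["service_account"])

# Binding config per client up front via the Configured API; every client still
# adds a separate resource definition to the code location, and with plain
# configured resources the values remain visible and editable in Dagit.
storage_client_client_1 = storage_client.configured({"service_account": "service-account-1"})
storage_client_client_2 = storage_client.configured({"service_account": "service-account-2"})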
Thank you!

Tim Castillo
04/11/2023, 2:57 PM

Mhd Mousa Hamad
04/13/2023, 1:18 PM
import datetime
import re
from dataclasses import dataclass
from typing import (
    Any,
    List,
    Optional,
    Dict,
    Sequence
)

import pandas as pd
from dagster import (
    asset,
    AssetIn,
    AssetsDefinition,
    ScheduleDefinition,
    SensorDefinition,
    with_resources,
    io_manager,
    build_schedule_from_partitioned_job,
    define_asset_job,
    AssetSelection,
    AssetKey,
    TimeWindowPartitionsDefinition,
    DefaultScheduleStatus,
    UPathIOManager,
    OpExecutionContext,
    resource,
    InitResourceContext,
    repository
)
from dagster._core.storage.fs_io_manager import PickledObjectFilesystemIOManager
from pandas import DataFrame
DEFAULT_DATE_FORMAT = "%Y-%m-%d"
DAGSTER_REPOSITORY_ID = "dp_v1"
PIPELINE_CONTEXT_RESOURCE_KEY = 'execution_context'
IO_MANAGER_RESOURCE_KEY = 'io_manager'
@dataclass
class Configuration:
    client_id: str
    account_id: str
    secret_name: str
    running_schedule: str
    further_config: Any


class ConfigurationService:
    def load_all_configurations(self) -> List[Configuration]:
        # Hits an endpoint to load the client configurations.
        return [
            Configuration("client-1", "account-1", "service-account-1", "0 0 * * *", "further-config-1"),
            Configuration("client-2", "account-2", "service-account-2", "0 0 * * *", "further-config-2"),
            Configuration("client-3", "account-3", "service-account-3", "0 0 * * *", "further-config-3"),
        ]
class DPService:
    def read_data_a(
        self,
        further_config: Any
    ) -> DataFrame:
        data = pd.DataFrame({
            "a": [1, 2, 3],
            "b": [11, 22, 33]
        })
        return data

    def get_data_b(
        self,
        data_a: DataFrame
    ) -> DataFrame:
        data = data_a[data_a['a'] <= 2]
        return data
@dataclass
class PipelineContext:
    client_id: str
    account_id: str
    secret_name: str


@dataclass
class RepositoryData:
    assets: List[AssetsDefinition]
    jobs: List[Any]  # The actual type is `UnresolvedAssetJobDefinition`, which is still a private type in Dagster
    schedules: Optional[List[ScheduleDefinition]] = None
    sensors: Optional[List[SensorDefinition]] = None
class DPPipeline:
    def __init__(
        self,
        configuration_service: ConfigurationService,
        dp_service: DPService,
        repository_id: str,
    ):
        self.configuration_service = configuration_service
        self.dp_service = dp_service
        self.repository_id = repository_id

    @staticmethod
    def normalize_name(text: str) -> str:
        """
        Normalizes an artifact name to conform with Dagster's naming convention for artifacts: `^[A-Za-z0-9_]+$`
        """
        return re.sub("[^A-Za-z0-9_]", '_', text)

    @staticmethod
    def config_id_affix(
        config: Configuration
    ) -> List[str]:
        prefix = [
            config.client_id,
            config.account_id
        ]
        return prefix

    @staticmethod
    def assets_prefix(
        repository_id: str,
        config: Configuration
    ) -> List[str]:
        prefix = [
            repository_id
        ]
        prefix.extend(DPPipeline.config_id_affix(config))
        return prefix

    @staticmethod
    def assets_group(
        config: Configuration
    ) -> str:
        group = DPPipeline.normalize_name(f"{config.client_id}")
        return group

    @staticmethod
    def assets_metadata(
        config: Configuration
    ) -> Dict[str, Any]:
        metadata = {
            "client": config.client_id,
            "account": config.account_id
        }
        return metadata
    def get_repository_data_for_config(
        self,
        config: Configuration
    ) -> RepositoryData:
        config_id_affix = AssetKey(
            DPPipeline.config_id_affix(config)
        ).to_python_identifier()
        assets_prefix = DPPipeline.assets_prefix(self.repository_id, config)
        assets_group = DPPipeline.assets_group(config)
        assets_metadata = DPPipeline.assets_metadata(config)
        assets_partitions = TimeWindowPartitionsDefinition(
            start=datetime.datetime(2023, 1, 1),
            fmt=DEFAULT_DATE_FORMAT,
            cron_schedule=config.running_schedule
        )
        pipeline_context_resource_key = f"{PIPELINE_CONTEXT_RESOURCE_KEY}__{config_id_affix}"
        io_manager_resource_key = f"{IO_MANAGER_RESOURCE_KEY}__{config_id_affix}"

        @io_manager()
        def scoring_io_manager() -> UPathIOManager:
            """
            This io_manager may later be tailored and configured per client/config,
            e.g., using the configured service account to access GCS.
            """
            # This io_manager is used for demonstration purposes,
            # as our actual io_manager is more advanced and has a dependency on Spark.
            return PickledObjectFilesystemIOManager("dagster-data")

        @resource()
        def pipeline_context(_: InitResourceContext) -> PipelineContext:
            pipeline_context = PipelineContext(
                client_id=config.client_id,
                account_id=config.account_id,
                secret_name=config.secret_name
            )
            return pipeline_context

        @asset(
            name="data_a",
            key_prefix=assets_prefix,
            group_name=assets_group,
            metadata=assets_metadata,
            compute_kind="data_read",
            io_manager_key=io_manager_resource_key,
            required_resource_keys={pipeline_context_resource_key},
            partitions_def=assets_partitions,
            config_schema={
                'further_config': str
            }
        )
        def data_a(
            context: OpExecutionContext,
        ) -> DataFrame:
            pipeline_context = getattr(context.resources, pipeline_context_resource_key)
            # Do something with the context
            further_config = context.op_config["further_config"]
            data = self.dp_service.read_data_a(further_config)
            return data

        @asset(
            name="data_b",
            key_prefix=assets_prefix,
            group_name=assets_group,
            metadata=assets_metadata,
            compute_kind="process",
            io_manager_key=io_manager_resource_key,
            required_resource_keys={pipeline_context_resource_key},
            partitions_def=assets_partitions,
            config_schema={},
            ins={
                "data_a": AssetIn(key_prefix=assets_prefix)
            }
        )
        def data_b(
            context: OpExecutionContext,
            data_a: DataFrame
        ) -> DataFrame:
            pipeline_context = getattr(context.resources, pipeline_context_resource_key)
            # Do something with the context
            data = self.dp_service.get_data_b(data_a)
            return data

        assets = [
            data_a,
            data_b
        ]
        resources = {
            io_manager_resource_key: scoring_io_manager,
            pipeline_context_resource_key: pipeline_context
        }
        assets = with_resources(
            assets,
            resource_defs=resources
        )
        assets_run_config = {
            data_a.key.to_python_identifier(): {
                "config": {
                    "further_config": config.further_config
                }
            }
        }

        # Jobs
        dp_job_key = AssetKey(
            [*(assets_prefix or []), "dp_job_1"]
        ).to_python_identifier()
        dp_job = define_asset_job(
            name=dp_job_key,
            selection=AssetSelection.assets(*assets),
            partitions_def=assets_partitions,
            tags={
                **assets_metadata,  # Metadata for asset jobs is still not supported
                "dagster-k8s/config": {
                    "container_config": {
                        "resources": {
                            "limits": {
                                "cpu": "2000m",
                                "memory": "4Gi"
                            }
                        }
                    }
                }
            },
            config={
                "ops": assets_run_config
            }
        )
        jobs = [
            dp_job
        ]

        # Schedules
        dp_schedule = build_schedule_from_partitioned_job(
            job=dp_job,
            default_status=DefaultScheduleStatus.RUNNING
        )
        schedules = [
            dp_schedule
        ]
        sensors = []

        repo_data = RepositoryData(
            assets=assets,
            jobs=jobs,
            schedules=schedules,
            sensors=sensors
        )
        return repo_data
    def get_repository_data(
        self,
    ) -> List[Any]:
        configs = self.configuration_service.load_all_configurations()
        assets = []
        jobs = []
        schedules = []
        sensors = []
        for config in configs:
            data_for_config = self.get_repository_data_for_config(config)
            assets.extend(data_for_config.assets)
            jobs.extend(data_for_config.jobs)
            if data_for_config.schedules is not None:
                schedules.extend(data_for_config.schedules)
            if data_for_config.sensors is not None:
                sensors.extend(data_for_config.sensors)
        repository_data = [
            *assets,
            *jobs,
            *schedules,
            *sensors
        ]
        return repository_data
# @inject
def dagster_repository_data(
    # scoring_pipeline: ScoringPipeline = Provide[Application.scoring_pipeline]
) -> Sequence[Any]:
    dp_pipeline = DPPipeline(
        configuration_service=ConfigurationService(),
        dp_service=DPService(),
        repository_id=DAGSTER_REPOSITORY_ID
    )
    repo_artifacts = dp_pipeline.get_repository_data()
    return repo_artifacts


@repository(name=DAGSTER_REPOSITORY_ID)
def dagster():
    return dagster_repository_data()
Kindly note that:
• Our io_manager is bound to a client and thus also needs to be configured from the client configuration: simply put, it is given a service account used to write to that client's storage. That is why it cannot be the same resource for all clients. In this sample I have used an arbitrary io_manager so the code runs for demonstration.
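To make that concrete, here is a minimal sketch of what a client-scoped io_manager could look like (the ClientScopedIOManager class and the GCS path scheme are assumptions for illustration, not our real Spark-based implementation):

from dagster import io_manager, IOManager, InputContext, OutputContext

class ClientScopedIOManager(IOManager):
    """Reads and writes a single client's assets in that client's own storage."""

    def __init__(self, service_account: str, base_path: str):
        self._service_account = service_account
        self._base_path = base_path

    def handle_output(self, context: OutputContext, obj) -> None:
        # Hypothetical: authenticate with the client's service account and
        # write the object under the client's base path.
        ...

    def load_input(self, context: InputContext):
        # Hypothetical: read the upstream object back from the client's storage.
        ...

def build_client_io_manager(config: Configuration):
    # One io_manager definition per client, bound to that client's credentials,
    # mirroring how scoring_io_manager is created inside get_repository_data_for_config.
    @io_manager()
    def _client_io_manager() -> IOManager:
        return ClientScopedIOManager(
            service_account=config.secret_name,
            base_path=f"gs://dp-{config.client_id}",
        )
    return _client_io_manager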