Jordan
01/23/2023, 10:34 PMyuhan
01/23/2023, 10:36 PMJordan
# 01/25/2023, 4:58 PM
def build_assets(name):
    """Define a chain of three assets whose names are suffixed with ``name``.

    ``asset_1`` returns 1; ``asset_2`` and ``asset_3`` each declare the
    previous asset as their input through ``ins`` and return that input
    plus one.

    Returns a list holding the three decorated objects followed by the
    ``...`` elision marker that the snippet's author left in place.
    """
    first_name = f"asset_1__job_{name}"
    second_name = f"asset_2__job_{name}"
    third_name = f"asset_3__job_{name}"

    @asset(name=first_name)
    def asset_1():
        return 1

    # The ``ins`` key must match the parameter name of the decorated function.
    @asset(name=second_name, ins={"asset_1": AssetIn(first_name)})
    def asset_2(asset_1):
        return asset_1 + 1

    @asset(name=third_name, ins={"asset_2": AssetIn(second_name)})
    def asset_3(asset_2):
        return asset_2 + 1

    # Elision marker from the original snippet: more assets were omitted here.
    ...
    return [asset_1, asset_2, asset_3, ...]
def build_repo(repo_name):
    """Generate the jobs, asset groups and schedules for one repository.

    For every index from 1 through 49 this builds one asset group with
    ``build_assets``, one asset job selecting that group, and one schedule
    for that job using the cron expression ``"0 0 * * *"``.

    Returns a single list: all jobs first, then the asset groups, then all
    schedules.
    """
    jobs, asset_groups, schedules = [], [], []

    for index in range(1, 50):
        name = f"{repo_name}_{index}"
        group = build_assets(name)
        asset_job = define_asset_job(name=name, selection=group)
        jobs.append(asset_job)

        daily_schedule = ScheduleDefinition(
            name=f"schedule_{index}",
            job=asset_job,
            cron_schedule="0 0 * * *",
        )
        schedules.append(daily_schedule)

        # Each group is appended as a whole list, so the returned list
        # contains one nested list per job.
        # NOTE(review): presumably relies on @repository accepting nested
        # lists of assets - confirm against the Dagster version in use.
        asset_groups.append(group)

    return [*jobs, *asset_groups, *schedules]
@repository()
def A():
    # Repository "A": everything build_repo generates for that name.
    contents = build_repo("A")
    return contents
...
@repository()
def Z():
    # Repository "Z": everything build_repo generates for that name.
    contents = build_repo("Z")
    return contents
Jordan
01/25/2023, 5:46 PMowen
01/26/2023, 11:00 PMJordan
01/27/2023, 3:17 PMJordan
01/27/2023, 3:21 PMJordan
# 01/27/2023, 3:22 PM
def build_assets(repo_name, job_name, API, cred):
    """Define the raw -> transform -> transactions asset chain for one job.

    Args:
        repo_name: Name of the module that is imported to look up
            ``specific_transform``.
        job_name: Suffix used in every asset name (``raw__<job_name>`` etc.).
        API: Callable invoked as ``API(cred)``; the result must expose
            ``fetch_data``.
        cred: Credentials object handed to ``API``.

    Returns:
        A list with the three decorated assets followed by the ``...``
        elision marker that the snippet's author left in place.
    """

    @asset(name=f"raw__{job_name}")
    def raw(context):
        # Fetch the data for the partition key reported by the context.
        partition_key = context.output_asset_partition_key()
        return API(cred).fetch_data(partition_key)

    @asset(
        name=f"transform__{job_name}",
        ins={"raw": AssetIn(f"raw__{job_name}")},
    )
    def transform(raw):
        import importlib

        # The transform is looked up by name in the module called
        # ``repo_name`` each time the asset runs.
        specific_transform = getattr(
            importlib.import_module(repo_name),
            "specific_transform",
        )
        return specific_transform(raw)

    @asset(
        name=f"transactions__{job_name}",
        ins={"transform": AssetIn(f"transform__{job_name}")},
    )
    def transactions(transform):
        return get_transactions(transform)

    # Elision marker from the original snippet: more assets were omitted here.
    ...
    # Fixed: the original list lacked the comma before ``...``, which is a
    # SyntaxError.
    return [raw, transform, transactions, ...]
def build_repo(repo_name, API):
    """Generate one asset job and one asset group per credential entry.

    Args:
        repo_name: Prefix for job names; also forwarded to ``build_assets``.
        API: Object providing ``get_credentials()``; forwarded to
            ``build_assets`` as well.

    Returns:
        A single list: all jobs first, then the asset groups.
    """
    jobs = []
    asset_groups = []

    for cred in API.get_credentials():
        # Each credential entry is indexed by "database" to name its job.
        job_name = f"{repo_name}_{cred['database']}"
        group = build_assets(repo_name, job_name, API, cred)
        jobs.append(define_asset_job(name=job_name, selection=group))

        # Each group is appended as a whole list, so the returned list
        # contains one nested list per job.
        # NOTE(review): presumably relies on @repository accepting nested
        # lists of assets - confirm against the Dagster version in use.
        asset_groups.append(group)

    return [*jobs, *asset_groups]
@repository()
def provider_A():
    # Repository for provider A: built from the API_A client.
    contents = build_repo(repo_name='provider_A', API=API_A)
    return contents
...
Jordan
01/27/2023, 3:23 PMJordan
01/27/2023, 8:45 PM
user_code:
  build:
    context: .
    dockerfile: ./Dockerfile
    target: user_code
  image: "$REGISTRY_URL/$ECR_USERCODE_REPO"
  container_name: user_code
  command: "dagster api grpc -h 0.0.0.0 -p 4000 -f repository.py"
  environment:
    ...
  deploy:
    resources:
      limits:
        cpus: '1'
        memory: 2048M
workspace.yaml
load_from:
  - grpc_server:
      host: user_code
      port: 4000
      location_name: "dags"
owen
01/31/2023, 7:12 PMtime sudo py-spy record -o profile.svg -- python repository.py
would give you a pretty good idea). It's likely that there are some inefficiencies somewhere on our side that show up at this scale, and it's also possible that there could be some easy wins on your side (maybe interfacing with the API takes an unexpectedly long time, or something like that)Jordan
02/01/2023, 8:36 PMowen
02/02/2023, 6:39 PMJordan
02/08/2023, 4:39 PM