Ismael Rodrigues  10/13/2022, 2:07 PM
Félix Tremblay  10/13/2022, 3:08 PM
Jordan  10/13/2022, 3:20 PM
Brian Pohl  10/13/2022, 3:57 PM
ops:
  execute_mi_sales_zip_30_lag_est_snowpark:
    config: {'inputs': {'models_ran': True}}
  run_metrics_models:
    config: {'inputs': {'lag_365_est': True, 'lag_90_est': True}}
CJ  10/13/2022, 4:01 PM
rex  10/13/2022, 4:07 PM
Matt Millican  10/13/2022, 5:40 PM
Is there a way for an op to determine the Executor being used by its job?
Context: I am trying to construct an op with the following properties:
1. Uses a `DynamicOut`
2. The `DynamicOut` produces exactly one output if running in a job whose Executor is an `InProcessExecutor`
3. If the job's Executor is a `MultiprocessExecutor`, the `DynamicOut`'s branching factor matches the `max_concurrent` of the executor
The overall goal is to perform batch processing split amongst a number of processes that matches the multiprocessing factor of the job. We haven't had success with allowing Dagster to control this itself using a `DynamicOut` with arbitrary fanout, because the time needed to spawn one new process per item is unacceptably high even when we use `forkserver`.
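One way to approximate this (a sketch, not from the thread; it assumes the branching factor is wired in through op config, since an op cannot directly introspect its job's executor):
from dagster import DynamicOut, DynamicOutput, job, op

@op(out=DynamicOut(int), config_schema={"branches": int})
def fan_out(context):
    # branches=1 mimics the in-process case; set it to the executor's
    # max_concurrent when running under the multiprocess executor.
    for i in range(context.op_config["branches"]):
        yield DynamicOutput(i, mapping_key=str(i))

@op
def process_batch(batch: int) -> int:
    return batch * 2  # stand-in for the real batch-processing logic

@job
def batch_job():
    fan_out().map(process_batch)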
Ryan Navaroli  10/13/2022, 6:31 PM
Issac Loo  10/13/2022, 6:58 PM
• Pipeline Job 1 is scheduled to run at 4am daily (using a schedule)
• Pipeline Job 1 materializes an asset (using AssetMaterialization on the last op in the job)
  ◦ The asset only needs to be materialized on the first of the month, but I'd still like Pipeline Job 1 to run daily
• Pipeline Job 2 senses when to run AFTER Pipeline Job 1 materializes an asset on the first of the month (using an asset sensor)
How do I use partitions to only materialize Pipeline Job 1's asset on the first of the month? Can you share pseudo-code on how to achieve this?
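One pattern that fits (a hedged sketch; the op name final_op and the asset key my_asset are illustrative, not from the thread) is to have the last op of Pipeline Job 1 emit the AssetMaterialization only on the first of the month, so the daily schedule keeps running but the asset sensor only fires monthly:
from datetime import date
from dagster import AssetMaterialization, Output, op

@op
def final_op(context):
    result = 42  # stand-in for the real computation
    if date.today().day == 1:
        # Record the materialization only on the first of the month,
        # so Pipeline Job 2's asset sensor fires monthly.
        yield AssetMaterialization(asset_key="my_asset")
    yield Output(result)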
10/13/2022, 7:22 PMrequired_resource_keys={"hightouch"}
and a job that call the op with _resource_defs_={_"resources"_:{_"hightouch"_: hightouch_resource}}
But I got an error like this , what might be the issue, is it syntax issue?
dagster._check.CheckError: Value in Mapping mismatches expected type for key resources. Expected value of type <class 'dagster._core.definitions.resource_definition.ResourceDefinition'>. Got value {'hightouch': <dagster._core.definitions.resource_definition.ResourceDefinition object at 0x146eb8c40>} of type <class 'dict'>.
# My resources
import os

import requests
from dagster import Field, StringSource, resource

class Hightouch:
    _endpoint = "some endpoint"

    def __init__(self, api_key: str):
        self.api_key = api_key

    @property
    def headers(self):
        return {"Authorization": f"Bearer {self.api_key}"}

    def start_sync(self, sync_id: int) -> int:
        response = requests.post(
            self._endpoint + f"{sync_id}/trigger", headers=self.headers
        )
        return response
@resource(
    config_schema={
        "HIGHTOUCH_API_KEY": Field(StringSource, is_required=True),
    }
)
def hightouch_resource(context) -> Hightouch:
    return Hightouch(
        api_key=context.resource_config["HIGHTOUCH_API_KEY"],
    )

hightouch_configured = hightouch_resource.configured(
    {
        "HIGHTOUCH_API_KEY": os.getenv("HIGHTOUCH_API_KEY"),
    }
)
# My ops
from dagster import op
from resources.hightouch import *

@op(required_resource_keys={"hightouch"})
def sync_user(context):
    """This sync function only syncs user on Hightouch."""
    syncs = context.resources.hightouch
    syncs.start_sync()
# My jobs
from dagster import job

@job(
    description="""
    This job runs the op
    """,
    tags={"ecs/cpu": "2048", "ecs/memory": "8192"},
    resource_defs={"resources": {"hightouch": hightouch_resource}},
    config=some_config,
)
def my_job():
    sync_user()
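The CheckError above says each value in resource_defs must itself be a ResourceDefinition, so the extra "resources" nesting is the likely culprit; a sketch of the corrected job, keeping the other arguments as-is:
@job(
    description="This job runs the op",
    tags={"ecs/cpu": "2048", "ecs/memory": "8192"},
    # Map the resource key directly to the definition, with no "resources" wrapper.
    resource_defs={"hightouch": hightouch_resource},
    config=some_config,
)
def my_job():
    sync_user()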
Hiroki Hayama  10/13/2022, 8:23 PM
Multiprocess executor: child process for step get_community_updates unexpectedly exited with code -11
dagster._core.executor.child_process_executor.ChildProcessCrashException
Stack Trace:
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/executor/multiprocess.py", line 214, in execute
    event_or_none = next(step_iter)
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/executor/multiprocess.py", line 330, in execute_step_out_of_process
    for ret in execute_child_process_command(multiproc_ctx, command):
  File "/usr/local/lib/python3.8/site-packages/dagster/_core/executor/child_process_executor.py", line 163, in execute_child_process_command
    raise ChildProcessCrashException(exit_code=process.exitcode)
The get_community_updates op is pulling down a few (<100) rows from Snowflake and trying to return a pandas DataFrame with snowflake.connector and the fetch_pandas_all() function. I've tried increasing the ecs/cpu tags but no luck there. Any tips/pointers would be appreciated.
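Exit code -11 means the child process died from SIGSEGV, which with fetch_pandas_all() often points at the native Arrow layer rather than Dagster itself; one low-risk experiment (a sketch with placeholder connection parameters and a hypothetical query) is to stream the result in batches instead of materializing it all at once:
import pandas as pd
import snowflake.connector

conn = snowflake.connector.connect(user="...", password="...", account="...")
cur = conn.cursor()
cur.execute("SELECT * FROM community_updates")  # hypothetical query
# fetch_pandas_batches() yields smaller DataFrames, which can sidestep the
# memory/segfault behavior sometimes seen with fetch_pandas_all().
df = pd.concat(cur.fetch_pandas_batches(), ignore_index=True)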
10/13/2022, 10:03 PMop_tags
on @asset
to configure the k8s job is not working for me, but from looking at the code/docs, it seems like it should be combining anything in the pod_spec_config
key in "dagster-k8s/config"
w/ the dagster-generated job/pod config. I'm using the helm chart to install dagster (w/ run launcher K8sRunLauncher
) + the user-deployment helm chart to keep user code in a separate repo/namespace.
@asset(
    op_tags={
        "dagster-k8s/config": {
            "pod_spec_config": {
                "tolerations": [{
                    "key": "my-key",
                    "operator": "Exists",
                    "effect": "NoSchedule",
                }],
            },
        },
    },
)
def my_asset(_):
    pass
Zach P  10/13/2022, 10:29 PM
I have some IO managers that require resources via the required_resource_keys={…} syntax; here I've used generic names.
I'd like to be able to override these in certain cases: e.g. if I have a single job that may need multiple of that type. Is there some way to provide a mapping in these cases?
E.g.:
@io_manager(required_resource_keys={"db_creds"})
def my_io_manager(context): ...

resource_defs_for_job = {
    "my_first_db_creds": db_creds(...),
    "my_2nd_db_creds": db_creds(...),
    "my_first_io_manager": my_io_manager,
    "my_2nd_io_manager": my_io_manager,
}
How can I force my_2nd_io_manager to use the required resource key my_2nd_db_creds instead of db_creds?
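One workaround (a sketch; MyIOManager is a hypothetical stand-in for whatever the real io_manager returns): since required_resource_keys is fixed when a definition is created, build a separate io_manager definition per credentials key with a small factory:
from dagster import IOManager, io_manager

class MyIOManager(IOManager):  # hypothetical stand-in
    def __init__(self, creds):
        self.creds = creds

    def handle_output(self, context, obj):
        ...

    def load_input(self, context):
        ...

def build_io_manager(creds_key: str):
    # Each credentials key gets its own definition, since the required
    # resource keys are baked in at definition time.
    @io_manager(required_resource_keys={creds_key})
    def _io_manager(init_context):
        return MyIOManager(getattr(init_context.resources, creds_key))
    return _io_manager

# Then, in resource_defs_for_job:
#   "my_first_io_manager": build_io_manager("my_first_db_creds"),
#   "my_2nd_io_manager": build_io_manager("my_2nd_db_creds"),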
Adam Bloom  10/13/2022, 10:40 PM
Can multi_asset_sensor work with assets across multiple repositories? The docs for regular asset sensors suggest that is allowed, so I was hoping it would work with multi_asset_sensors as well. However, I'm getting a grpc error from dagster-daemon: details = "Exception iterating responses: No asset with AssetKey(...) found in repository". The asset key in question is defined in another repository.
Abhijeet Singh  10/13/2022, 11:44 PM
drogozin  10/14/2022, 10:20 AM
Prince Richard Augustin  10/14/2022, 11:31 AM
Archie Kennedy  10/14/2022, 1:00 PM
10/14/2022, 1:00 PM@asset(config_schema={"thing" = Field(str)})
def my_asset(context):
thing = context.op_config["thing"]
...
How can I attach the contents of "thing" to the asset as metadata?
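One way (a sketch using context.add_output_metadata; returning thing is just illustrative):
from dagster import Field, asset

@asset(config_schema={"thing": Field(str)})
def my_asset(context):
    thing = context.op_config["thing"]
    # Attach the config value as metadata on this materialization;
    # it shows up on the asset's page in Dagit.
    context.add_output_metadata({"thing": thing})
    return thing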
Zachary Bluhm  10/14/2022, 2:21 PM
Chris Histe  10/14/2022, 2:35 PM
Martin Laurent  10/14/2022, 3:26 PM
10/14/2022, 3:26 PMop
in the same graph
, I see them "restarting" by themselves (see logs) about 4 hours after. I also see them duplicated on the graph view, which was not the case before and is, in no way, part of the graph logic. Is there some sort of hidden retry logic that may trigger this?
This is a problem because the logic in the ops
is not meant to be run twiceCJ
CJ  10/14/2022, 6:14 PM
CJ  10/14/2022, 6:16 PM
Slackbot  10/14/2022, 8:04 PM
saravan kumar  10/14/2022, 8:04 PM
saravan kumar  10/14/2022, 8:05 PM
Ruoyu Qian  10/14/2022, 9:23 PM
My upstream parents_asset is listed with an asset_key array. I want to pass the key to the downstream's test() function, but parents_asset.asset_key[-1] does not seem like the right syntax.
@asset(
    required_resource_keys={"some_check"},
    key_prefix=DEFAULT_KEY_PREFIX,
    ins={
        "parents_asset": AssetIn(
            asset_key=[
                "snowflake",
                DBT_DATABASE.lower(),
                "my_schema_name",
                "my_table_name",
            ],
            input_manager_key="snowpark_dataframe",
        )
    },
)
def my_test(context, parents_asset):
    context.resources.some_check.test(parents_asset.asset_key[-1])
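Part of the issue is that inside the body, parents_asset is the loaded value (the Snowpark dataframe), not an object carrying an asset_key. One option (a sketch, assuming context.asset_key_for_input is available in the Dagster version in use) reads the key from the context instead:
# (same @asset decorator as above)
def my_test(context, parents_asset):
    # Last path component of the upstream AssetKey is the table name.
    table_name = context.asset_key_for_input("parents_asset").path[-1]
    context.resources.some_check.test(table_name)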
Jordan  10/14/2022, 10:30 PM
Geoffrey Greenleaf  10/14/2022, 11:51 PM