Hemanth Bellala
02/24/2022, 2:55 PM
dagster-daemon
I am a bit confused on what to include in that file. Before, I had everything in the dagster.yaml file. If there is an example anywhere I could take a look at, that would be very helpful.
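[For reference, a minimal sketch of one dagster.yaml section a daemon setup commonly has, mirroring the QueuedRunCoordinator example that appears later in this thread; which sections belong in the file depends on the deployment, so treat this as an assumption rather than a canonical file:]
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25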
Alex Service
02/24/2022, 3:07 PM
localhost:3000 gives an error like the following, but 0.0.0.0:3000 works fine?
David Choy
02/24/2022, 9:07 PM
David Choy
02/24/2022, 9:28 PM
Irven Aelbrecht
02/24/2022, 11:17 PM
I have an op (op A) which does an API call that creates an item (the response is a urn).
Then I have a sensor which receives events on the progress of calculating the item (which happens outside our control).
This sensor can receive events for multiple items.
Then I want to continue the job which used op A, because there is specific data in the job later on.
I want to create something like a second op which can wait / do something on an event from the sensor when the urn matches.
I don't want to use polling, as this would cause extra delays, or waste extra resources when you make the polling delay low.
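[A hedged sketch of one common workaround for this shape: split the continuation into a second job and have the sensor launch it per matching urn. This launches a new run rather than resuming the original one, and get_pending_events is a hypothetical push-style event source, not a Dagster API:]
from dagster import RunRequest, job, op, sensor

@op(config_schema={"urn": str})
def continue_processing(context):
    # continue the work for the item identified by the urn
    context.log.info(f"continuing item {context.op_config['urn']}")

@job
def continue_processing_job():
    continue_processing()

@sensor(job=continue_processing_job)
def item_ready_sensor(context):
    # get_pending_events() is hypothetical; the real event feed
    # (queue, webhook store, etc.) is up to the integration
    for event in get_pending_events():
        yield RunRequest(
            run_key=event.urn,  # one run per urn, deduplicated by Dagster
            run_config={"ops": {"continue_processing": {"config": {"urn": event.urn}}}},
        )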
Fabian Post
02/25/2022, 9:11 AM
Irven Aelbrecht
02/25/2022, 10:47 AM
dagster-test is not published to PyPI, so I cannot use it ...
Probably there are better ways to launch a docker image in a test.
Related to this, my use case is:
Our code is already separated into different docker images. How should I best call these docker images? Should I use the docker executor or work directly with the docker client (https://pypi.org/project/docker/)?
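[For the docker-client option, a minimal sketch of calling an image from inside an op with docker-py; the image name and command are placeholders, not anything from the thread:]
import docker
from dagster import op

@op
def run_team_image(context):
    client = docker.from_env()
    # containers.run blocks until the container exits and returns its logs
    logs = client.containers.run(
        "my-team/etl-image:latest",  # placeholder image name
        command=["python", "main.py"],
        remove=True,
    )
    context.log.info(logs.decode())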
Frank Dekervel
02/25/2022, 3:27 PM
Austin Bailey
02/25/2022, 4:16 PM
Louis Auneau
02/25/2022, 6:07 PM
collect() call) of the same type in a job, or is it mandatory to send them into an Op to do so?
Heather Scott
02/25/2022, 8:01 PM
@op(config_schema={"slack": str}, required_resource_keys=set(["slack"]))
(I'm not even sure if that is right at all)
but I don't know how to actually 'provide' the key. I thought I could add the Slack OAuth token in config, but that doesn't seem to be the case...
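[A hedged sketch of how a resource key is usually satisfied: the op declares required_resource_keys, the job binds a resource definition to that key via resource_defs, and the token goes in the resource's config rather than the op's. slack_client_resource here is a stand-in, not the dagster-slack API:]
from dagster import job, op, resource

@resource(config_schema={"token": str})
def slack_client_resource(init_context):
    # build and return whatever client you use
    # from init_context.resource_config["token"]
    return init_context.resource_config["token"]

@op(required_resource_keys={"slack"})
def notify(context):
    context.resources.slack  # the value returned by the resource

@job(resource_defs={"slack": slack_client_resource})
def notify_job():
    notify()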
Alex Service
02/25/2022, 8:33 PM
David Farnan-Williams
02/25/2022, 9:41 PM
Romain
02/25/2022, 10:14 PM
dagster.PartitionedConfig. Did I miss something?
Daniel Cho
02/27/2022, 2:15 AM
Vlad Efanov
02/27/2022, 8:16 AM
Boaz Menuhin
02/27/2022, 10:39 AM
My Mai
02/27/2022, 9:07 PM
@schedule(
    cron_schedule="45 6 * * *",
    job=hello_cereal_job,
    execution_timezone="US/Central",
)
def good_morning_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    return {"ops": {"hello_cereal": {"config": {"date": date}}}}
specifically the last line here -- is this returning a schedule definition for an op in the hello_cereal_job? Would this syntax change if there were more than one op in this job?
Thanks!
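[On the multiple-op case, a hedged sketch: the dict a schedule returns is run config keyed by op name under "ops", so each op that needs config gets its own entry. "another_op" is a hypothetical second op, not from the tutorial:]
def good_morning_schedule(context):
    date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    # one entry per op that needs config
    return {
        "ops": {
            "hello_cereal": {"config": {"date": date}},
            "another_op": {"config": {"date": date}},
        }
    }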
Valentin Stauber
02/28/2022, 9:39 AM
Martin Laurent
02/28/2022, 10:24 AM
mrdavidlaing
02/28/2022, 1:57 PM
execution:
  multiprocess:
    config:
      max_concurrent: 0
Is this a regression? Or an intentional change of behaviour?
chrispc
02/28/2022, 2:02 PM
Dylan Hunt
02/28/2022, 3:47 PM
construct_pipeline_with_yaml is throwing the below error.
File "C:\test\parallel-dag\venv\lib\site-packages\dagster\core\execution\api.py", line 414, in _logged_execute_pipeline
) = _check_execute_pipeline_args(
File "C:\test\parallel-dag\venv\lib\site-packages\dagster\core\execution\api.py", line 910, in _check_execute_pipeline_args
pipeline_def = pipeline.get_definition()
File "C:\test\parallel-dag\venv\lib\site-packages\dagster\utils\__init__.py", line 222, in __hash__
return hash(tuple(self))
TypeError: unhashable type: 'dict'
I upgraded from Python 3.7.9 to 3.10 as well.
Instead of reading from the yaml file, I'm passing a dict directly; everything else is the same as the Dagster documentation for the pipeline DSL.
Scott Hood
02/28/2022, 9:10 PM
Erik
03/01/2022, 4:49 AM
from dagster import DynamicOut, DynamicOutput, graph, job, op

@op(out=DynamicOut(int), config_schema={"start": int, "stop": int})
def generate_things(context):
    for i in range(context.op_config["start"], context.op_config["stop"]):
        yield DynamicOutput(value=i, mapping_key=f"{i}")

@graph
def graph_of_complex_things(i):
    ...  # do graph of things with i

@job
def dynamic_size_ordered_job():
    for i in generate_things():
        graph_of_complex_things(i)
I was poking around and generate_things().map(graph_of_complex_things)
is the closest thing I can find, but it isn't ordered.
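[For context, a hedged sketch of the mapping form being referenced, building on the snippet above: .map fans out one invocation per dynamic output (unordered, as noted), and .collect gathers the results for a downstream op. aggregate_things is a hypothetical consumer:]
from dagster import job, op

@op
def aggregate_things(results):
    # hypothetical fan-in consumer of the mapped results
    return sum(results)

@job
def dynamic_mapped_job():
    results = generate_things().map(graph_of_complex_things)
    aggregate_things(results.collect())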
Eunice Tan
03/01/2022, 4:49 AM
dagster job execute …
However, the only logs I see in the kubernetes pod are:
2022-03-01 03:07:12 +0000 - dagster - DEBUG - run_microdump - 900c13db-7ef0-4f9a-aa86-ca258e2e2ea9 - 7 - RUN_START - Started execution of run for "run_microdump".
2022-03-01 03:07:12 +0000 - dagster - DEBUG - run_microdump - 900c13db-7ef0-4f9a-aa86-ca258e2e2ea9 - 7 - ENGINE_EVENT - Executing steps using multiprocess executor: parent process (pid: 7)
2022-03-01 03:07:12 +0000 - dagster - DEBUG - run_microdump - 900c13db-7ef0-4f9a-aa86-ca258e2e2ea9 - 7 - get_full_copy_tables - ENGINE_EVENT - Launching subprocess for get_full_copy_tables
2022-03-01 03:07:13 +0000 - dagster - DEBUG - run_microdump - 900c13db-7ef0-4f9a-aa86-ca258e2e2ea9 - 10 - get_full_copy_tables - ENGINE_EVENT - Starting initialization of resources [devdb_tables_fetcher, io_manager].
2022-03-01 03:07:13 +0000 - dagster - DEBUG - run_microdump - 900c13db-7ef0-4f9a-aa86-ca258e2e2ea9 - 10 - get_full_copy_tables - ENGINE_EVENT - Finished initialization of resources [devdb_tables_fetcher, io_manager].
2022-03-01 03:07:13 +0000 - dagster - DEBUG - run_microdump - 900c13db-7ef0-4f9a-aa86-ca258e2e2ea9 - 7 - ENGINE_EVENT - Multiprocess executor: parent process exiting after 1.16s (pid: 7)
2022-03-01 03:07:13 +0000 - dagster - DEBUG - run_microdump - 900c13db-7ef0-4f9a-aa86-ca258e2e2ea9 - 7 - RUN_SUCCESS - Finished execution of run for "run_microdump".
and I don’t see logs from my python code itself.
I’ve tried logging with both context.log.info and get_dagster_logger, and neither seems to show up. I was wondering if there are any other configurations I should be checking in this case?
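[For reference, a minimal sketch of the two logging approaches mentioned, both of which normally surface in Dagster's event log; the op name is a placeholder:]
from dagster import get_dagster_logger, op

@op
def log_something(context):
    context.log.info("via the op context logger")
    get_dagster_logger().info("via get_dagster_logger")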
Aharon Levine
03/01/2022, 10:57 AM
upstream request timeout
For example, the /runs, /status, and /workspace pages worked every now and then after refreshing the browser multiple times, but in general they just show a timeout message like upstream connect error or disconnect/reset before headers. reset reason: connection failure.
Don’t see any strange error logs in kube for either our dagit or our daemon pods.
The issue seems intermittent: for a few minutes everything works as normal, and then the timeouts return.
Any ideas how I can fix this?
geoHeil
03/01/2022, 1:43 PM
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      - key: "database"
        value: "redshift"
        limit: 4
      - key: "dagster/backfill"
        limit: 10
but this is a global configuration.
Assuming I have different teams with separate pipelines, and in particular separate repositories/workspaces, teams might contribute unique new resources for their use cases.
For example, one team might have a https://github.com/dagster-io/dagster/blob/master/examples/hacker_news_assets/hacker_news_assets/resources/hn_resource.py HN resource and another team a resource for other APIs.
How can I merge the configuration from the resources of the different teams to configure the run coordinator (i.e. if some resources are available only to specific teams)? Is it possible to somehow import the resources of teams (and not hardcode them at the startup of Dagster in the centralized yml configuration)?
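[One hedged observation that may help here: the tag_concurrency_limits above match run tags, and tags can be attached per job in each team's own code rather than in the central yml. A minimal sketch; the op and job names are assumptions:]
from dagster import job, op

@op
def load_to_redshift():
    ...

# a team can attach the matching tag in its own repository;
# the central tag_concurrency_limits then apply to these runs
@job(tags={"database": "redshift"})
def team_a_redshift_job():
    load_to_redshift()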
David Choy
03/01/2022, 2:14 PM