Dimka Filippov
12/02/2021, 2:29 PM
Artem S
12/02/2021, 4:57 PM
@d.op(out=d.DynamicOut())
def produce_dynamic_range():
    start, stop, offset = 0, 86399, 1800
    while start < stop:
        start_offset = start + offset - 1
        yield d.DynamicOutput(
            {"start": start, "stop": start_offset},
            f"{start}_{start_offset}"
        )
        start = min(start + offset, stop)
@d.op()
def extract(data):
    # just pass scalar
    time.sleep(5)
    return 1

@d.op()
def load(data):
    # just bypass
    time.sleep(10)
    return data

@d.graph()
def composite(data):
    return load(extract(data))

@d.job(
    resource_defs={**default_resources},
    tags={**default_k8s_tags},
    config={**multiprocess_10}
)
def solve():
    result = produce_dynamic_range().map(composite).collect()
Somewhere in the middle of execution:
1000@dagster-run-b7381b03-8f34-4ecc-bf2d-95d3e3ebd385-nsrmq:/opt/dagster/app$ ps aux | grep python
1000 1 1.6 2.0 164300 126760 ? Ss 16:41 0:04 /opt/venv/bin/python -m dagster api execute_run {"class": "ExecuteRunArgs", "instance_ref": null, "pipeline_origin": {"class": "PipelinePythonOrigin", "pipeline_name": "solve", "repository_origin": {"class": "RepositoryPythonOrigin", "code_pointer": {"class": "FileCodePointer", "fn_name": "main_test_repository", "python_file": "repository.py", "working_directory": "/opt/dagster/app"}, "container_image": null, "executable_path": "/opt/venv/bin/python"}}, "pipeline_run_id": "b7381b03-8f34-4ecc-bf2d-95d3e3ebd385"}
1000 8 0.0 0.2 16792 13024 ? S 16:41 0:00 /opt/venv/bin/python -c from multiprocessing.semaphore_tracker import main;main(3)
1000 13 0.0 0.0 0 0 ? Z 16:41 0:00 [python] <defunct>
1000 137 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct>
1000 140 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct>
1000 142 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct>
1000 162 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct>
1000 167 0.1 0.0 0 0 ? Z 16:42 0:00 [python] <defunct>
1000 170 0.0 0.0 0 0 ? Z 16:42 0:00 [python] <defunct>
1000 186 0.1 0.0 0 0 ? Z 16:43 0:00 [python] <defunct>
1000 193 0.1 0.0 0 0 ? Z 16:43 0:00 [python] <defunct>
1000 195 0.1 0.0 0 0 ? Z 16:43 0:00 [python] <defunct>
1000 205 0.1 0.0 0 0 ? Z 16:43 0:00 [python] <defunct>
1000 219 0.1 0.0 0 0 ? Z 16:44 0:00 [python] <defunct>
1000 221 0.1 0.0 0 0 ? Z 16:44 0:00 [python] <defunct>
1000 229 0.2 0.0 0 0 ? Z 16:44 0:00 [python] <defunct>
1000 237 0.2 0.0 0 0 ? Z 16:44 0:00 [python] <defunct>
1000 245 0.2 0.0 0 0 ? Z 16:44 0:00 [python] <defunct>
1000 253 0.2 0.0 0 0 ? Z 16:44 0:00 [python] <defunct>
1000 267 0.3 0.0 0 0 ? Z 16:45 0:00 [python] <defunct>
1000 269 0.2 0.0 0 0 ? Z 16:45 0:00 [python] <defunct>
1000 277 0.4 0.0 0 0 ? Z 16:45 0:00 [python] <defunct>
1000 285 0.6 0.0 0 0 ? Z 16:45 0:00 [python] <defunct>
1000 289 10.3 2.0 311488 126428 ? Sl 16:45 0:02 /opt/venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=14) --multiprocessing-fork
1000 293 0.6 0.2 16896 13048 ? S 16:45 0:00 /opt/venv/bin/python -c from multiprocessing.semaphore_tracker import main;main(5)
1000 297 21.4 1.9 306072 121140 ? Sl 16:46 0:01 /opt/venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=14) --multiprocessing-fork
1000 301 3.4 0.2 16968 13192 ? S 16:46 0:00 /opt/venv/bin/python -c from multiprocessing.semaphore_tracker import main;main(5)
1000 305 0.0 0.0 10380 5852 ? R 16:46 0:00 /opt/venv/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=4, pipe_handle=14) --multiprocessing-fork
1000 307 0.0 0.0 4844 880 pts/0 S+ 16:46 0:00 grep python
It looks like some processes weren't joined properly.
Max Wong
12/03/2021, 2:06 AMError: Caught an error for run b11d456b-0374-4936-8ff2-e9a0ae9492f5 while removing it from the queue. Marking the run as failed and dropping it from the queue: Exception: Timed out waiting for gRPC server to start with arguments: "/usr/local/bin/python -m dagster.grpc --lazy-load-user-code --socket /tmp/tmpt1i9jqlo --heartbeat --heartbeat-timeout 120 --fixed-server-id bcb5c01a-2f88-4439-ab7e-136ca4e65b67 -f /opt/dagster/dags/repos.py -d /opt/dagster/dags". Most recent connection error: grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "failed to connect to all addresses"
debug_error_string = "{"created":"@1638496923.552474091","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3093,"referenced_errors":[{"created":"@1638496923.552472571","description":"failed to connect to all addresses","file":"src/core/lib/transport/error_utils.cc","file_line":163,"grpc_status":14}]}"
This only happens to a single pipeline, although it's the heaviest one (in terms of computing resources required). Other pipelines work fine.
Qwame
12/03/2021, 3:04 AM
George Pearse
12/03/2021, 1:35 PM
Chris Chan
12/03/2021, 2:27 PM
RunRequest. Since it seems like the config specified in the RunRequest overrides the config in the configured job, I'd like to combine the configured job's config with additional config in the RunRequest. The reason I can't put the additional config when configuring the job is that the additional config is runtime-dependent (a date), while the job's config is fixed.
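(A common workaround, sketched here with hypothetical names rather than an official API: keep the fixed config as a plain Python dict and merge the runtime-dependent part into a deep copy before building the RunRequest.)

import copy
from dagster import RunRequest, sensor

FIXED_CONFIG = {"ops": {"my_op": {"config": {"threshold": 10}}}}  # hypothetical

@sensor(job=my_job)  # my_job is a placeholder
def my_sensor(context):
    run_config = copy.deepcopy(FIXED_CONFIG)
    # runtime-dependent piece, e.g. today's date
    run_config["ops"]["my_op"]["config"]["date"] = "2021-12-03"
    yield RunRequest(run_key=None, run_config=run_config)

Jorge Lima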
12/03/2021, 2:27 PM
Skipped No skip reason provided, is anyone able to give me a hand here please?
Qwame
12/03/2021, 3:20 PM
.db files in DAGSTER_HOME. Can these be configured to persist to another database? Like having a PostgreSQL backend.
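(Yes — with the dagster-postgres package installed, the run, event log, and schedule storages can point at Postgres via dagster.yaml in DAGSTER_HOME; a sketch with placeholder credentials:)

run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_url: "postgresql://user:password@localhost:5432/dagster"
event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_url: "postgresql://user:password@localhost:5432/dagster"
schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_url: "postgresql://user:password@localhost:5432/dagster"

Manny Schneck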
12/03/2021, 4:55 PM
Alex Saltzman
12/03/2021, 5:56 PM
@op
def echo(msg: str) -> str:
    return msg

@op(
    ins={'msg': In(str)},
    out=Out(str)
)
def echo_2(msg):  # same as above
    return msg
In the bigger picture, when everything is connected together, is it more "useful" (or convenient) to use type annotations or the ins "kwarg dictionary"? Or do many people do both?
Qwame
12/03/2021, 6:17 PM
Manny Schneck
12/04/2021, 12:24 AM
Qwame
12/04/2021, 1:24 AM
ops A, B, C, and D. B and C both accept results from A to run, so the dependency structure is determined automatically. D, however, takes no arguments, but I want it to run after C. How do I write this job?
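(A sketch of one way to express this ordering-only dependency, using Dagster's Nothing type so D waits on C without consuming a value; the op names are hypothetical:)

from dagster import In, Nothing, job, op

@op
def op_a():
    return 1

@op
def op_b(a):
    return a + 1

@op
def op_c(a):
    return a * 2

@op(ins={"after": In(Nothing)})
def op_d():
    # receives no data; the Nothing input only enforces ordering
    print("runs after op_c")

@job
def my_job():
    a = op_a()
    op_b(a)
    op_d(after=op_c(a))

Remi Gabillet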
12/04/2021, 7:51 AM
dagster-dbt is now compatible with dbt Cloud, which I am very excited about. Heads-up that this documentation should probably be updated, since it says: "dagster_dbt currently does not provide ops or resources for invoking dbt commands via dbt Cloud"
David Bieber
12/04/2021, 9:39 PM
############
@op(required_resource_keys={"warehouse"})
def get_message_status(context):
    message_id = context.op_config["message_id"]
    message_status = context.resources.warehouse.get_message(message_id)
    return message_status

@op
def add(x, y):
    return x + y

@op(required_resource_keys={"warehouse"})
def get_hdb_count(context):
    sym = context.op_config["sym"]
    count = context.resources.warehouse.get_hdb_sym_count(sym)
    return count

@op(required_resource_keys={"warehouse"})
def get_mem_count(context):
    sym = context.op_config["sym"]
    count = context.resources.warehouse.get_mem_sym_count(sym)
    return count
############

@job
def add_job():
    add(get_mem_count(), get_hdb_count())

@job
def message_status_job():
    get_message_status()
I run these jobs as scheduled tasks, so I need to supply two configurations:
one for add_job
ops:
  get_hdb_count:
    config:
      sym: "asset"
  get_mem_count:
    config:
      sym: "asset"
and one for message_status_job
ops:
  get_message_status:
    config:
      message_id: "tDefStatus"

In addition, there is a common resource configuration:

resources:
  warehouse:
    config:
      port: "8888"
      address: "localhost"
My plan was to have a separate .yaml file for each job but combine it with a common resource .yaml file in the schedule:
@schedule(cron_schedule="* * * * *", job=message_status_job)
def message_status_schedule(context):
    resource_config = open_yaml_file("resource.yaml")
    ops_config = open_yaml_file("env_message_status.yaml")
    config = combine_configs(resource_config, ops_config)
    return config

@schedule(cron_schedule="* * * * *", job=add_job)
def add_schedule(context):
    resource_config = open_yaml_file("resource.yaml")
    ops_config = open_yaml_file("env_add.yaml")
    config = combine_configs(resource_config, ops_config)
    return config
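(For reference, the two helpers used above aren't Dagster built-ins — a minimal sketch, assuming PyYAML:)

import yaml

def open_yaml_file(path):
    with open(path) as f:
        return yaml.safe_load(f) or {}

def combine_configs(*configs):
    # a shallow merge suffices here: "resources" and "ops" live under
    # different top-level keys, so the dicts never collide
    merged = {}
    for cfg in configs:
        merged.update(cfg)
    return merged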
It felt a bit cumbersome, and I was trying to work out if there is a more Pythonic/Dagster concept that I am missing which would be better suited.
Many thanks,
David
Manny Schneck
12/04/2021, 9:47 PM
Manny Schneck
12/04/2021, 9:47 PM
Qwame
12/04/2021, 10:53 PM
Warning: compute log capture is disabled for the current environment. Set the environment variable PYTHONLEGACYWINDOWSSTDIO to enable.
Any thoughts on this warning?
Manny Schneck
12/05/2021, 7:44 AM
@op
def hi():
    return 'hello'

@graph
def foo():
    hi()

yeet = hi.to_job()
Am I missing something? This looks isomorphic to the docs:
@graph
def calories():
    normalize_calories(download_csv())

calories_test_job = calories.to_job(
    resource_defs={"warehouse": local_sqlite_warehouse_resource}
)

calories_dev_job = calories.to_job(
    resource_defs={"warehouse": sqlalchemy_postgres_warehouse_resource}
)
But sadness:
scripts/end_to_end.py:727: in <module>
    yeet = hi.to_job()
E   AttributeError: 'OpDefinition' object has no attribute 'to_job'
Manny Schneck
12/05/2021, 7:45 AM(venv) [5/12/21 1:44:58] ➜ dagster_prototype git:(dagster-storage) ✗ dagit --version
dagit, version 0.13.9
Manny Schneck
12/05/2021, 7:45 AM
pytest
medihack
12/05/2021, 3:54 PM
foo.to_job()?
Sa'ar Elias
12/05/2021, 6:53 PM
@op(out=DynamicOut())
def my_op():
    yield DynamicOutput(
        value={'a': 1, 'b': 2},
        mapping_key=...
    )

@op
def another_op(a, b):
    return a + b

@job
def test():
    my_op().map(another_op)
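(For this first case: each DynamicOutput carries a single value, so map passes the whole dict to the downstream op as one input rather than unpacking it into keyword arguments — a sketch of the usual workaround:)

@op
def another_op(pair: dict):
    # take the whole {'a': ..., 'b': ...} dict as a single input
    return pair['a'] + pair['b']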
2. Also, when mapping task results, is there a way to pass in extra arguments?
@op
def another_op(a, b):
    return a + b

@job
def test():
    mynumber = get_number()
    my_op_2().map(another_op, mynumber)
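(For this second case: map takes a callable that composes ops, so a lambda closing over another output should work — a sketch, with my_op_2 and get_number assumed from above:)

@job
def test():
    mynumber = get_number()
    my_op_2().map(lambda item: another_op(item, mynumber))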
Thanks!
Qwame
12/06/2021, 5:40 AM
ops:
  op_a:
    config:
      right: amber
If I have several ops that use the same right argument, how do I avoid repeating the same thing for each op?
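(One option, sketched with hypothetical op names: build the run config dict in Python so the shared value is written once:)

shared = {"config": {"right": "amber"}}
run_config = {"ops": {name: shared for name in ("op_a", "op_b", "op_c")}}

George Pearse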
12/06/2021, 10:36 AM
Rahul Sharma
12/06/2021, 11:14 AM
Thomas
12/06/2021, 2:26 PM
from pymongo import MongoClient

class Connect(object):
    @staticmethod
    def get_connection():
        return MongoClient("mongodb://$[username]:$[password]@$[hostlist]/$[database]?authSource=$[authSource]")

from connect import Connect
from pymongo import MongoClient

connection = Connect.get_connection()
db = connection.test
db.inventory.insert_one(
    {"item": "canvas",
     "qty": 100,
     "tags": ["cotton"],
     "size": {"h": 28, "w": 35.5, "uom": "cm"}})
and Dagster could be:

def my_ops() -> dict:
    return {"item": "canvas",
            "qty": 100,
            "tags": ["cotton"],
            "size": {"h": 28, "w": 35.5, "uom": "cm"}}
I'm thinking of resources to do that, but I am not quite sure how to start. In the end, the MongoDB will serve another service. What do you think?
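(One way to start — a sketch of a Dagster resource wrapping the Mongo client; the resource key, op, and URI are placeholders:)

from dagster import job, op, resource
from pymongo import MongoClient

@resource(config_schema={"uri": str})
def mongo_resource(init_context):
    # one client per run, shared by every op that requires "mongo"
    return MongoClient(init_context.resource_config["uri"])

@op(required_resource_keys={"mongo"})
def insert_inventory(context):
    db = context.resources.mongo.test
    db.inventory.insert_one({
        "item": "canvas",
        "qty": 100,
        "tags": ["cotton"],
        "size": {"h": 28, "w": 35.5, "uom": "cm"},
    })

@job(resource_defs={"mongo": mongo_resource})
def mongo_job():
    insert_inventory()

David Robinson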
12/06/2021, 7:44 PM
Arun Kumar
12/06/2021, 8:28 PM
Manny Schneck
12/06/2021, 10:54 PM
dagit -f ....
I'm able to reproduce this in the hacker news example repo:
(see thread)