Peter B
06/21/2021, 12:00 PM
Ben Torvaney
06/21/2021, 12:19 PM
Fabian Rabe
06/21/2021, 1:05 PM
$ dagster pipeline execute -f root_io_memoized.py
[... long stacktrace ...]
dagster.check.ParameterCheckError: Param "context.version" is not a str. Got None which is type <class 'NoneType'>.
Conceptually, I probably need to provide a version for the CSV file somewhere? I'm also still a bit unsure "where" the IOManager throws the error: during the output of table1_loader, or during the input of my_solid?
The full code:
```
# root_io_memoized.py
import warnings

import pandas as pd

import dagster
from dagster import (
    InputDefinition,
    ModeDefinition,
    pipeline,
    root_input_manager,
    solid,
)
from dagster.core.storage.memoizable_io_manager import (
    versioned_filesystem_io_manager,
)
from dagster.core.storage.tags import MEMOIZED_RUN_TAG

warnings.filterwarnings("ignore", category=dagster.ExperimentalWarning)


@solid(
    input_defs=[
        InputDefinition("dataframe", root_manager_key="my_root_manager")
    ],
    version="solid_version",
)
def my_solid(dataframe):
    """Do some stuff"""
    print(dataframe.head())


@root_input_manager(version="root_io_version")
def table1_loader(_):
    return pd.read_csv("data.csv")


@pipeline(
    mode_defs=[
        ModeDefinition(
            resource_defs={
                "my_root_manager": table1_loader,
                "io_manager": versioned_filesystem_io_manager.configured(
                    {"base_dir": "data/dagster_fs_io_managed"}
                ),
            }
        )
    ],
    tags={MEMOIZED_RUN_TAG: "true"},
)
def my_pipeline():
    my_solid()
```
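Suppose the root input manager's version should track the contents of data.csv. One option is to derive the version string from a hash of the file, so the version changes whenever the file does. Below is a minimal sketch under that assumption. `file_version` is a hypothetical helper, not a Dagster API, and it does not by itself explain the `context.version` error above.

```python
import hashlib
from pathlib import Path


def file_version(path: str, chunk_size: int = 1 << 20) -> str:
    """Hypothetical helper: SHA-256 hex digest of a file, usable as a version string."""
    digest = hashlib.sha256()
    with Path(path).open("rb") as f:
        # Read in chunks so a large CSV is never loaded into memory all at once.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

The result could replace the literal "root_io_version", e.g. `@root_input_manager(version=file_version("data.csv"))`. Note that this is evaluated once, when the module is imported, not on every run.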
Luis Rodríguez Escario
06/21/2021, 1:35 PM
Alejandro DE LA CRUZ LOPEZ
06/21/2021, 2:57 PM
Noah Sanor
06/21/2021, 7:42 PM
dbt_rpc resource that can work locally or in higher environments. However, dbt currently does not support running the RPC server on Windows (which, unfortunately, my team uses). The other approach is to run CLI commands locally and RPC commands on dev/prod/etc. Is there a defined pattern for doing this now? Please let me know if this question should be asked elsewhere.
Sarah Gruskin
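06/21/2021, 9:22 PM
```
from datetime import datetime, time

from dagster import daily_schedule


@daily_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime(2021, 4, 1),
    execution_time=time(hour=8, minute=15),
    execution_timezone="UTC",
)
def my_schedule(date):
    # Run config for the scheduled run: the date is passed as solid config.
    return {
        "solids": {
            "my_solid": {
                "config": {
                    "date": date.strftime("%Y-%m-%d")
                }
            }
        }
    }
```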
My test is:
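```
from datetime import datetime

from dagster import PipelineExecutionResult, execute_pipeline

# my_pipeline comes from the user's pipeline module (import not shown in the original).


def test_pipeline_pipeline():
    date = datetime(2021, 5, 1)
    res = execute_pipeline(
        my_pipeline,
        mode="unit_test",
        run_config={
            "solids": {
                "my_solid": {
                    "config": {
                        "date": date.strftime("%Y-%m-%d")
                    }
                }
            }
        },
    )
    assert isinstance(res, PipelineExecutionResult)
    assert res.success
    assert res.output_for_solid("validate_data")
```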
My latest error is:
dagster.check.ParameterCheckError: Param "msg" is not a str. Got AttributeError("'SolidExecutionContext' object has no attribute 'date'") which is type <class 'AttributeError'>
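The AttributeError suggests that the solid body reads the date as `context.date`. In Dagster, a solid's config is available as `context.solid_config`. One way to keep the schedule and the test in sync is to build the run config in a single helper that both call. Below is a minimal sketch, assuming `my_solid` has a `date` config field as in the schedule above. `run_config_for_date`, `FakeContext` and `read_date` are hypothetical names used only for illustration, and `FakeContext` merely imitates the one attribute that matters.

```python
from datetime import datetime


def run_config_for_date(date: datetime) -> dict:
    """Hypothetical helper: the run config shared by the schedule and the test."""
    return {
        "solids": {
            "my_solid": {
                "config": {"date": date.strftime("%Y-%m-%d")},
            }
        }
    }


class FakeContext:
    """Hypothetical stand-in for the solid context; models only solid_config."""

    def __init__(self, solid_config: dict):
        self.solid_config = solid_config


def read_date(context) -> datetime:
    # Read the value from context.solid_config["date"], not context.date,
    # which is presumably what raised the AttributeError.
    return datetime.strptime(context.solid_config["date"], "%Y-%m-%d")


config = run_config_for_date(datetime(2021, 5, 1))
ctx = FakeContext(config["solids"]["my_solid"]["config"])
print(read_date(ctx))  # 2021-05-01 00:00:00
```

With such a helper, `my_schedule` could `return run_config_for_date(date)` and the test could pass `run_config=run_config_for_date(datetime(2021, 5, 1))` to `execute_pipeline`. Both would then exercise exactly the same config shape.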
At this point I'm very confused and not quite sure how the config works between scheduling and testing. Any help is greatly appreciated.
Vlad Dumitrascu
06/21/2021, 11:27 PM
```
execution:
  multiprocess:
    config:
      max_concurrent: 32
storage:
  filesystem:
    config:
      base_dir: "./multiprocess_data"
solids:
  ...
```
This finally enabled my processes to run in parallel and take advantage of multi-threading (yay!). I can now see queued solids whose inputs are filled start and run in parallel, where before they waited and ran only one at a time. So, I made a pipeline that looks like this (see attached image):
Well, I have been having a tough time since I switched this pipeline to multithreading. It runs fine for about 18 hours, and memory and system resources remain fine. Then, at some point (usually in the middle of the night, when no one is watching), my computer just crashes and hard resets. I have been trying to debug this like crazy over the last week. It looks exactly like a memory leak. But for a good 90% of the pipeline, memory usage is fine (20% utilization) and CPU utilization never gets above 80% (I have 50 cores)!
I started to watch the Windows resource manager like a hawk and noticed that Dagster was launching A LOT of Python threads! More than I expected. I noticed that:
• Each solid becomes a new thread (expected, somewhat).
• Each solid's new thread NEVER GETS RELEASED until the pipeline finishes. Actually, not until a few minutes after that, even.
So, this means that my nice, node-based, reusable solids are actually hogging something like 20 cores CONSTANTLY, even after they have finished their task!?
Is there access to these threads?
Can I kill the process when it passes its output to another node?
Has anyone noticed crashes like this?
Is there any way to debug this? To confirm 100% why it crashed?
Any advice would be much appreciated!!
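On the number of processes that stay alive, assume Dagster's multiprocess executor launches a separate child process for each solid step (bounded by max_concurrent). Under that assumption, every split and combine solid in the tree adds another process. One option is to collapse the chain of two-way splits and pairwise combines described below into a single N-way split and a single N-way merge. Below is a minimal sketch of the two pure helpers. `split_into_buckets` and `combine_results` are hypothetical names, not Dagster APIs, and this has not been tested against the crash itself.

```python
from typing import Dict, List, Sequence, TypeVar

T = TypeVar("T")


def split_into_buckets(items: Sequence[T], n_buckets: int) -> List[List[T]]:
    """Hypothetical helper: deal items round-robin into n_buckets lists in one step."""
    if n_buckets < 1:
        raise ValueError("n_buckets must be at least 1")
    buckets: List[List[T]] = [[] for _ in range(n_buckets)]
    for index, item in enumerate(items):
        buckets[index % n_buckets].append(item)
    return buckets


def combine_results(results: Sequence[Dict]) -> Dict:
    """Hypothetical helper: merge many result dicts at once; later dicts win on key clashes."""
    merged: Dict = {}
    for result in results:
        merged.update(result)
    return merged


print(split_into_buckets(list(range(10)), 3))  # [[0, 3, 6, 9], [1, 4, 7], [2, 5, 8]]
print(combine_results([{"a": 1}, {"b": 2}, {"a": 3}]))  # {'a': 3, 'b': 2}
```

In the pipeline, one solid could call `split_into_buckets` once to produce the 24 work buckets. One merge solid with a fan-in (list) input could then call `combine_results` on all 24 results, replacing several intermediate split/combine solids. Round-robin dealing keeps bucket sizes within one item of each other.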
Full disclosure: I have simple solids that do things like split a large list into two lists, which I chain together to divide the work into buckets for each of the main threaded tasks (my QA tests). I also have other simple solids that combine two dictionaries into one, which I chain together to collect all the results back from those worker threads. That's why you see a mess of nodes. It's like this:
```
                    start
               /      |      \
          split     split     split
         / | \      / | \     / | \
  work1, work2, work3, work4 .... work24
      \  /  \  /    \  /  \  /
     combine   combine   combine
           \      |      /
               combine
                  |
                result
```
Scott Peters
06/22/2021, 1:46 AM
Scott Peters
06/22/2021, 1:51 AM
dagit loads repos from docker containers, does it cache anything local to the daemon or to dagit? The reason I ask is that, currently, for local development, I am volume mounting my repo.py from the host so that I can see the changes in real time.
However, when I change the @repository definition name, say from:
```
@repository()
def beekeeper_repo():
    return [
        create
    ]
```
to:
```
@repository
def beekeeper_repo2():
    return [
        pipe
    ]
```
... it runs locally just fine, but when loaded into dagster (via docker deployment), it complains:
dagster.core.errors.DagsterInvariantViolationError: beekeeper_repo2 not found at module scope in file repo.py.
File "/usr/local/lib/python3.8/dist-packages/dagster/grpc/impl.py", line 75, in core_execute_run
recon_pipeline.get_definition()
File "/usr/local/lib/python3.8/dist-packages/dagster/core/definitions/reconstructable.py", line 109, in get_definition
self.repository.get_definition()
File "/usr/local/lib/python3.8/dist-packages/dagster/core/definitions/reconstructable.py", line 46, in get_definition
return repository_def_from_pointer(self.pointer)
File "/usr/local/lib/python3.8/dist-packages/dagster/core/definitions/reconstructable.py", line 506, in repository_def_from_pointer
target = def_from_pointer(pointer)
File "/usr/local/lib/python3.8/dist-packages/dagster/core/definitions/reconstructable.py", line 452, in def_from_pointer
target = pointer.load_target()
File "/usr/local/lib/python3.8/dist-packages/dagster/core/code_pointer.py", line 231, in load_target
raise DagsterInvariantViolationError(
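A plausible cause, not confirmed in this thread: the user-code container may run a long-lived process, such as a gRPC server, that imports repo.py once at start-up. If so, editing the mounted file does not change the module that process already holds in memory. Python keeps the module in sys.modules until it is explicitly reloaded or the process restarts. The stdlib-only sketch below demonstrates that mechanism with a throwaway module. The module name `repo_reload_demo` is hypothetical, and the sketch does not reproduce Dagster itself.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Skip .pyc files for this process so reload() always recompiles from source.
sys.dont_write_bytecode = True


def demo_stale_import():
    """Return (value seen by a second import, value seen after an explicit reload)."""
    sys.modules.pop("repo_reload_demo", None)  # start clean if run more than once
    workdir = Path(tempfile.mkdtemp())
    sys.path.insert(0, str(workdir))
    source = workdir / "repo_reload_demo.py"  # hypothetical stand-in for repo.py

    # First version of the file, imported once, as a long-lived server would.
    source.write_text("REPO_NAME = 'beekeeper_repo'\n")
    importlib.invalidate_caches()
    module = importlib.import_module("repo_reload_demo")

    # Edit the file on disk, as a volume-mounted file would be edited from the host.
    source.write_text("REPO_NAME = 'beekeeper_repo2'\n")

    # A second import returns the cached module object from sys.modules.
    cached = importlib.import_module("repo_reload_demo").REPO_NAME

    # Only an explicit reload (or a fresh process) re-executes the edited file.
    reloaded = importlib.reload(module).REPO_NAME
    return cached, reloaded


print(demo_stale_import())  # ('beekeeper_repo', 'beekeeper_repo2')
```

If this is what is happening, the thing to try is restarting the user-code container after editing repo.py, rather than only reloading the location in dagit.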
Scott Peters
06/22/2021, 1:54 AM
cat from the host to the repo.py file inside the beekeeper container, it reflects the latest changes:
```
@repository
def beekeeper_repo2():
    return [
        pipe
    ]
```
However, dagit still seems to be looking for the old repository, pipeline and solid data... any ideas why dagit is holding onto stale repo data?
Oliver
06/22/2021, 9:25 AM
Rubén Lopez Lozoya
06/22/2021, 11:19 AM
def my_solid(input0, [input1, input2])
The problem is that input1 and input2 come from other solid executions, which may be skipped because of dependencies on prior solids. Is there any way to still allow my_solid to execute even when the computation of input1 and input2 is skipped?
David
06/22/2021, 11:19 AM
files.map(process_file) causes multiple instances of the process_file solid to run in parallel, one for each entry returned by the earlier call to files_in_directory.
Is it possible to do this so that process_file runs multiple times sequentially rather than in parallel? That is, imagine we want to process each file in order and only process the next file once the current one has finished successfully.
Scott Peters
06/22/2021, 2:33 PM
virtualenv vs python repo.py in terms of how the code is loaded into dagster when the daemon is initialized?
I'm asking to get a clearer picture of the development loop. Currently, I envision something like this:
• Create a service docker image that includes repo.py
• Start the services (dagster, daemon, etc.) plus the service docker as a repo
• Keep the service docker's repo.py live-updated during development via a docker volume mount
• Reload the service docker repo from dagit
However, I'm noticing that the updates made to the service docker's repo.py are not being picked up, even after a dagster reload of that repo.
Does this ability to dynamically reload code differ depending on whether the service repo.py is fed in as a single Python file or as a venv?
Scott Peters
06/22/2021, 2:34 PM
docker is a better development environment for us, as it takes care of all the necessary dependencies for any given repo.
Jonas Mock
06/22/2021, 3:25 PM
Noah Sanor
06/22/2021, 4:34 PM
Agon Shabi
06/22/2021, 4:52 PM
OutputManager for some "unconnected" outputs, that is, outputs generated at the end of my pipelines that should go to external systems and definitely won't be used as inputs to downstream solids.
I'd like to avoid writing an IOManager whose load_input implementation just raises a NotImplementedError. This docstring suggests this used to be a thing (?):
https://github.com/dagster-io/dagster/blob/8b8c658674b63ff88c0ffa641bdedcded3772d0[…]b/python_modules/dagster/dagster/core/storage/output_manager.py
What's the best way to proceed? "Result-pusher" solids which delegate to "result-pusher" resources?
Maitiú Ó Ciaráin
06/23/2021, 8:52 AM
RetryRequested, but I think papermill is catching the exception and wrapping it, which leads to a general failure of the pipeline.
Based on the examples and docs, I think just raising a RetryRequested is the correct approach, but I have probably missed something.
Stacktrace:
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing solid "qa__master_old":
File "/opt/conda/envs/dagster-pipeline/lib/python3.8/site-packages/dagster/core/execution/plan/execute_plan.py", line 193, in _dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/opt/conda/envs/dagster-pipeline/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 308, in core_dagster_event_sequence_for_step
for user_event in check.generator(
File "/opt/conda/envs/dagster-pipeline/lib/python3.8/site-packages/dagster/core/execution/plan/execute_step.py", line 63, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
File "/opt/conda/envs/dagster-pipeline/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 138, in execute_core_compute
for step_output in _yield_compute_results(step_context, inputs, compute_fn):
File "/opt/conda/envs/dagster-pipeline/lib/python3.8/site-packages/dagster/core/execution/plan/compute.py", line 111, in _yield_compute_results
for event in iterate_with_context(
File "/opt/conda/envs/dagster-pipeline/lib/python3.8/site-packages/dagster/utils/__init__.py", line 384, in iterate_with_context
return
File "/opt/conda/envs/dagster-pipeline/lib/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/opt/conda/envs/dagster-pipeline/lib/python3.8/site-packages/dagster/core/execution/plan/utils.py", line 62, in solid_execution_error_boundary
raise error_cls(
The above exception was caused by the following exception:
papermill.exceptions.PapermillExecutionError:
---------------------------------------------------------------------------
Exception encountered at "In [13]":
---------------------------------------------------------------------------
RetryRequested Traceback (most recent call last)
<ipython-input-13-cd6a6478a462> in <module>
20 pass
21 else:
---> 22 raise RetryRequested(max_retries=60, seconds_to_wait=10*60)
23
24 if last_metric_date_without_LI != expected_last_metric_date:
RetryRequested:
File "/opt/conda/envs/dagster-pipeline/lib/python3.8/site-packages/dagster/core/execution/plan/utils.py", line 42, in solid_execution_error_boundary
yield
File "/opt/conda/envs/dagster-pipeline/lib/python3.8/site-packages/dagster/utils/__init__.py", line 382, in iterate_with_context
next_output = next(iterator)
File "/opt/conda/envs/dagster-pipeline/lib/python3.8/site-packages/dagstermill/solids.py", line 192, in _t_fn
papermill.execute_notebook(
File "/opt/conda/envs/dagster-pipeline/lib/python3.8/site-packages/papermill/execute.py", line 122, in execute_notebook
raise_for_execution_errors(nb, output_path)
File "/opt/conda/envs/dagster-pipeline/lib/python3.8/site-packages/papermill/execute.py", line 234, in raise_for_execution_errors
raise error
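The traceback suggests that papermill records the notebook's exception and raises its own PapermillExecutionError, so Dagster only ever sees that wrapper. One workaround to try is to catch the wrapper in your own solid code. You would then raise a retry request yourself whenever the notebook's exception name is RetryRequested. Below is a minimal sketch. It assumes the wrapper exposes the original exception's class name as an `ename` attribute (PapermillExecutionError stores it that way; check this against your installed papermill version). `NotebookError`, `RetryRequestedStandIn` and `run_with_retry_translation` are hypothetical stand-ins so the sketch runs without papermill or Dagster.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class RetryRequestedStandIn(Exception):
    """Hypothetical stand-in for dagster.RetryRequested(max_retries=..., seconds_to_wait=...)."""

    def __init__(self, max_retries: int = 1, seconds_to_wait: Optional[float] = None):
        super().__init__()
        self.max_retries = max_retries
        self.seconds_to_wait = seconds_to_wait


class NotebookError(Exception):
    """Hypothetical stand-in for papermill's wrapper exception, which carries `ename`."""

    def __init__(self, ename: str):
        super().__init__(ename)
        self.ename = ename


def run_with_retry_translation(
    run_notebook: Callable[[], T],
    max_retries: int = 60,
    seconds_to_wait: float = 10 * 60,
) -> T:
    """Run the notebook; turn a wrapped RetryRequested back into a retry request."""
    try:
        return run_notebook()
    except Exception as exc:
        # The notebook's own retry arguments do not survive the wrapping, so they
        # are re-supplied here (the defaults mirror the notebook cell above).
        if getattr(exc, "ename", None) == "RetryRequested":
            raise RetryRequestedStandIn(max_retries, seconds_to_wait) from exc
        raise
```

In real code, the stand-ins would be replaced by `dagster.RetryRequested` and `papermill.exceptions.PapermillExecutionError`. `run_notebook` would be the call that executes the notebook, e.g. a lambda around `papermill.execute_notebook(input_path, output_path)`. This thread does not establish whether a dagstermill-generated solid can be wrapped this way.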
Selman
06/23/2021, 9:29 AM
dagster_shell.utils.execute(). We noticed that terminating a run does not kill the subprocess created in execute(). Has anyone else experienced this? What are your workarounds?
Scott Peters
06/23/2021, 4:03 PM
creation and validation of these assets via dagster; however, I am wondering about discoverability. Can someone point me to any documentation that covers how I can ask the DagsterInstance about the assets dagster has registered, via GraphQL or another API?
Somasundaram Sekar
06/23/2021, 7:46 PM
ModuleNotFoundError: No module named 'boto3'
Below is the mode definition:
```
import os

from dagster import ModeDefinition
from dagster_aws.emr import emr_pyspark_step_launcher
from dagster_aws.s3 import s3_intermediate_storage, s3_resource
from dagster_pyspark import pyspark_resource

emr_mode = ModeDefinition(
    name="emr",
    resource_defs={
        "pyspark_step_launcher": emr_pyspark_step_launcher.configured(
            {
                "cluster_id": {"env": "EMR_CLUSTER_ID"},
                "local_pipeline_package_path": os.path.dirname(os.path.realpath(__file__)),
                "deploy_local_pipeline_package": True,
                "region_name": "eu-central-1",
                "staging_bucket": "dagster-scratch-xxxxxxx",
            }
        ),
        "pyspark": pyspark_resource,
        "s3": s3_resource,
    },
    intermediate_storage_defs=[
        s3_intermediate_storage.configured(
            {"s3_bucket": "dagster-scratch-xxxxxxx", "s3_prefix": "simple-pyspark"}
        )
    ],
)
```
And here is the full stack trace from the EMR step's stdout:
Traceback (most recent call last):
File "/mnt/tmp/spark-47f207e0-9fdc-4577-9701-e276c50f9c5a/emr_step_main.py", line 9, in <module>
import boto3
ModuleNotFoundError: No module named 'boto3'
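The failing import is in emr_step_main.py, which the traceback shows running from a Spark temp directory on the cluster. So boto3 apparently has to be importable by the Python interpreter on the EMR nodes, not only locally (for example, installed through an EMR bootstrap action). A small stdlib-only pre-flight check can report which modules are missing in a given environment. Below is a sketch of one. `missing_modules` is a hypothetical helper, and the module list is an assumption to adjust.

```python
import importlib.util
from typing import Iterable, List


def missing_modules(names: Iterable[str]) -> List[str]:
    """Hypothetical helper: top-level module names this interpreter cannot find."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # Assumed list of modules the step entry script and the pipeline need on the cluster.
    missing = missing_modules(["boto3", "dagster", "dagster_aws", "dagster_pyspark"])
    print("missing:", missing or "none")
```

Running this with the cluster's Python (for example as a trivial EMR step) would show whether the problem is the cluster's environment rather than the pipeline definition.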
jurelou
06/23/2021, 8:03 PM
Josh Lloyd
06/23/2021, 9:38 PM
Adrij Shikhar
06/24/2021, 5:47 AM
```
from dagster import solid
from google.cloud import storage


@solid(required_resource_keys={"gcp_bucket"})
def upload_to_gcp(context, df):
    """Uploads a Spark DataFrame to the bucket as newline-delimited JSON."""
    bucket_name = context.resources.gcp_bucket["bucket_name"]
    destination_blob_name = context.resources.gcp_bucket["destination_blob_name"]
    storage_client = storage.Client.from_service_account_json(
        './secret/macomptroller-bq-1-bae66fb4d558.json')
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    # toJSON() yields one JSON string per row; collect() pulls them all to the driver.
    json_df = '\n'.join(df.toJSON().collect())
    # Blob has no upload_from_blob(); upload_from_string() uploads in-memory data.
    blob.upload_from_string(json_df, content_type='application/json')
    print("Upload completed====>")
```
I tried the code above. It works, but the conversion to JSON takes time. Is there any way in Dagster to upload data to GCP without using the Google SDK?
Selman
06/24/2021, 8:53 AM
QueuedRunCoordinator with tag_concurrency_limits to limit runs based on tags, but that requires us to update dagster.yaml for each new pipeline definition in order to limit runs by pipeline name.
Deveshi
06/24/2021, 12:29 PM
dagster delete run
This removes the run from the dagit UI. However, I can still see the associated log files in $DAGSTER_HOME/history/runs and $DAGSTER_HOME/storage.
Thanks!
Noah Sanor
06/24/2021, 3:07 PM
Andrew Parsons
06/24/2021, 5:50 PM
boto3 from within Dockerized Dagster. I am sure the answer is simple, but I can't seem to figure out what is wrong.
1. [Screenshot 4] I have created a multi-container Dagster deployment configuration based on the documentation.
2. [Screenshot 4] I don't think it is relevant, but I'm also using the Django ORM side by side with Dagster to manage parts of this ETL/ML pipeline.
3. [Screenshot 4] Using a Docker volume, I pass AWS credentials from the host to the container.
4. [Screenshot 4] As I understand it, only the pipeline container (not the daemon or dagit) needs the AWS credentials volume. In my efforts to debug, however, I've given it to all three.
5. [Screenshot 2] If I run docker exec -it <container_id> /bin/bash into the Dagster containers, I can see that each one has the correct AWS credentials file and that it is fully populated.
6. [Screenshot 3] If I install the awscli Python package in a container, I can query the configuration and see that all of the credentials are accessible.
7. [Screenshot 1] I have a Dagster resource through which I try to instantiate an S3 bucket.
8. [Screenshot 5] And then, to my surprise, the pipeline (or at least the resource) appears to be executing in a different file system!? It cannot find /root/.aws/credentials
9. [No screenshot] ...so the pipeline fails with botocore.exceptions.NoCredentialsError: Unable to locate credentials
10. [No screenshot] I am sure that my Dagster resource works, since I've had successful runs prior to Dockerization.
11. [Screenshot 6] I believe my confusion stems from how the daemon works. Is it spinning up new containers exclusively for a single pipeline run? If so, can I also give this container my AWS credentials volume?
For what it is worth, everywhere I've used /root, I've also tried ~/ with the same effect.
Otherwise, congratulations to the Dagster/Elementl team. Dagster has been fun to learn so far, and while the documentation still needs improvement, a lot of the basic examples are sufficient to help new users get started quickly.
Sorry that the screenshots are out of order; to my knowledge, Slack doesn't provide any way to reorder them.
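The run may execute in a separate container: the run launcher may start a new container per run, which would explain why the file visible via docker exec is not visible to the resource. Assuming that is the case, it helps to record which credential sources the resource's own process can actually see. boto3 checks the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables before the shared credentials file, whose location can be overridden with AWS_SHARED_CREDENTIALS_FILE. Below is a minimal stdlib-only sketch of that check. `credential_sources` is a hypothetical helper that only inspects the environment and file system and does not call boto3.

```python
import os
from pathlib import Path
from typing import Dict, Mapping, Optional


def credential_sources(env: Optional[Mapping[str, str]] = None) -> Dict[str, object]:
    """Hypothetical helper: where boto3 could find credentials, as seen by this process."""
    env = os.environ if env is None else env
    # expanduser() resolves "~" using this process's HOME, which is exactly what differs
    # when code runs as another user or in another container.
    shared_file = Path(
        env.get("AWS_SHARED_CREDENTIALS_FILE", "~/.aws/credentials")
    ).expanduser()
    return {
        "env_access_key_set": bool(env.get("AWS_ACCESS_KEY_ID")),
        "env_secret_key_set": bool(env.get("AWS_SECRET_ACCESS_KEY")),
        "shared_credentials_file": str(shared_file),
        "shared_credentials_file_exists": shared_file.is_file(),
        "home": os.path.expanduser("~"),
    }
```

Printing or logging `credential_sources()` at the start of the resource would show whether HOME and the credentials file in the run's environment match what docker exec shows.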