Navneet Sajwan
10/12/2021, 12:54 PM
Thomas
10/12/2021, 2:26 PM
Set-Variable -Name DAGSTER_HOME -Value C:\Users\myuser\dagster_home
Chris Chan
10/12/2021, 3:56 PM
Vladislav Ladenkov
10/12/2021, 4:00 PM
Koby Kilimnik
10/12/2021, 4:07 PM
Donny Winston
10/12/2021, 6:34 PM
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.DEADLINE_EXCEEDED
details = "Deadline Exceeded"
debug_error_string = "{"created":"@1634063600.070690200","description":"Error received from peer unix:/tmp/tmps5h2tkhh","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Deadline Exceeded","grpc_status":4}"
>
Daniel Rodman
10/13/2021, 1:37 AM
dagit when I try to run a pipeline (I can see the pipeline, I just can’t run it):
dagster.core.errors.DagsterInvariantViolationError: repo not found at module scope in file /opt/dagster/app/repository.py.
File "/usr/local/lib/python3.7/site-packages/dagster/grpc/impl.py", line 75, in core_execute_run
recon_pipeline.get_definition()
File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 110, in get_definition
defn = self.repository.get_definition().get_pipeline(self.pipeline_name)
File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 46, in get_definition
return repository_def_from_pointer(self.pointer)
File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 518, in repository_def_from_pointer
target = def_from_pointer(pointer)
File "/usr/local/lib/python3.7/site-packages/dagster/core/definitions/reconstructable.py", line 460, in def_from_pointer
target = pointer.load_target()
File "/usr/local/lib/python3.7/site-packages/dagster/core/code_pointer.py", line 233, in load_target
name=self.fn_name, file=self.python_file
For reference… I have 4 separate repositories. They are all configured in my workspace.yaml:
load_from:
  # Each entry here corresponds to a service in the docker-compose file that exposes pipelines.
  - grpc_server:
      host: dagster_pipelines
      port: 4000
      location_name: "dagster_pipelines"
  - grpc_server:
      host: dagster_experiment
      port: 4001
      location_name: "dagster_experiment"
  - grpc_server:
      host: dagster_dbt
      port: 4002
      location_name: "dagster_dbt"
  - grpc_server:
      host: dagster_data_app
      port: 4003
      location_name: "dagster_data_app"
I’m trying to organize these repositories as dagster projects. Each project has its own associated dockerized image. This is an example of how the files are organized:
├── projects
│ ├── dbt
│ │ ├── dbt
│ │ │ ├── pipelines
│ │ │ ├── schedules
│ │ │ ├── sensors
│ │ │ └── solids
│ │ │ └── repository.py
I’m able to shell into the container and run pipelines. However, in dagit I’m getting the above error. (Perhaps it is related to my Docker setup?) Thanks again.
Donny Winston
10/13/2021, 2:48 AM
Alejandro A
10/13/2021, 8:14 AM
Navneet Sajwan
10/13/2021, 9:30 AM
Gayathri Chakravarthy
10/13/2021, 10:32 AM
from dagster import resource, solid, ModeDefinition, pipeline
from dagster_aws.s3 import s3_resource
class HelperAwsS3:
    def __init__(self, s3_resource):
        self.s3_resource = s3_resource

    def s3_list_bucket(self, bucket, prefix):
        return self.s3_resource.list_objects_v2(
            Bucket=bucket,
            Prefix=prefix
        )

    def s3_download_file(self, bucket, file, local_path):
        self.s3_resource.meta.client.download_file(
            Bucket=bucket,
            Key=file,
            Filename=local_path
        )

    def s3_upload_file(self, bucket, file, local_path):
        self.s3_resource.meta.client.upload_file(
            Bucket=bucket,
            Key=file,
            Filename=local_path
        )
The _s3_resource_ is actually _dagster_aws.s3.s3_resource_, which will help me connect to AWS using my local AWS credentials.
I am not sure how to pass the _s3_resource_ to the HelperAwsS3 when I make the call in the @resource section below.
@resource
def connection_helper_aws_s3_resource(context):
    return HelperAwsS3()
Any pointers please? Or am I doing it all wrong and it needs doing in a different way?
Thanks for your help.
Koby Kilimnik
10/13/2021, 11:48 AM
Koby Kilimnik
10/13/2021, 11:49 AM
@solid(required_resource_keys={'s3'})
def my_solid(_context):
    ...
Koby Kilimnik
10/13/2021, 11:51 AM
from dagster_aws.s3 import s3_resource
MODE_TEST = ModeDefinition(name="test", resource_defs={
    "s3": s3_resource
})
@pipeline(mode_defs=[MODE_TEST])
def my_pipeline():
    my_solid()
Koby Kilimnik
10/13/2021, 11:53 AM
MODE_TEST = ModeDefinition(name="test", resource_defs={
    "s3": s3_resource.configured({"region_name":"us-east-1"})
})
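For the original question about passing the S3 client into HelperAwsS3, one possible approach is to let the wrapper resource itself depend on the "s3" resource and read it from the init context. The sketch below is stdlib-only so that it runs without Dagster installed: FakeS3Client and the SimpleNamespace context are hypothetical stand-ins, and the comment marks where @resource(required_resource_keys={"s3"}) would go in the legacy API used in this thread.

```python
from types import SimpleNamespace


class HelperAwsS3:
    """Thin wrapper around an S3 client, following the class in the question."""

    def __init__(self, s3_client):
        self.s3_client = s3_client

    def s3_list_bucket(self, bucket, prefix):
        return self.s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)


class FakeS3Client:
    """Hypothetical stand-in for the real S3 client, for illustration only."""

    def list_objects_v2(self, Bucket, Prefix):
        return {"Bucket": Bucket, "Prefix": Prefix, "Contents": []}


# In Dagster this function would carry the decorator
# @resource(required_resource_keys={"s3"}) so that context.resources.s3 is populated.
def connection_helper_aws_s3_resource(context):
    return HelperAwsS3(context.resources.s3)


# Simulated init context (assumption); Dagster would build the real one
# from the mode's resource_defs.
fake_context = SimpleNamespace(resources=SimpleNamespace(s3=FakeS3Client()))
helper = connection_helper_aws_s3_resource(fake_context)
listing = helper.s3_list_bucket("my-bucket", "raw/")
```

With this shape, the mode would map both "s3" and the helper resource in resource_defs, and solids would require only the helper's key.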
mrdavidlaing
10/13/2021, 1:10 PM
UserWarning: Module ... was resolved using the working directory. The ability to load uninstalled modules from the working directory is deprecated and will be removed in a future release. Please use the python-file based load arguments or install ... to your python environment.
errors when using execute_pipeline() as shown below:
result = execute_pipeline(
    pipeline=reconstructable(my_pipeline),
    preset="prod",
    instance=DagsterInstance.get(),
)
Is there a way to use the python-file based load arguments when using execute_pipeline() / reconstructable()?
Grigoriy Sterin
10/13/2021, 1:34 PM
dagster/examples/deploy_ecs
and I'm trying to run the example pipeline, but I'm getting the following error:
dagster.core.errors.DagsterInvalidInvocationError: Too many input arguments were provided for solid 'solid'. This may be because an argument was provided for the context parameter, but no context parameter was defined for the solid.
Which seems strange, since there are literally zero arguments provided. Here's the code of the pipeline:
import time
import dagster

@dagster.solid
def solid():
    time.sleep(30)
    return True

@dagster.pipeline
def pipeline():
    solid()

@dagster.repository
def repository():
    return [pipeline]
Thank you!
szalai1
10/13/2021, 2:53 PM
dagster.core.errors.DagsterExecutionStepNotFoundError: Can not build subset plan from unknown step: create_full_taker_answers_table
The same pipeline runs fine (even partially) if we start it in the playground. We cannot see any errors on the dagit, celery and user deployment pods. Any ideas how to find the problem?
Dalin Kim
10/13/2021, 5:45 PM
Koby Kilimnik
10/13/2021, 6:12 PM
Koby Kilimnik
10/13/2021, 6:13 PM
Koby Kilimnik
10/13/2021, 6:14 PM
Martim Passos
10/13/2021, 6:25 PM
local_file_manager and s3_file_manager. I understand they’re designed to use the same API so you can just switch modes, but the former uses a local_file_handle and the latter an s3_file_handle. So does my code still need some kind of check to determine which one to create, or am I missing something here?
Jordan W
10/13/2021, 10:26 PM
output_id and output. The first yield is output_id and the second yield is output. The bug comes up when the IO manager tries to upload output. The boto3 call self.s3.list_objects_v2(Bucket=self.bucket, Prefix=key) will see the prefix output and remove output_id, then upload output. The IO manager should check for matching ids instead of trusting and removing everything the S3 client lists in the bucket with said prefix.
@solid(
    output_defs=[
        OutputDefinition(
            name="clustering_id",
            is_required=True,
            description="versioned clustering identifier",
            dagster_type=ClusteringId,
        ),
        OutputDefinition(
            name="clustering",
            is_required=True,
            description="versioned clustering object",
            dagster_type=Clustering,
        ),
        OutputDefinition(
            name="nmf_clustering",
            is_required=False,
            description="rich NMF clustering object",
            dagster_type=NmfClustering,
        ),
    ]
)
def foo():
    yield Output(bar, "output_id")  # Does not exist in the s3 bucket
    yield Output(baz, "output")  # Does exist in the s3 bucket
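The prefix collision described above can be reproduced without S3. This is a minimal sketch, assuming a Prefix= listing behaves like a plain "starts with" filter over key names; the key paths and the helper names keys_with_prefix and keys_exact are hypothetical and are not the IO manager's actual code.

```python
def keys_with_prefix(keys, prefix):
    """Mimic a Prefix= listing: every key that starts with the prefix."""
    return [k for k in keys if k.startswith(prefix)]


def keys_exact(keys, key):
    """Stricter check: keep only listed keys that equal the target key."""
    return [k for k in keys_with_prefix(keys, key) if k == key]


# Hypothetical bucket state after the first yield has been stored.
bucket_keys = ["storage/run_1/foo/output_id"]
target = "storage/run_1/foo/output"

# A prefix listing for the second output also returns the first output's key,
# so deleting everything it lists would remove output_id.
prefix_matches = keys_with_prefix(bucket_keys, target)

# Comparing full keys leaves output_id untouched.
exact_matches = keys_exact(bucket_keys, target)
```

Here prefix_matches contains the output_id key while exact_matches is empty, which is the difference the message asks the IO manager to respect.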
Francois-DE
10/14/2021, 8:43 AM
Sara
10/14/2021, 12:00 PM
Kobi
10/14/2021, 12:08 PM
Grigoriy Sterin
10/14/2021, 6:10 PM
Vishal Santoshi
10/14/2021, 11:11 PM
run_launcher:
  module: dagster_celery_k8s
  class: CeleryK8sRunLauncher
  config:
    dagster_home:
      env: DAGSTER_HOME
    instance_config_map:
      env: DAGSTER_K8S_INSTANCE_CONFIG_MAP
    postgres_password_secret:
      env: DAGSTER_K8S_PG_PASSWORD_SECRET
    broker: "pyamqp://test:test@dagster-rabbitmq:5672//"
    backend: "rpc://"
The pipeline deploys, but when executing it I see this error:
dagster.core.errors.DagsterInvariantViolationError: celery-k8s execution configuration must be present in the run config to use the CeleryK8sRunLauncher. Note: You may also be seeing this error because you are using the configured API. Using configured with the celery-k8s executor is not supported at this time, and all executor config must be directly in the run config without using configured.
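The error text says the run config itself must contain a celery-k8s execution section. A minimal sketch of that shape as a Python dict follows; the job_namespace key and its value are assumptions for illustration and are not taken from this thread, and the real executor may require other fields. The helper has_celery_k8s_execution is hypothetical.

```python
# Run config shape suggested by the error message: an "execution" section
# keyed by "celery-k8s". The inner config values are hypothetical placeholders.
run_config = {
    "execution": {
        "celery-k8s": {
            "config": {
                "job_namespace": "dagster",  # assumed namespace, adjust to your cluster
            }
        }
    }
}


def has_celery_k8s_execution(config):
    """Return True if the run config carries a celery-k8s execution section."""
    return "celery-k8s" in config.get("execution", {})
```

A check like this could be run on a run config before launching, to catch the missing section early.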
Sara
10/15/2021, 8:16 AM
pip3 install dagster
pip3 install dagit
dagster new-project PROJECT_NAME
cd PROJECT_NAME
pip3 install --editable .
dagit
dagster-daemon run
And I get this error (attached picture):
The dagster-daemon documentation doesn't give much info on this.
Thanks!!!