Roel Hogervorst
01/12/2022, 10:20 AM
dagster validate --path? I'm thinking CI/CD

Raphael Krupinski
01/12/2022, 11:31 AM

dinya
01/12/2022, 12:48 PM

Dylan Hunt
01/12/2022, 4:43 PM

Prratek Ramchandani
01/12/2022, 5:10 PM

Bernardo Cortez
01/12/2022, 5:31 PM
"To map multiple outputs, return a dictionary from the composition function." - but I don't quite understand it.

Bryan Chavez
01/12/2022, 7:21 PM

Chris Evans
01/12/2022, 9:22 PM
The {frequency}_partition_configs APIs are not fully there yet: https://docs.dagster.io/guides/dagster/graph_job_op#partition-schedules

Chris Chan
01/12/2022, 10:20 PM
I'm seeing a STEP_UP_FOR_RETRY event, but my op doesn't have a retry policy defined. Is there a default retry set somewhere? Where can I adjust this?
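For reference, a minimal sketch of pinning retry behavior explicitly on an op (the op name and policy values are placeholders, not taken from the thread); an explicit per-op RetryPolicy is the most direct knob to adjust:

from dagster import RetryPolicy, op

# Placeholder op: an explicit per-op policy takes precedence over whatever
# job- or executor-level retry configuration might otherwise apply.
@op(retry_policy=RetryPolicy(max_retries=3, delay=10))
def flaky_op():
    ...
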
Andrew Leverentz
01/12/2022, 10:21 PM

Keith Devens
01/12/2022, 10:56 PM
I'd like to be able to do something like this:
@job
def do_load():
    data = get_data()
    processed = process_data(data)
    if environment == 'dev':
        # do dev-specific operation
        # (but don't load_data until that's done)
    load_data(processed)
and be able to have each op take settings for the appropriate environment, and then define “configured” versions of the job like do_load_dev, do_load_staging, do_load_prod that can have the ops use different settings per environment.
Currently I have a yaml config like the following:
resources:
  environment:
    config:
      prod:
        user: "prod_user"
        url: "https://.../"
      staging:
        user: "staging_user"
        url: "https://.../"
I can read that resource with:
@op(required_resource_keys={"environment"})
def do_thing_with_resource(context):
    env = context.resources.environment
    print(f"Got env: {env}")

@job(resource_defs={"environment": make_values_resource()})
def run_load():
    do_thing_with_resource()
but what’s the right way to preconfigure that per-environment and be able to access the specific environment settings in each op?
Also, what's the right way to set environment variables for code or shell commands (create_shell_command_op) in an @op?
Thanks!
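One possible shape for the per-environment question, sketched as an assumption rather than an official recipe (it reuses the get_data/process_data/load_data ops from the pseudocode above; the dev/staging values are illustrative):

from dagster import graph, make_values_resource

@graph
def do_load_graph():
    # same wiring as the pseudocode above, minus the environment branching
    load_data(process_data(get_data()))

def environment_resource(user, url):
    # make_values_resource builds a resource whose config schema is the given
    # fields; .configured(...) bakes the values in so no per-run config is needed
    return make_values_resource(user=str, url=str).configured({"user": user, "url": url})

do_load_dev = do_load_graph.to_job(
    name="do_load_dev",
    resource_defs={"environment": environment_resource("dev_user", "https://dev.example/")},
)
do_load_staging = do_load_graph.to_job(
    name="do_load_staging",
    resource_defs={"environment": environment_resource("staging_user", "https://staging.example/")},
)

Inside an op, context.resources.environment is then the dict of baked-in values.
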
Nissan Pow
01/13/2022, 1:14 AM

Fikri Thauli
01/13/2022, 2:32 AM

Jorn Baayen
01/13/2022, 7:59 AM

Hemant Kumar
01/13/2022, 9:26 AM

Quy
01/13/2022, 10:53 AM
I want to pass env_secrets to a job, as I spotted this in https://docs.dagster.io/_modules/dagster_k8s/executor#k8s_job_executor:
env_secrets=run_launcher.env_secrets + (exc_cfg.get("env_secrets") or []),
Hence, my code is the following:
download_host_fe_job_stg = download.to_job(
    name=f'{NAME}_stg_fe',
    resource_defs={
        "io_manager": s3_pickle_io_manager,
        "s3": s3_resource,
    },
    config=partitioned_config,
    tags={"dagster-k8s/config": {
        "job_config": {
            "env_secrets": [
                "dagster-gcp-secret-fe"
            ]
        }
    }}
)
but I got this error:
TypeError: __init__() got an unexpected keyword argument 'env_secrets'
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/utils.py", line 34, in _fn
return fn(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/execution/launch_execution.py", line 16, in launch_pipeline_execution
return _launch_pipeline_execution(graphene_info, execution_params)
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/execution/launch_execution.py", line 50, in _launch_pipeline_execution
run = do_launch(graphene_info, execution_params, is_reexecuted)
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/execution/launch_execution.py", line 38, in do_launch
workspace=graphene_info.context,
File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 1455, in submit_run
SubmitRunContext(run, workspace=workspace)
File "/usr/local/lib/python3.7/site-packages/dagster/core/run_coordinator/default_run_coordinator.py", line 32, in submit_run
self._instance.launch_run(pipeline_run.run_id, context.workspace)
File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 1519, in launch_run
self._run_launcher.launch_run(LaunchRunContext(pipeline_run=run, workspace=workspace))
File "/usr/local/lib/python3.7/site-packages/dagster_k8s/launcher.py", line 312, in launch_run
user_defined_k8s_config=user_defined_k8s_config,
File "/usr/local/lib/python3.7/site-packages/dagster_k8s/job.py", line 649, in construct_dagster_k8s_job
**user_defined_k8s_config.job_config,
Please help me correct the configs for this k8s job.
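For what it's worth, a hedged sketch of an alternative: judging by the traceback, the tag's job_config is splatted straight into a Kubernetes Job model, which has no env_secrets field, whereas the env_secrets in the linked source is read from the executor's config. Reusing the names from the snippet above:

from dagster_k8s import k8s_job_executor

# Sketch only: env_secrets is passed via the executor config (the exc_cfg the
# quoted source line reads) instead of the dagster-k8s/config job_config tag.
download_host_fe_job_stg = download.to_job(
    name=f'{NAME}_stg_fe',
    resource_defs={"io_manager": s3_pickle_io_manager, "s3": s3_resource},
    config=partitioned_config,
    executor_def=k8s_job_executor.configured({"env_secrets": ["dagster-gcp-secret-fe"]}),
)
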
George Pearse
01/13/2022, 1:20 PM
botocore.exceptions.EndpointConnectionError: Could not connect to the endpoint URL: "https://behold-pipelines.s3.eu-west-2.amazonaws.com/?list-type=2&prefix=dagster%2Fstorage%2Fb42b4c9c-b930-4f6d-8a1e-58ea43d01361%2Ffetch_instances_info%5B12%5D%2Fresult&encoding-type=url"
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_plan.py", line 193, in _dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 326, in core_dagster_event_sequence_for_step
for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 380, in _type_check_and_store_output
for evt in _store_output(step_context, step_output_handle, output, input_lineage):
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 488, in _store_output
handle_output_res = output_manager.handle_output(output_context, output.value)
File "/usr/local/lib/python3.7/site-packages/dagster_aws/s3/io_manager.py", line 68, in handle_output
if self._has_object(key):
File "/usr/local/lib/python3.7/site-packages/dagster_aws/s3/io_manager.py", line 50, in _has_object
key_count = self.s3.list_objects_v2(Bucket=self.bucket, Prefix=key)["KeyCount"]
File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 386, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 692, in _make_api_call
operation_model, request_dict, request_context)
File "/usr/local/lib/python3.7/site-packages/botocore/client.py", line 711, in _make_request
return self._endpoint.make_request(operation_model, request_dict)
File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 102, in make_request
return self._send_request(request_dict, operation_model)
File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 156, in _send_request
raise exception
File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 200, in _do_get_response
http_response = self._send(request)
File "/usr/local/lib/python3.7/site-packages/botocore/endpoint.py", line 269, in _send
return self.http_session.send(request)
File "/usr/local/lib/python3.7/site-packages/botocore/httpsession.py", line 352, in send
raise EndpointConnectionError(endpoint_url=request.url, error=e)
I'm assuming the connection to S3 was lost temporarily. I was going to add RetryRequested within the op, but because this S3 load is handled by the io_manager, I don't think that would increase the robustness at all. How could I make the io_manager try again for this op?
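One hedged possibility (a sketch, not a known recommendation): push the retrying down to the boto3 client itself and bind that client to the "s3" resource key, since the s3_pickle_io_manager makes its S3 calls through that resource:

import boto3
from botocore.config import Config
from dagster import resource

# Hypothetical drop-in for the "s3" resource: botocore's "standard" retry mode
# retries transient connection failures with backoff, so the io_manager's
# list_objects_v2/upload calls get retried without touching the op.
@resource
def s3_with_retries(_init_context):
    return boto3.client(
        "s3",
        config=Config(retries={"max_attempts": 10, "mode": "standard"}),
    )
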
Nilesh Pandey
01/13/2022, 2:35 PM

marcos
01/13/2022, 2:49 PM
{
    "table_name": "some_new_file.json",
    "value": df
}

Mykola Palamarchuk
01/13/2022, 3:09 PM
deployments:
  - ...
    env:
      AWS_SHARED_CREDENTIALS_FILE: /etc/aws/credentials
    volumes:
      - name: aws-credentials
        secret:
          secretName: aws-credentials
    volumeMounts:
      - name: aws-credentials
        mountPath: /etc/aws
        readOnly: true
I've deployed the chart and tested the configuration from inside the pod (with a Python shell and boto3): it works.
But when I run a job that has to save something to a bucket from Dagit, I face this issue:
Initialization of resources [io_manager, s3] failed.
botocore.exceptions.ProfileNotFound: The config profile (myprofile) could not be found
As I understand it, I have to configure something else. Could you help me, please?
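If the env/volumes above are only applied to the user-code deployment pod (where the boto3 shell test succeeded) and not to the pod that actually executes the run, one thing that might be worth trying, purely as a sketch, is declaring the same mount on the launched pod via the dagster-k8s/config tag (the op/graph names here are placeholders):

from dagster import graph, op

@op
def save_to_bucket():
    ...

@graph
def save_graph():
    save_to_bucket()

# pod_spec_config / container_config follow the Kubernetes client's snake_case
# field names; this mirrors the secret volume, mount path, and env var from the
# Helm values above onto the run's pod.
save_job = save_graph.to_job(
    tags={
        "dagster-k8s/config": {
            "pod_spec_config": {
                "volumes": [
                    {"name": "aws-credentials", "secret": {"secret_name": "aws-credentials"}}
                ],
            },
            "container_config": {
                "volume_mounts": [
                    {"name": "aws-credentials", "mount_path": "/etc/aws", "read_only": True}
                ],
                "env": [
                    {"name": "AWS_SHARED_CREDENTIALS_FILE", "value": "/etc/aws/credentials"}
                ],
            },
        }
    },
)
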
Mykola Palamarchuk
01/13/2022, 4:07 PM
Dagit doesn't see the changes to my @repository when I build a new user-deployments image and upgrade the Helm chart. The pod is updated, but Dagit can't run it and doesn't see the changes. I have to uninstall and reinstall the chart to make it work. I'm using the "latest" tag on my image. Could that be the reason?
Quy
01/13/2022, 6:35 PM
@daily_partitioned_config(start_date=datetime(2021, 11, 1))
def partitioned_config(start: datetime, end: datetime):
    return {
        'ops': {
            'my_func': {
                'config': {
                    'date': start.strftime("%Y%m%d")
                }
            }
        }
    }
which I aim to merge with the io_manager's configs:
from dagster.utils import merge_dicts

download.to_job(
    resource_defs={
        "io_manager": s3_pickle_io_manager,
        "s3": s3_resource,
    },
    config=merge_dicts(partitioned_config, {
        'resources': {
            'io_manager': {
                'config': {
                    's3_bucket': "dagster-test",
                    's3_prefix': "dagster"
                }
            }
        }
    })
)
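A sketch of one way around the merge: since partitioned_config is a PartitionedConfig object rather than a plain dict, merge_dicts presumably can't fold extra run config into it, but the resource config can be returned from the decorated function itself (values copied from the snippets above) and the result passed to to_job via config=partitioned_config as before:

from datetime import datetime
from dagster import daily_partitioned_config

@daily_partitioned_config(start_date=datetime(2021, 11, 1))
def partitioned_config(start: datetime, _end: datetime):
    # per-partition op config plus the (static) io_manager resource config
    # in one run-config dict
    return {
        'ops': {'my_func': {'config': {'date': start.strftime("%Y%m%d")}}},
        'resources': {
            'io_manager': {'config': {'s3_bucket': "dagster-test", 's3_prefix': "dagster"}}
        },
    }
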
Alex Rudolph
01/13/2022, 10:35 PM
In our get_metric_timelines solid we're now getting the attached error message. We're using the k8s_job_executor, and previously this would create a new pod for each dynamic output. I'm having trouble figuring out what causes this.
Ryan Riopelle
01/14/2022, 12:26 AM

Solaris Wang
01/14/2022, 12:53 AM
I have runLauncher.config.k8sRunLauncher.envSecrets in the Helm chart referencing my secrets, but the pods aren't spun up with them (in my .py @op code I use os.getenv('the_actual_secret_name') to troubleshoot and confirm that they are missing) 🙏🏻 thank you
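A hedged note on the os.getenv check: envSecrets is normally injected as Kubernetes envFrom with a secretRef, which exposes each key inside the Secret as an environment variable named after that key, not after the Secret's own name, so the probe may need to look more like this (the key name is a placeholder):

import os

# Placeholder: "SOME_KEY_IN_THE_SECRET" must be a data key inside the
# referenced Kubernetes Secret, not the Secret's metadata.name.
print(os.getenv("SOME_KEY_IN_THE_SECRET"))
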
Thomas
01/14/2022, 9:00 AM

dinya
01/14/2022, 10:16 AM
tag1 > 1 and (tag2 <= "2022-01-14" or tag3 == "test value")?
Nitin Madhavan
01/14/2022, 11:18 AM

mzmm
01/14/2022, 11:57 AM
Currently I'm using execute_in_process, which I don't think does everything Dagster would be able to do if used right. Right now I'm under the impression that Dagster is built around the declarative assumption, and I don't know if there is an API that lets me use Dagster's features (repos, mapping execution to containers, etc.) given my use case above. Any pointers are greatly appreciated, thanks.
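In case it's useful, one imperative entry point, sketched with placeholder names and under the assumption that a Dagit/GraphQL endpoint is reachable and the job is loaded in a repository there, is dagster-graphql's client, which submits runs to the deployment (run launcher, containers, Dagit visibility) rather than executing in-process:

from dagster_graphql import DagsterGraphQLClient

# Placeholder host/port, job name, and run config.
client = DagsterGraphQLClient("localhost", port_number=3000)
run_id = client.submit_job_execution(
    "my_job",
    run_config={"ops": {"my_op": {"config": {"param": "value"}}}},
)
print(run_id)
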
Daniel Michaelis
01/14/2022, 12:31 PM
I'm running into a problem saving an ALS model from pyspark.ml.recommendation to S3 and reading it back in when this takes place within a dynamically executed graph (i.e. via dynamic mapping). I wrote a custom IO manager using the pattern f's3a://{self.s3_bucket}/{key}' as _uri_for_key, similar to the one currently implemented in the PickledObjectS3IOManager. As the step identifiers for the dynamically generated steps contain square brackets [ and ], these end up in the S3 URI when an object is written. Even though I can clearly see the model was saved to this path in S3, I'm getting an error when the downstream op tries to load the model, something like:
py4j.protocol.Py4JJavaError: An error occurred while calling o44.load.
: org.apache.hadoop.mapred.InvalidInputException: Input Pattern s3a://....fit_model[...]/model/metadata matches 0 files
When I replace/remove the square brackets from _uri_for_key, this works fine:
f's3a://{self.s3_bucket}/{key}'.replace('[', '_').replace(']', '')
It seems that technically _uri_for_key is only used for debug logs in the PickledObjectS3IOManager, and the writing/reading occurs via upload_fileobj and pickle.loads without actually using this key. I can imagine that the same error could occur with the PickledObjectS3IOManager and thought I'd point it out here, in case this hasn't been tested yet. Moreover, square brackets are among the characters to avoid in S3 object keys according to https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-keys.html
On the other hand, maybe the error is specific to my particular use case if:
1. the resulting URI from upload_fileobj is actually different from the one generated by _uri_for_key, which would mean that the debug log message should be corrected
2. the error only occurs when trying to read a folder, which is the case for the ALS model (see error message) but not for pickled objects
3. the error only occurs within Py4J
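For reference, the sanitized key construction described above as a small sketch, assuming an IO manager modeled on PickledObjectS3IOManager with self.s3_bucket set:

def _uri_for_key(self, key: str) -> str:
    # Strip the [ and ] that dynamic-mapping step identifiers introduce; they
    # are among the "characters to avoid" in S3 object keys and break the
    # s3a:// read of the saved model directory.
    safe_key = key.replace('[', '_').replace(']', '')
    return f's3a://{self.s3_bucket}/{safe_key}'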