Rodrigo Baron
01/07/2022, 1:00 PM
Andy Chen
01/07/2022, 2:26 PM
raul
01/07/2022, 3:27 PM
Bernardo Cortez
01/07/2022, 3:49 PM
build_schedule_from_partitioned_job. However, this approach has the downside of being prone to skipping partitions, since it depends on the new partition arriving at a regular time. In other words, if I expect the new partition to arrive daily at 10am and it arrives at 10:30am or 11am, I will lose that partition forever, because the next day's run will only process the next day's partition. I could implement a more frequent schedule, but that comes close to the idea of a sensor.
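The catch-up behaviour described above can be sketched without any Dagster imports. This is only an illustration under stated assumptions: `pending_partitions` and `is_available` are hypothetical names, partitions are assumed to be daily, and the cursor is assumed to be the ISO date of the last partition already requested. Wiring it into a real sensor (for example by storing the cursor on the sensor context) is left out.

```python
from datetime import date, timedelta
from typing import Callable, List, Optional, Tuple


def pending_partitions(
    cursor: Optional[str],
    today: date,
    is_available: Callable[[date], bool],
    start: date,
) -> Tuple[List[str], Optional[str]]:
    """Return the daily partitions that are ready to run, plus the new cursor.

    cursor is the ISO date of the last partition already requested, or None.
    is_available is a hypothetical check such as "has the file landed yet?".
    """
    next_day = date.fromisoformat(cursor) + timedelta(days=1) if cursor else start
    ready: List[str] = []
    # Walk forward one day at a time and stop at the first partition that has
    # not arrived, so a late partition is retried on the next poll instead of
    # being skipped.
    while next_day <= today and is_available(next_day):
        ready.append(next_day.isoformat())
        cursor = next_day.isoformat()
        next_day += timedelta(days=1)
    return ready, cursor
```

Polling this every few minutes means a partition that lands at 10:30am instead of 10am is simply picked up by a later poll, because the cursor only advances past partitions that were actually available.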
Is it possible to implement a sensor that listens to these types of events? What dagster.core.events.DagsterEventType subclass should I use?
Daniel Eduardo Portugal Revilla
01/07/2022, 8:46 PM
Raphael Krupinski
01/07/2022, 9:25 PM
Tim
01/08/2022, 5:01 AM
Loc Nguyen
01/08/2022, 8:02 AM
Will Gunadi
01/08/2022, 5:36 PM
Navneet Sajwan
01/08/2022, 6:40 PM
Huib Keemink
01/08/2022, 7:11 PM
context.log) stuff other than printable strings? For instance, it would be really useful if I could log something like a plot. I know I could use dagstermill, but at that point I feel like I could just as well use Databricks.
Huib Keemink
01/08/2022, 7:12 PM
Huib Keemink
01/08/2022, 7:13 PM
madhurt
01/09/2022, 8:53 AM
dagster-dbt library for integration with dbt. According to the docs, I can send extra keyword arguments: for example, my_flag_name = 'foo' will get converted to --my-flag-name foo. But I need to use an argument called --no-version-check, which doesn’t accept a value after it. So how can I pass it to context.resources.dbt.run()?
Sa'ar Elias
01/09/2022, 12:40 PM
raise Failure() do something like yield Stop() and count it as a success)?
Daniel Suissa
01/09/2022, 3:55 PM
context argument? I'm trying to wrap op in a decorator that uses the context to determine whether the op should be skipped (i.e. return None for a non-required Out)
Yan
01/10/2022, 12:17 AM
Mykola Palamarchuk
01/10/2022, 6:58 PM
Bryan Chavez
01/11/2022, 12:56 AM
@run_status_sensor(
    pipeline_run_status=PipelineRunStatus.SUCCESS,
    pipeline_selection=pipeline_names,
)
def job_success_email_sensor(context: RunStatusSensorContext):
Keshav
01/11/2022, 5:05 AM
Sa'ar Elias
01/11/2022, 10:46 AM
Mohammad Nazeeruddin
01/11/2022, 11:21 AM
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNKNOWN
details = "Exception iterating responses: Errors whilst loading configuration for {'instance_config_map': Field(<dagster.config.source.StringSourceType object at 0x7f040d00c790>, default=@, is_required=True), 'postgres_password_secret': Field(<dagster.config.source.StringSourceType object at 0x7f040d00c790>, default=@, is_required=False), 'dagster_home': Field(<dagster.config.source.StringSourceType object at 0x7f040d00c790>, default=/opt/dagster/dagster_home, is_required=False), 'job_image': Field(<dagster.config.config_type.Noneable object at 0x7f03dec2f7f0>, default=@, is_required=False), 'image_pull_policy': Field(<dagster.config.config_type.Noneable object at 0x7f03dec2fb20>, default=None, is_required=False), 'image_pull_secrets': Field(<dagster.config.config_type.Noneable object at 0x7f03deccb9d0>, default=@, is_required=False), 'service_account_name': Field(<dagster.config.config_type.Noneable object at 0x7f03dec3edc0>, default=@, is_required=False), 'env_config_maps': Field(<dagster.config.config_type.Noneable object at 0x7f03dec3e070>, default=@, is_required=False), 'env_secrets': Field(<dagster.config.config_type.Noneable object at 0x7f03dec3ecd0>, default=@, is_required=False), 'env_vars': Field(<dagster.config.config_type.Noneable object at 0x7f03dec3e6d0>, default=@, is_required=False), 'volume_mounts': Field(<dagster.config.config_type.Array object at 0x7f03deccb790>, default=[], is_required=False), 'volumes': Field(<dagster.config.config_type.Array object at 0x7f03dec2f220>, default=[], is_required=False), 'job_namespace': Field(<dagster.config.source.StringSourceType object at 0x7f040d00c790>, default=default, is_required=False), 'load_incluster_config': Field(<dagster.config.config_type.Bool object at 0x7f040f161b50>, default=True, is_required=False), 'kubeconfig_file': Field(<dagster.config.config_type.Noneable object at 0x7f03decd50d0>, default=None, is_required=False)}.
Error 1: Post processing at path root:job_image of original value {'env': 'DAGSTER_K8S_PIPELINE_RUN_IMAGE'} failed:
dagster.config.errors.PostProcessingError: You have attempted to fetch the environment variable "DAGSTER_K8S_PIPELINE_RUN_IMAGE" which is not set. In order for this execution to succeed it must be set in this environment.
Stack Trace:
File "/usr/local/lib/python3.8/site-packages/dagster/config/post_process.py", line 77, in _post_process
new_value = context.config_type.post_process(config_value)
File "/usr/local/lib/python3.8/site-packages/dagster/config/source.py", line 42, in post_process
return str(_ensure_env_variable(cfg))
File "/usr/local/lib/python3.8/site-packages/dagster/config/source.py", line 16, in _ensure_env_variable
raise PostProcessingError(
"
debug_error_string = "{"created":"@1641899640.132209114","description":"Error received from peer ipv4:192.168.223.227:3030","file":"src/core/lib/surface/call.cc","file_line":1069,"grpc_message":"Exception iterating responses: Errors whilst loading configuration for {'instance_config_map': Field(<dagster.config.source.StringSourceType object at 0x7f040d00c790>, default=@, is_required=True), 'postgres_password_secret': Field(<dagster.config.source.StringSourceType object at 0x7f040d00c790>, default=@, is_required=False), 'dagster_home': Field(<dagster.config.source.StringSourceType object at 0x7f040d00c790>, default=/opt/dagster/dagster_home, is_required=False), 'job_image': Field(<dagster.config.config_type.Noneable object at 0x7f03dec2f7f0>, default=@, is_required=False), 'image_pull_policy': Field(<dagster.config.config_type.Noneable object at 0x7f03dec2fb20>, default=None, is_required=False), 'image_pull_secrets': Field(<dagster.config.config_type.Noneable object at 0x7f03deccb9d0>, default=@, is_required=False), 'service_account_name': Field(<dagster.config.config_type.Noneable object at 0x7f03dec3edc0>, default=@, is_required=False), 'env_config_maps': Field(<dagster.config.config_type.Noneable object at 0x7f03dec3e070>, default=@, is_required=False), 'env_secrets': Field(<dagster.config.config_type.Noneable object at 0x7f03dec3ecd0>, default=@, is_required=False), 'env_vars': Field(<dagster.config.config_type.Noneable object at 0x7f03dec3e6d0>, default=@, is_required=False), 'volume_mounts': Field(<dagster.config.config_type.Array object at 0x7f03deccb790>, default=[], is_required=False), 'volumes': Field(<dagster.config.config_type.Array object at 0x7f03dec2f220>, default=[], is_required=False), 'job_namespace': Field(<dagster.config.source.StringSourceType object at 0x7f040d00c790>, default=default, is_required=False), 'load_incluster_config': Field(<dagster.config.config_type.Bool object at 0x7f040f161b50>, default=True, is_required=False), 
'kubeconfig_file': Field(<dagster.config.config_type.Noneable object at 0x7f03decd50d0>, default=None, is_required=False)}.\n Error 1: Post processing at path root:job_image of original value {'env': 'DAGSTER_K8S_PIPELINE_RUN_IMAGE'} failed:\ndagster.config.errors.PostProcessingError: You have attempted to fetch the environment variable "DAGSTER_K8S_PIPELINE_RUN_IMAGE" which is not set. In order for this execution to succeed it must be set in this environment.\n\nStack Trace:\n File "/usr/local/lib/python3.8/site-packages/dagster/config/post_process.py", line 77, in _post_process\n new_value = context.config_type.post_process(config_value)\n File "/usr/local/lib/python3.8/site-packages/dagster/config/source.py", line 42, in post_process\n return str(_ensure_env_variable(cfg))\n File "/usr/local/lib/python3.8/site-packages/dagster/config/source.py", line 16, in _ensure_env_variable\n raise PostProcessingError(\n","grpc_status":2}"
>
File "/usr/local/lib/python3.7/site-packages/dagster/daemon/sensor.py", line 238, in execute_sensor_iteration
sensor_debug_crash_flags,
File "/usr/local/lib/python3.7/site-packages/dagster/daemon/sensor.py", line 268, in _evaluate_sensor
job_state.job_specific_data.cursor if job_state.job_specific_data else None,
File "/usr/local/lib/python3.7/site-packages/dagster/core/host_representation/repository_location.py", line 721, in get_external_sensor_execution_data
cursor,
File "/usr/local/lib/python3.7/site-packages/dagster/api/snapshot_sensor.py", line 49, in sync_get_external_sensor_execution_data_grpc
cursor=cursor,
File "/usr/local/lib/python3.7/site-packages/dagster/grpc/client.py", line 299, in external_sensor_execution
sensor_execution_args
File "/usr/local/lib/python3.7/site-packages/dagster/grpc/client.py", line 118, in _streaming_query
yield from response_stream
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 426, in __next__
return self._next()
File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 826, in _next
raise self
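The traceback above reports that DAGSTER_K8S_PIPELINE_RUN_IMAGE is not set in the process that loaded the configuration. The error text appears to come from the gRPC peer (its paths are python3.8, while the daemon frames are python3.7), so that process is a plausible place to look. A minimal, Dagster-independent sketch for checking which variables a process can actually see follows; `missing_env_vars` is a hypothetical helper, not part of any library.

```python
import os
from typing import Iterable, List, Mapping, Optional


def missing_env_vars(
    names: Iterable[str],
    environ: Optional[Mapping[str, str]] = None,
) -> List[str]:
    """Return the names that are not defined in the given environment.

    Defaults to the environment of the current process, so it should be run
    inside the container or pod that raised the error.
    """
    env = os.environ if environ is None else environ
    return [name for name in names if name not in env]


if __name__ == "__main__":
    # The variable name is taken from the error message above.
    print(missing_env_vars(["DAGSTER_K8S_PIPELINE_RUN_IMAGE"]))
```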
Nick Dellosa
01/11/2022, 2:07 PM
RootInputManager as opposed to just defining an upstream op to get a value?
nikki
01/11/2022, 5:16 PM
arn:aws:secretsmanager:us-west-2:234sdfr9823:MY_SECRET:SECRET_1::. Can you use the same ARN in dagster.yaml, and if so, do the secrets get translated to ENV vars correctly? I.e., would env have SECRET_1 and SECRET_2 in it? The EcsRunLauncher docs don't really say, nor do they describe very well how the secrets get translated into ENV vars ...
Will Gunadi
01/11/2022, 6:15 PM
Exception in thread postgres-event-watch:
Traceback (most recent call last):
File "/usr/lib/python3.8/threading.py", line 932, in _bootstrap_inner
self.run()
File "/usr/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/home/kfdev/dagster/v_dagster/lib/python3.8/site-packages/dagster_postgres/event_log/event_log.py", line 258, in watcher_thread
for notif in await_pg_notifications(
File "/home/kfdev/dagster/v_dagster/lib/python3.8/site-packages/dagster_postgres/pynotify.py", line 123, in await_pg_notifications
conn.poll()
psycopg2.OperationalError: SSL SYSCALL error: Connection timed out
Has anyone seen this?
Bryan Chavez
01/11/2022, 6:16 PM
Keith Devens
01/11/2022, 8:19 PM
@graph
def get_data():
    g = create_shell_command_op("...", name="get_data")
    g()

@graph(ins={"start": In(Nothing)})
def process_data():
    p = create_shell_command_op("...", name="process_data")
    p()

@job
def load_data():
    data = get_data()
    process_data(start=data)
But when I run dagit -f jobs.py, I get:
UserWarning: Error loading repository location jobs.py:dagster.core.errors.DagsterInvalidDefinitionError: @graph 'process_data' decorated function does not have parameter(s) 'start', which are in provided input_defs. @graph decorated functions should only have keyword arguments that match input names and, if system information is required, a first positional parameter named 'context'.
I’ve been having trouble just figuring out how to get Dagster to call things in order within a @job.
Nick Dellosa
01/11/2022, 8:36 PM
In dict and a default value set on the parameter in the op definition, but I'm still getting: is not connected to the output of a previous node and can not be loaded from configuration, making it impossible to execute
Fikri Thauli
01/12/2022, 12:04 AM
Chris Gough
01/12/2022, 7:39 AM
context.op_config == None. I want an easy way to manage config with files, then have the right config loaded at the right time by scheduled jobs. The workflow I imagine is saving config YAML files (as per the config editor in the Dagit launcher) somewhere and somehow having the scheduled job load them at runtime. Is there an idiomatic way to do that (or a similar but better idiom), or would I need to write my own config-reading operation that edits the context in place ahead of the ops that need it?
Roel Hogervorst
01/12/2022, 8:03 AM
Chris Gough
01/12/2022, 8:07 AM
daniel
01/12/2022, 3:59 PM
with open("your_run_config_yaml_file.yaml", "r") as f:
    return yaml.safe_load(f)
and then it could change each time the schedule tick happens. Not sure if that would satisfy your requirements here exactly though.
You can also source the value of individual fields from environment variables at runtime: there's a "StringSource" type that lets you determine values at runtime: https://docs.dagster.io/_apidocs/config#dagster.StringSource
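A rough, library-free sketch of both ideas from this reply: re-reading a config file on every call (for example once per schedule tick) and filling individual fields from environment variables. Everything here is an assumption made for illustration: `load_run_config` and `resolve_env` are hypothetical helpers, JSON stands in for YAML so that only the standard library is needed, and the {"env": "NAME"} shape mirrors the form shown in the error message earlier in this log. It is not Dagster's implementation of StringSource.

```python
import json
import os
from pathlib import Path
from typing import Any


def resolve_env(value: Any) -> Any:
    """Replace every {"env": "NAME"} mapping with the value of $NAME."""
    if isinstance(value, dict):
        if set(value) == {"env"}:
            # Raises KeyError if the variable is not set in this process.
            return os.environ[value["env"]]
        return {key: resolve_env(item) for key, item in value.items()}
    if isinstance(value, list):
        return [resolve_env(item) for item in value]
    return value


def load_run_config(path: Path) -> Any:
    """Re-read the config file on every call, so edits made between
    calls are picked up without redeploying anything."""
    with path.open("r") as f:
        return resolve_env(json.load(f))
```

Because the file is opened inside the function rather than at import time, a change saved to the file between two ticks shows up in the next tick's config.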