David Hyman
10/25/2021, 5:00 PM
load_from:
  - grpc_server:
      host: executor
      port: 4000
Clicking the "reload repository" buttons on the UI all indicate they are doing something, but the pipelines are never updated.
Previously I suspected the 'lazy repository' pattern (I was returning a callable as per the docs), but even when returning JobDefinition objects it still doesn't work. It requires a full container restart to force a reload:
@repository
def dev_repository():
    return list(pipelines_factory(mode=mode_development))
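For reference, the 'lazy repository' form mentioned above returns a dict mapping names to zero-argument callables rather than a list of definitions; a minimal sketch of that shape (make_dev_job and make_dev_schedule are hypothetical factories standing in for the poster's pipelines_factory output):
from dagster import repository

@repository
def dev_repository_lazy():
    # Lazy form: each entry maps a name to a callable that builds the
    # definition only when it is requested by name.
    return {
        "jobs": {"my_dev_job": lambda: make_dev_job()},                  # hypothetical factory
        "schedules": {"my_dev_schedule": lambda: make_dev_schedule()},   # hypothetical factory
    }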
My questions are:
• what am I doing wrong?
• how can I have a containerised deployment that supports reloading (manually, or preferably automatically via a 'sensor')?
• or perhaps I've misunderstood the functionality - what is the act of 'reloading repository' for, if not to re-fetch repository content?
TIA! 😀
(0.13.0, but I couldn't get it to work prior to that either)
Chris Chan
10/25/2021, 5:29 PM
Navneet Sajwan
10/25/2021, 6:47 PM
Mark Fickett
10/25/2021, 7:47 PM
dagster-aws up was in the release announcement for 0.6, but when I pip install dagster-aws there's no executable; did that functionality go away? I also found someone's Terraform module, which doesn't look like it's seen much adoption, and I can follow the Deploying Dagster to AWS docs, but that leaves setting up an RDS instance and an executor as exercises for the reader.
Navneet Sajwan
10/25/2021, 8:00 PM
dagster_shell.create_shell_script_solid ?
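For context, create_shell_script_solid wraps an existing shell script in a solid; a rough usage sketch (my_script.sh and the pipeline are illustrative placeholders):
from dagster import pipeline, file_relative_path
from dagster_shell import create_shell_script_solid

# Builds a solid that executes the given script when the pipeline runs.
run_my_script = create_shell_script_solid(
    file_relative_path(__file__, "my_script.sh"),
    name="run_my_script",
)

@pipeline
def shell_pipeline():
    run_my_script()
Swadhin Swain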
10/26/2021, 4:41 AM
Max Wong
10/26/2021, 5:17 AM
0.13.1: is there a way to “populate” multiple config presets in the launchpad UI?
Previously we had two configs, dev and prod (I assume it needs Mode); with the new API changes I'm not sure how I should migrate.
edit: I think I found what I’m looking for https://dagster.slack.com/archives/C01U954MEER/p1635091915068000
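For readers hitting the same migration question: with the 0.13 job API, the preset-per-mode pattern is usually replaced by building several jobs from one graph, each with its own baked-in config. A rough sketch, not the poster's actual code (the graph, op, and config dicts are illustrative):
from dagster import graph, op, repository

@op
def do_work():
    ...

@graph
def my_graph():
    do_work()

dev_config = {"ops": {}}   # placeholder for the poster's dev run config
prod_config = {"ops": {}}  # placeholder for the poster's prod run config

# One job per former preset/mode, each pre-loaded with its own config.
dev_job = my_graph.to_job(name="my_graph_dev", config=dev_config)
prod_job = my_graph.to_job(name="my_graph_prod", config=prod_config)

@repository
def repo():
    return [dev_job, prod_job]
Swadhin Swain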
10/26/2021, 10:18 AM
Mohammad Nazeeruddin
10/26/2021, 11:13 AM
- name: "example-user-code-3"
  image:
    repository: "dagsterimage/user-code-example-v37"
    tag: "latest"
    pullPolicy: Always
  dagsterApiGrpcArgs:
    - "--python-file"
    - "/home/orchestrator/system/system.py"   # <EFS mounted path>
  port: 3030
But I am not able to access the repos; I'm getting this error: FileNotFoundError: [Errno 2] No such file or directory: '/home/orchestrator/system/system.py'
Can we access user-code-deployment repos from an EFS location if the EFS path is configured like the sample code above?
Carter
10/26/2021, 1:39 PM
Max Wong
10/26/2021, 2:31 PM
0.13.1: found out that using slack_on_success and slack_on_failure yields two behaviors:
1. it is marked as legacy in dagit
2. it displays an error:
dagster.core.errors.DagsterInvalidConfigError: Error in config for pipeline
Error 1: Received unexpected config entry "config" at path root:execution. Expected: "{ multi_or_in_process_executor?: { config?: { in_process?: { marker_to_close?: String retries?: { disabled?: { } enabled?: { } } } multiprocess?: { max_concurrent?: Int retries?: { disabled?: { } enabled?: { } } } } } }".
File "/usr/local/lib/python3.9/site-packages/dagster/grpc/impl.py", line 342, in get_external_execution_plan_snapshot
create_execution_plan(
File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/api.py", line 727, in create_execution_plan
resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=mode)
File "/usr/local/lib/python3.9/site-packages/dagster/core/system_config/objects.py", line 152, in build
raise DagsterInvalidConfigError(
but, if I remove the slack decorators:
# @slack_on_success("#dagster", dagit_base_url=os.environ["DAGIT_BASE_URL"])
# @slack_on_failure("#dagster-fail", dagit_base_url=os.environ["DAGIT_BASE_URL"])
@job(resource_defs=resource_defs, config=prod_config)
def cookbook():
    one = step_one()
    two = step_two()
    step_three(one, two)
it works as normal.
In addition, the Slack API docs still use the legacy API: https://docs.dagster.io/_apidocs/libraries/dagster-slack
I assume the migration docs are in the works? :blob-smile-happy-eyes:
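For comparison only (not a confirmed fix for the error above): the job decorator also accepts a hooks argument, so the same Slack hooks can be attached there instead of being stacked as decorators. A sketch reusing the names from the snippet above, assuming resource_defs includes the slack resource the hooks need:
import os

from dagster import job
from dagster_slack import slack_on_failure, slack_on_success

@job(
    resource_defs=resource_defs,  # assumed to include the "slack" resource
    config=prod_config,
    hooks={
        slack_on_success("#dagster", dagit_base_url=os.environ["DAGIT_BASE_URL"]),
        slack_on_failure("#dagster-fail", dagit_base_url=os.environ["DAGIT_BASE_URL"]),
    },
)
def cookbook():
    one = step_one()
    two = step_two()
    step_three(one, two)
praveen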
10/26/2021, 4:25 PM
BUCKET_SIZE = 10

@pipeline(
    mode_defs=[MODE_DEFINITION],
    preset_defs=[PRESET_DEFINITION]
)
def test_data_pipeline():
    some_data = get_some_data()
    for i in range(BUCKET_SIZE):
        process_some_data(i)(some_data)

@solid()
def get_some_data(context):
    context.log.info("Starting!")
    all_files = get_all_files()
    val = np.array_split(all_files, 10)
    return val

def process_some_data(value):
    @solid(
        name=f"process_some_data_{value}",
        tags={
            "value": f"{value}",
        }
    )
    def _process_some_data(context, some_data):
        index = context.solid_def.tags['value']
        ...
        ...
    return _process_some_data
Will Gunadi
10/26/2021, 4:32 PM
Vax Thurai
10/26/2021, 7:00 PM
Jim Nisivoccia
10/27/2021, 1:20 AM
Arun Kumar
10/27/2021, 3:25 AM
Max Wong
10/27/2021, 6:52 AM
If we set config in @job and trigger it via a schedule, it doesn't display the configuration in the schedule's run UI, although the pipeline seems to be running with the correct config.
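A sketch of the two places the config can live in this setup, for anyone comparing the behavior (cookbook and prod_config are the names from the earlier snippet; the cron string is arbitrary):
from dagster import ScheduleDefinition

# Option 1: config baked into the job itself, as described above.
# Option 2: config attached to the schedule, so it is recorded as the run's run_config.
cookbook_schedule = ScheduleDefinition(
    job=cookbook,
    cron_schedule="0 2 * * *",
    run_config=prod_config,
)
Levan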
10/27/2021, 1:06 PM
Nilesh Pandey
10/27/2021, 3:34 PM
Erik
10/27/2021, 4:55 PM
Rubén Lopez Lozoya
10/27/2021, 5:09 PM
    raise DagsterInvalidDefinitionError(
E dagster.core.errors.DagsterInvalidDefinitionError: @composite_solid 'c_solid_create_re_uw_template' has unmapped input 'enable'. Remove it or pass it to the appropriate solid invocation.
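For reference, that error means a composite_solid declares an input that is never wired to an inner solid. A minimal sketch of the shape Dagster expects (the inner solid and its body are illustrative, not the poster's code):
from dagster import composite_solid, solid

@solid
def inner_solid(context, enable: bool):
    context.log.info(f"enable={enable}")

@composite_solid
def c_solid_create_re_uw_template(enable: bool):
    # Every input declared on the composite must be passed to an inner
    # invocation; leaving 'enable' unused raises the error above.
    inner_solid(enable)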
Egor -
10/27/2021, 8:27 PM
Kenneth Barrett
10/27/2021, 9:14 PM
from dagster import job, op
from dagster_aws.cloudwatch import cloudwatch_logger

@op
def my_op(context):
    context.log.info("Hello")
    context.log.debug("World")

@job(logger_defs={'cloudwatch': cloudwatch_logger})
def my_job():
    my_op()
But I can't seem to figure out how to hard-code the logger config for each job in my Docker Dagster deployment. Where in the above should I be passing in the config dict for my job to instantiate the cloudwatch logger with my required settings? I've tried this, but I get errors, I guess because it's expecting a logger definition and not an actual logger instance:
from dagster_aws.cloudwatch import cloudwatch_logger
from dagster import job, op, build_init_logger_context

cloudwatch_config = {
    "log_level": "INFO",
    "log_group_name": "dagster-test",
    "log_stream_name": "my-job",
    "aws_region": "eu-west-1",
    "aws_access_key_id": "ABCDEFGHIJKLMNOP",
    "aws_secret_access_key": "12345678910"
}

my_job_context = build_init_logger_context(logger_config=cloudwatch_config)

@op
def my_op(context):
    context.log.info("Hello")
    context.log.debug("World")

@job(logger_defs={'cloudwatch': cloudwatch_logger(my_job_context)})
def my_job():
    my_op()
Any idea what silly oversight I'm making?
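One common way to hard-code this, sketched rather than confirmed against the deployment above: bake the settings into the logger definition with .configured(...) and pass the result to logger_defs. The config values are the ones from the snippet above:
from dagster import job, op
from dagster_aws.cloudwatch import cloudwatch_logger

# Pre-configure the logger definition; the job then references it directly.
configured_cloudwatch = cloudwatch_logger.configured({
    "log_level": "INFO",
    "log_group_name": "dagster-test",
    "log_stream_name": "my-job",
    "aws_region": "eu-west-1",
    "aws_access_key_id": "ABCDEFGHIJKLMNOP",
    "aws_secret_access_key": "12345678910",
})

@op
def my_op(context):
    context.log.info("Hello")
    context.log.debug("World")

@job(logger_defs={"cloudwatch": configured_cloudwatch})
def my_job():
    my_op()
Jim Nisivoccia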
10/27/2021, 11:57 PM
Jim Nisivoccia
10/27/2021, 11:59 PM
Hebo Yang
10/28/2021, 1:02 AM
slack_message_on_failure is annotated with failure_hook, and I annotated my pipeline with @slack_message_on_failure. However, the hook doesn't really get triggered when the pipeline fails. Is this because I am using a Databricks Spark step launcher and the failure events are STEP_FAILURE and PIPELINE_FAILURE?
What's the right way to set up a hook for when the pipeline fails, please?
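One option worth checking, sketched under the assumption that this Dagster version exposes it: hooks fire per step, while a run-level failure can be caught with a run failure sensor instead. send_slack_message is a hypothetical helper (e.g. a thin wrapper around slack_sdk), and the context attribute follows the 0.13-era docs:
from dagster import run_failure_sensor, RunFailureSensorContext

@run_failure_sensor
def notify_on_run_failure(context: RunFailureSensorContext):
    run = context.pipeline_run  # metadata for the failed run (0.13-era attribute name)
    # send_slack_message is a hypothetical helper supplied by the user.
    send_slack_message(
        channel="#alerts",
        text=f"Run {run.run_id} of {run.pipeline_name} failed",
    )
Mohammad Nazeeruddin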
10/28/2021, 5:50 AM
Thomas
10/28/2021, 8:54 AM
mrdavidlaing
10/28/2021, 2:56 PM
make dev_install
➤ YN0013: │ @babel/helper-create-class-features-plugin@npm:7.14.0 can't be found in the cache and will be fetched from the remote registry
➤ YN0001: │ Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at new NodeError (node:internal/errors:371:5)
at PassThrough.onclose (node:internal/streams/end-of-stream:122:30)
at PassThrough.emit (node:events:390:28)
at emitCloseNT (node:internal/streams/destroy:145:10)
at processTicksAndRejections (node:internal/process/task_queues:82:21)
Max Wong
10/28/2021, 2:56 PM
Regarding the partition offset in monthly schedules in 0.13.1: I couldn't find the parameter anywhere (monthly_partitioned_config). Is this by design?
end_offset seems to be the param in the updated API. The docs could probably mention this as well?
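For anyone landing here later, a rough sketch of how end_offset is passed on the new partitioned-config decorator (the dates, op name, and config shape are illustrative only):
from datetime import datetime

from dagster import monthly_partitioned_config

# A positive end_offset extends the partition set forward, e.g. end_offset=1
# also creates a partition for the current, still in-progress month.
@monthly_partitioned_config(start_date=datetime(2021, 1, 1), end_offset=1)
def my_monthly_config(start: datetime, _end: datetime):
    return {"ops": {"my_op": {"config": {"month_start": start.strftime("%Y-%m-%d")}}}}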