Or Asher
11/04/2021, 6:44 AM
dagster version to make them work
James
11/04/2021, 10:13 AM
execute_pipeline() means I can stay within the VSCode context and use the debugger.
from dagster import DagsterInstance, execute_pipeline, reconstructable

execute_pipeline(
    reconstructable(my_pipeline),
    instance=DagsterInstance.get(),
    run_config=some_config,
    mode='some_mode',
)
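For context, a fuller sketch of that debugging pattern with a made-up pipeline (say_hi and my_pipeline are placeholders); reconstructable needs the pipeline to be defined at module scope, and with the default in-process executor, breakpoints set inside the solid should be hit under the VSCode debugger:

from dagster import DagsterInstance, execute_pipeline, pipeline, reconstructable, solid

@solid
def say_hi(context):
    context.log.info("hi from the debugger")

@pipeline
def my_pipeline():
    say_hi()

if __name__ == "__main__":
    # DagsterInstance.get() reads DAGSTER_HOME, so the run also shows up in Dagit
    execute_pipeline(
        reconstructable(my_pipeline),
        instance=DagsterInstance.get(),
        run_config={},
    )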
David Hyman
11/04/2021, 10:37 AM
Job.
Perhaps Partition APIs need to emit RunRequest style objects too, or something...
Martin Carlsson
11/04/2021, 1:34 PM
op within a job.
I’m trying to follow this one
https://docs.dagster.io/concepts/io-management/unconnected-inputs#loading-a-built-in-dagster-type-from-config
But when I run dagit, it still requires me to add configs.
Actually, what I’m looking for is a way to have an op that takes a parameter (a string), and then pass that parameter to the op from a job.
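One common way to do what Martin describes is to pass the string through the op's config rather than as an input; a minimal sketch with invented op and job names:

from dagster import job, op

@op(config_schema={"name": str})
def greet(context):
    # the string parameter arrives via the op's config
    context.log.info(f"Hello, {context.op_config['name']}!")

# default config is baked into the job, so Dagit does not ask for it at launch time
@job(config={"ops": {"greet": {"config": {"name": "world"}}}})
def greeting_job():
    greet()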
Kenneth Barrett
11/04/2021, 2:14 PM
max_concurrent_runs to 25, tagging all the ops with "my_key" and setting the tag concurrency limit:
tag_concurrency_limits:
  - key: "my_key"
    limit: 10
When the job runs, it only ever allows two of the ops to start running concurrently, only starting the third one when one of the first two has finished. Is there some other setting I'm overlooking?
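For reference, tagging an op so that a tag_concurrency_limits rule can match it looks roughly like this (hypothetical op name; note that tag limits configured on the QueuedRunCoordinator match run tags rather than op tags, so where the limit block lives matters):

from dagster import op

@op(tags={"my_key": "some_value"})
def my_limited_op():
    # carries the "my_key" tag that the concurrency rule keys on
    ...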
Christos Sarakasidis
11/04/2021, 3:10 PM
Kevin Haynes
11/04/2021, 7:04 PM
Cezary Pukownik
11/04/2021, 8:17 PM
Andy Chen
11/05/2021, 4:38 AM
really_slow_op ----> normal_op
So let's say I build this job. Great!
Then I need to add a new op that depends on normal_op
really_slow_op -----> normal_op ----> new_op
I want to just run new_op so I don't have to waste a lot of time running really_slow_op. I've tried doing this in the launchpad, and I think I have to do something with the input config. Are there any docs referencing the recommended way to do stuff like this? I could only find the one about reexecuting from an error.
Error in dagit, I think I'm misunderstanding what <selector> should be:
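A sketch of the kind of Launchpad setup that lets only the downstream op run: select just new_op, and supply its upstream input from run config, which requires that input to use a Dagster type loadable from config (the input name below is invented):

# op selection in the Launchpad: new_op
ops:
  new_op:
    inputs:
      upstream_value:   # hypothetical input name
        value: "whatever normal_op would have produced"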
Andy Chen
11/05/2021, 4:38 AM
Jazzy
11/05/2021, 8:45 AM
Rubén Lopez Lozoya
11/05/2021, 9:17 AM
2021-11-04T18:48:51.332707043Z /usr/local/lib/python3.7/site-packages/dagster/core/storage/runs/sql_run_storage.py:179: SAWarning: implicitly coercing SELECT object to scalar subquery; please use the .scalar_subquery() method to produce a scalar subquery.
Alessandro Marrella
11/05/2021, 12:07 PM
GraphDefinition.to_job doesn't have the retry policy parameter 🤔
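Retry behaviour can at least be attached per op even where the to_job signature has no retry-policy argument; a small sketch (op name invented):

from dagster import RetryPolicy, op

@op(retry_policy=RetryPolicy(max_retries=3, delay=10))
def flaky_op():
    # failures are retried up to 3 times, waiting 10 seconds between attempts
    ...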
Andy Chen
11/05/2021, 1:57 PM
ops:
  available_capacity:
    inputs:
      flowgates_path:
        value: "/Users/andychen/dev/nira-data-pipeline/pipeline/pipeline/pjm/ops/raw/flowgates/tmp/flowgates.csv"
I think I've done the typing correctly as shown in the image, but I'm getting an error:
dagster.core.errors.DagsterInvalidConfigError: Error in config for job
Error 1: Received unexpected config entry "inputs" at path root:ops:available_capacity. Expected: "{ config?: Any outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] }".
File "/Users/andychen/dev/nira-data-pipeline/.venv/lib/python3.9/site-packages/dagster/grpc/impl.py", line 351, in get_external_execution_plan_snapshot
create_execution_plan(
File "/Users/andychen/dev/nira-data-pipeline/.venv/lib/python3.9/site-packages/dagster/core/execution/api.py", line 742, in create_execution_plan
resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=mode)
File "/Users/andychen/dev/nira-data-pipeline/.venv/lib/python3.9/site-packages/dagster/core/system_config/objects.py", line 152, in build
raise DagsterInvalidConfigError(
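That "Received unexpected config entry 'inputs'" error usually means Dagster does not consider the input loadable from config; a hedged sketch of declaring the op's input with a built-in type so the inputs block is accepted (the op signature is guessed from the config above):

from dagster import In, String, op

@op(ins={"flowgates_path": In(String)})
def available_capacity(flowgates_path):
    # with a config-loadable input type, run config of the form
    # ops: available_capacity: inputs: flowgates_path: value: "..." is accepted
    ...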
Andy Chen
11/05/2021, 2:08 PM
Josh Lloyd
11/05/2021, 5:20 PM
EcsRunLauncher and the QueuedRunCoordinator with max_concurrent_runs set to 100.
I have 10 runs in progress and 45 in the queue, but I don’t understand why. This ran perfectly fine in stage with all 58 runs happening concurrently. Any ideas on what’s going on here?
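For comparison, the coordinator Josh describes is normally configured in dagster.yaml roughly like this (values illustrative); any per-tag caps such as tag_concurrency_limits would sit in the same block:

# dagster.yaml (illustrative values)
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 100
    # optional per-tag caps: runs whose tags match a key are limited separately
    # tag_concurrency_limits:
    #   - key: "some_tag"
    #     limit: 10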
James
11/05/2021, 6:36 PM
ins and out:
Argument of type "dict[str, ~_In]" cannot be assigned to parameter "ins" of type "Dict[str, In] | None" in function "op"
Type "dict[str, ~_In]" cannot be assigned to type "Dict[str, In] | None"
Is there any way to handle this elegantly, or is it best to just ignore it?
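One way that type-checker complaint often goes away is to annotate the dict explicitly so it is inferred as Dict[str, In] rather than a narrower private type; a sketch with invented names:

from typing import Dict

from dagster import In, op

# the explicit annotation keeps pyright from inferring dict[str, _In]
my_ins: Dict[str, In] = {"left": In(str), "right": In(str)}

@op(ins=my_ins)
def combine(left: str, right: str) -> str:
    return left + right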
Auster Cid
11/05/2021, 6:49 PM
Run Config Functions and Date Partition Sets for each of these pipelines.
Launching partition runs from the playground works as expected, but when running a backfill, all launched runs get the config of the last partition selected for the backfill. Any idea why?
Martim Passos
11/05/2021, 7:16 PM
FR Tendero
11/05/2021, 7:55 PM
@run_failure_sensor that reacts by relaunching the job it monitors if it fails.
The job simply looks like this:
@job
def my_job():
    my_op()
Then I define the failure sensor and the repo:
@run_failure_sensor(job_selection=[my_job])
def on_failure_sensor(context: RunFailureSensorContext):
    yield RunRequest(run_key=None, run_config={})

@repository
def my_repo():
    return [my_job, on_failure_sensor]
Problem is that in dagit, the sensor says "*Sensor does not target a pipeline*". That being said, if the monitored job (my_job) fails, the sensor does process the failure (log message like: Sensor "on_failure" processed failure of run 032083e3-6bea-4c21-a7d6-85f79d5aa050.), but it won't rerun the job or execute any other code inside the run_failure_sensor function.
Thanks in advance for your help!
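For contrast, a sensor built with the plain @sensor decorator does target a job, so its RunRequests launch runs of that job; a minimal sketch reusing my_job from above (whether a run-failure sensor itself can launch runs this way is a separate question):

from dagster import RunRequest, sensor

@sensor(job=my_job)
def relaunch_my_job_sensor(context):
    # because the sensor targets my_job, this RunRequest launches a run of it
    yield RunRequest(run_key=None, run_config={})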
Marc Keeling
11/05/2021, 11:00 PM
Florian Giroud
11/08/2021, 9:44 AM
from dagster import graph, op
from dagster_snowflake import snowflake_resource

@op(required_resource_keys={'snowflake_resource'})
def get_one(context):
    context.resources.snowflake_resource.execute_query('SELECT 1')

@graph
def say_hello():
    get_one()

say_hello_job = say_hello.to_job(resource_defs={'snowflake_resource': snowflake_resource})

jobs = [say_hello_job]
The thing is that we can't run the job because it's missing some conf (see screenshot).
But we did run dagit with a SNOWFLAKE_USER env var; see below our script to start dagit:
export DAGSTER_HOME=/Users/flo/projects/xxx/xxx/dagster/dagster_home
export snowflake_user="TEST"
dagit -h 0.0.0.0 -p 3000
Are we missing something here? We were expecting that the resource would default its conf from env vars.
But more generally speaking, here are our questions; we struggle to find the appropriate answers in the docs.
• What is the way in dagster to configure jobs from vars/files? We will have 50+ jobs and it seems quite cumbersome to have to pass a config to each; it looks like a lot of code duplication.
• Also, what is the "official" way to have several resources where just one setting changes (the compute_WH in our case)? Is it using config_from_files?
• We would like to configure resources at the repo level, not at the job level; is that possible?
Thanks if you can help us get that sorted.
Our install is for now just a local PoC, using python and venv.
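On the env-var question above: resources do not read environment variables implicitly, but resource config can reference them explicitly; a hedged sketch using configured() on snowflake_resource (the exact set of fields required may differ):

from dagster_snowflake import snowflake_resource

# config values written as {"env": ...} are read from environment variables
snowflake_configured = snowflake_resource.configured({
    "account": {"env": "SNOWFLAKE_ACCOUNT"},
    "user": {"env": "SNOWFLAKE_USER"},
    "password": {"env": "SNOWFLAKE_PASSWORD"},
    "warehouse": {"env": "SNOWFLAKE_WAREHOUSE"},
})

say_hello_job = say_hello.to_job(resource_defs={'snowflake_resource': snowflake_configured})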
Or Asher
11/08/2021, 12:52 PM
David Farnan-Williams
11/08/2021, 2:59 PM
Exception: Your agent's configuration cannot load locations that specify a Docker image. Either update your location to not include an image, or change the user_code_launcher field in your agent's dagster.yaml file to a launcher that can load Docker images.
Jim Nisivoccia
11/08/2021, 3:22 PM
Joe Schmid
11/08/2021, 8:24 PM
from dagster import job, op, Output
Leo Kell
11/08/2021, 9:38 PM
Marc Keeling
11/08/2021, 10:26 PM
Bryan Chavez
11/09/2021, 2:10 PM
Will Gunadi
11/09/2021, 3:04 PM