dhume
04/21/2021, 3:13 PM
@solid
def define_dir(input: String) -> String:
    return input

@configured(dbt_cli_run)
def configured_dbt_cli_run(dir_name):
    return {"project-dir": dir_name}

@pipeline
def dbt_pipeline():
    dir = define_dir()
    configured_dbt_cli_run(dir)
I found https://github.com/dagster-io/dagster/discussions/3213 which isn’t quite what I need
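For reference, a minimal sketch of how configured is normally used: the decorated function maps the new solid's own config (resolved from run config at launch time) into dbt_cli_run's config, rather than taking an upstream solid's output. dbt_cli_run is assumed here to come from dagster_dbt.

from dagster import configured
from dagster_dbt import dbt_cli_run  # assumption: the dbt CLI solid from dagster_dbt

# Sketch: config here is the config of the new solid, supplied via run config
# at launch time, not the output of another solid.
@configured(dbt_cli_run, config_schema={"dir_name": str})
def configured_dbt_cli_run(config):
    return {"project-dir": config["dir_name"]}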
Steve Pletcher
04/21/2021, 3:15 PM
Eduardo Santizo
04/21/2021, 4:24 PM
Eduardo Santizo
04/21/2021, 7:59 PM
Brian Abelson
04/21/2021, 9:15 PM
Ken Geis
04/21/2021, 11:13 PM
Marco
04/22/2021, 8:29 AM
Drew Sonne
04/22/2021, 8:41 AM
from datetime import datetime

from dagster import (
    pipeline,
    solid,
    SolidExecutionContext,
    OutputDefinition,
    InputDefinition,
    String, PartitionSetDefinition, date_partition_range
)

@solid(
    output_defs=[OutputDefinition(name="daily_summary")],
    config_schema={'date': str}
)
def fetch_appointments_for_day(context: SolidExecutionContext):
    ...

@solid(
    input_defs=[InputDefinition("daily_summary")],
    config_schema={'date': str}
)
def ingest_appointments_for_day_to_rdv(context: SolidExecutionContext, daily_summary):
    ...

@pipeline
def client_appointments():
    ingest_appointments_for_day_to_rdv(
        fetch_appointments_for_day()
    )

def runtime_config_for_client_appointment_partition(partition):
    print(type(partition))
    return {'solids': {'fetch_appointments_for_day': {'config': {'date': partition.value}}}}

client_appointment_partition_set = PartitionSetDefinition(
    name="client_appointment_partition_set",
    pipeline_name="client_appointments",
    partition_fn=date_partition_range(
        start=datetime(year=2021, month=4, day=1),
        end=datetime.today(),
        delta_range="days",
        inclusive=True
    ),
    run_config_fn_for_partition=runtime_config_for_client_appointment_partition
)
but the partition tab does not appear in the dagit console. I've tried putting the PartitionSet before and after the pipeline. Is there something obvious that I'm missing?
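One thing worth checking, as an assumption since the snippet doesn't show it: dagit only lists partition sets that are part of the repository it loads, so a minimal sketch of wiring this up would be:

from dagster import repository

# Sketch: both the pipeline and the partition set must be returned from the
# repository for the Partitions tab to appear in dagit.
@repository
def client_appointments_repository():
    return [client_appointments, client_appointment_partition_set]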
Drew Sonne
04/22/2021, 8:43 AM
Metin Senturk
04/22/2021, 11:11 AM
Avri Naamani
04/22/2021, 12:41 PM
I have a question about the get_s3_keys function I use in my sensor.
I saw that on every sensor run it lists all files in a bucket with a specific prefix, sorts them by LastModified, then iterates over every key and tries to find the file equal to the since_key, which in my case is context.last_run_key.
I suspect my problem is that I deleted the files in S3, and that causes this logic to fail because the s3_key will never be equal to the since_key (line 38).
Is that a bug or a feature? How am I supposed to handle this kind of situation?
Thanks!
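A possible workaround, sketched here rather than taken from dagster_aws (the bucket, prefix, and pipeline name are placeholders): derive the run_key from each object's LastModified timestamp, so a deleted key never has to be found in the current listing.

import boto3
from dagster import RunRequest, sensor

@sensor(pipeline_name="my_pipeline")  # placeholder pipeline name
def s3_new_files_sensor(context):
    s3 = boto3.client("s3")
    objects = s3.list_objects_v2(Bucket="my-bucket", Prefix="my/prefix/").get("Contents", [])
    # run_key format is "<iso timestamp>|<key>"; only the timestamp ordering is
    # compared against the previous run, so deleted objects don't break it
    since = context.last_run_key.split("|", 1)[0] if context.last_run_key else ""
    for obj in sorted(objects, key=lambda o: o["LastModified"]):
        ts = obj["LastModified"].isoformat()
        if ts > since:
            yield RunRequest(
                run_key="{}|{}".format(ts, obj["Key"]),
                run_config={},  # fill in the real run config
            )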
Steve Pletcher
04/22/2021, 1:28 PM
i see examples of unpacking multiple outputs like a, b = some_solid(), but i'm not seeing an example of something like b = some_solid().get_output('b'), which i prefer for clarity reasons
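A sketch of one way to pull out a single named output inside a composition function, under the assumption that invoking a multi-output solid there returns a namedtuple keyed by output name:

from dagster import Output, OutputDefinition, pipeline, solid

@solid(output_defs=[OutputDefinition(int, "a"), OutputDefinition(int, "b")])
def some_solid(_):
    yield Output(1, "a")
    yield Output(2, "b")

@solid
def use_b(_, b: int):
    return b

@pipeline
def my_pipeline():
    # assumption: each output is exposed as an attribute on the invocation result
    outputs = some_solid()
    use_b(outputs.b)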
Yan
04/22/2021, 3:15 PM
Stepan Dvoiak
04/22/2021, 3:56 PM
• daemon is running with the command export DAGSTER_HOME=~/dagster_home && dagster-daemon run
• code is an identical copy of the example (except MY_DIRECTORY is set to some dir)
• code for the pipeline and sensor is placed in one file, "test_sensor.py"
• running dagit with the command export DAGSTER_HOME=~/dagster_home && dagit -f sensor_test.py
• run_storage, event_log_storage, schedule_storage are set to a Postgres instance in ~/dagster_home/dagster.yaml
What I see:
daemon log: SensorDaemon - INFO - Not checking for any runs since no sensors have been started
"No schedules or sensors defined" in the dagit UI; the /instance/sensors web page also says "No sensors found"
I can run the pipeline successfully with custom config in the playground:
solids:
  process_file:
    config: {"filename": "~/dagster_home"}
Please help me debug this. Where should I look to find the problem?
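One thing to try, assuming the file contains the docs example's log_file_pipeline and my_directory_sensor (names taken from that example): wrap both in an explicit repository and point dagit and the daemon at the same file, so there is no ambiguity about what gets loaded.

from dagster import repository

# Sketch: the sensor should be part of the repository that dagit and the
# daemon load, alongside the pipeline it targets.
@repository
def sensor_example_repository():
    return [log_file_pipeline, my_directory_sensor]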
Donny Winston
04/22/2021, 5:24 PM
Donny Winston
04/22/2021, 5:31 PM
right now I pass run_config=preset_dev_env.run_config to the RunRequest inside a function decorated with @sensor(pipeline_name="my_pipeline", mode="dev"), but it would be nice to pass e.g. preset to the decorator.
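For reference, a sketch of the workaround described above; preset_dev_env is assumed to be the PresetDefinition already attached to my_pipeline:

from dagster import RunRequest, sensor

@sensor(pipeline_name="my_pipeline", mode="dev")
def my_sensor(_context):
    yield RunRequest(
        run_key=None,  # or a key that de-duplicates requests
        run_config=preset_dev_env.run_config,
    )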
Peter B
04/23/2021, 7:22 AM
Steve Pletcher
04/23/2021, 4:15 PM
jasono
04/24/2021, 5:08 AM
Daniel Kim
04/24/2021, 12:50 PM
Andreas Williams
04/26/2021, 2:54 PM
Mehdi Nazari
04/26/2021, 4:34 PM
Bennett Kanuka
04/26/2021, 5:40 PM
I'm trying to figure out how to pass file handles between solids. I found FileManager but I can't get any further than that.
Could someone explain how I would do this or point to an example of a pipeline that passes file handles rather than pickled data?
Edit: this would be running on GCP in production so I would be using the dagster_gcp.gcs_file_manager, but I don't know how to use it
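A sketch of passing file handles instead of pickled values, using the built-in local_file_manager for illustration; the assumption is that the same resource key can be bound to the GCS file manager from dagster_gcp in a production mode.

from dagster import ModeDefinition, local_file_manager, pipeline, solid

@solid(required_resource_keys={"file_manager"})
def write_file(context):
    # write_data returns a FileHandle, which is what flows to downstream solids
    return context.resources.file_manager.write_data(b"some bytes")

@solid(required_resource_keys={"file_manager"})
def read_file(context, handle):
    data = context.resources.file_manager.read_data(handle)
    context.log.info("read {} bytes".format(len(data)))

@pipeline(mode_defs=[ModeDefinition(resource_defs={"file_manager": local_file_manager})])
def file_handle_pipeline():
    read_file(write_file())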
Dan Corbiani
04/26/2021, 8:56 PM
Martim Passos
04/26/2021, 10:15 PM
I'm getting
Received unexpected config entry "config" at path root:solids:images. Expected: "{ outputs?: [{ result?: { json: { path: String } pickle: { path: String } } }] }".
when setting a .yaml file like
solids:
  read_xml:
    inputs:
      path:
        value: "src/data-in/cumulus.xml"
  camera:
    config: "src/data-out/camera.csv"
  images:
    config: "src/data-out/images.csv"
it looks exactly like the cereal tutorial so I’m confused about where the error is
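Going only by the error message above, the camera and images solids don't declare a config schema, so the run config will only accept output-materialization entries for them; a sketch of that shape, with placeholder paths and the json materializer the error mentions:

solids:
  read_xml:
    inputs:
      path:
        value: "src/data-in/cumulus.xml"
  camera:
    outputs:
      - result:
          json:
            path: "src/data-out/camera.json"
  images:
    outputs:
      - result:
          json:
            path: "src/data-out/images.json"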
Slackbot
04/26/2021, 11:57 PM
Stepan Dvoiak
04/27/2021, 10:25 AM
I'm starting a celery worker with dagster-celery worker start -A dagster_celery.app -y celery_worker.yaml -q queue1 -n worker1
How can I add --concurrency=10 to that command?
Just appending it to the end did not succeed:
dagster-celery worker start -A dagster_celery.app -y celery_worker.yaml -q queue1 -n worker1 --concurrency 10
Usage: dagster-celery worker start [OPTIONS] [ADDITIONAL_ARGS]...
Try 'dagster-celery worker start --help' for help.
Error: no such option: --concurrency
I saw here that the cli just forwards ADDITIONAL_ARGS to the celery command at the end, but I think that when a dash or double dash is used, click validation fails (no such option).
Also I can't find any tests for ADDITIONAL_ARGS in dagster_celery_tests (I was looking for usage examples).
Thanks in advance!
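One possible angle, hedged because I haven't verified how dagster-celery parses the -y file: if that yaml takes the same block as the celery executor config, Celery's own worker_concurrency setting could go under config_source, for example:

# celery_worker.yaml (sketch; broker and backend values are placeholders)
broker: "pyamqp://guest@localhost//"
backend: "rpc://"
config_source:
  worker_concurrency: 10  # Celery's standard concurrency setting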
Steve Pletcher
04/27/2021, 3:09 PM
is there a recommended way to verify that a configured invocation is correctly configuring an object? if i call configured(foo)({'a': 'b'}), how can i validate that the a value in the solid config has been set to b?
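A sketch of one way to test it, assuming configured on a solid accepts a name for the new definition and that execute_solid is available: have the solid surface its resolved config and assert on the output.

from dagster import configured, execute_solid, solid

@solid(config_schema={"a": str})
def foo(context):
    # return the resolved config value so a test can assert on it
    return context.solid_config["a"]

configured_foo = configured(foo, name="configured_foo")({"a": "b"})

def test_configured_foo():
    result = execute_solid(configured_foo)
    assert result.output_value() == "b"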
bobzoller
04/27/2021, 4:01 PM
if I change the run_key, is that enough to get the sensor to trigger again (assuming it yields that RunRequest again)? (trying to resolve a situation where many pipeline runs that were triggered by a sensor failed -- with bad run_config -- and I want to re-trigger them all with the fixed run_config)
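If changing the run_key is indeed what forces re-evaluation (run keys de-duplicate requests per sensor), one sketch is to re-yield with a versioned key; requests_to_retry, original_run_key, and fixed_run_config are hypothetical names here.

from dagster import RunRequest, sensor

@sensor(pipeline_name="my_pipeline")  # placeholder
def retry_failed_runs_sensor(context):
    # sketch: bumping a suffix on the run_key makes the daemon treat this as a
    # new request even though the underlying event is the same
    for original_run_key, fixed_run_config in requests_to_retry():  # hypothetical helper
        yield RunRequest(
            run_key="{}-retry-1".format(original_run_key),
            run_config=fixed_run_config,
        )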
Makoto
04/27/2021, 8:54 PM
For the great_expectation integration, is there a particular reason why running a checkpoint is not supported? It looks like it supports running a suite. Just wanted to make sure that I didn’t miss it.