Auster Cid
03/04/2020, 10:02 PM
Auster Cid
03/04/2020, 10:07 PM
Error: A run launcher must be configured before running a backfill. You can configure a run launcher (e.g. dagster_graphql.launcher.RemoteDagitRunLauncher) in your instance `dagster.yaml` settings. See <https://dagster.readthedocs.io/en/latest/sections/deploying/instance.html> for more information.
max
03/04/2020, 10:09 PM
max
03/04/2020, 10:09 PM
run_launcher:
  module: dagster_graphql.launcher
  class: RemoteDagitRunLauncher
  config:
    address: http://localhost:3333
alex
03/04/2020, 10:10 PM
run_launcher:
  module: dagster.core.launcher.remote
  class: RemoteDagitRunLauncher
  config:
    address: http://localhost:3333
If you go down this path, you may want to prevent dagit from running them all at the same time by also putting this in your dagster.yaml:
dagit:
  execution_manager:
    max_concurrent_runs: <some number>
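Conceptually, a `max_concurrent_runs` cap behaves like a counting semaphore: at most N runs execute at once and the rest wait for a free slot. The sketch below illustrates that idea with `threading.BoundedSemaphore`; it is not dagit's implementation, and `run_backfill` and `fake_run` are hypothetical names used only for illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_backfill(partitions, execute, max_concurrent_runs):
    """Hypothetical helper: call execute(partition) for each partition while
    letting at most max_concurrent_runs calls run at the same time.
    Returns the results in partition order and the peak concurrency seen."""
    slots = threading.BoundedSemaphore(max_concurrent_runs)
    lock = threading.Lock()
    state = {"active": 0, "peak": 0}

    def guarded(partition):
        with slots:  # wait here until one of the slots is free
            with lock:
                state["active"] += 1
                state["peak"] = max(state["peak"], state["active"])
            try:
                return execute(partition)
            finally:
                with lock:
                    state["active"] -= 1

    # Every run is submitted at once; the semaphore, not the pool size, enforces the cap.
    with ThreadPoolExecutor(max_workers=max(len(partitions), 1)) as pool:
        results = list(pool.map(guarded, partitions))
    return results, state["peak"]


def fake_run(partition):
    time.sleep(0.05)  # stand-in for the work of one pipeline run
    return "done:" + partition
```

Calling `run_backfill(["2019-11-01", "2019-11-02", "2019-11-03", "2019-11-04"], fake_run, 2)` submits all four runs immediately, but never more than two are inside `fake_run` at once.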
Auster Cid
03/04/2020, 10:14 PM
alex
03/04/2020, 10:15 PM
`RunLauncher` if you are feeling zesty. It's a very simple interface: https://github.com/dagster-io/dagster/blob/fb85c9f7b4f99bcebcfbc3591fae68d4c4271434/python_modules/dagster/dagster/core/launcher/__init__.py#L6
alex
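03/04/2020, 10:18 PM
# import path assumed for dagster 0.7.x; define_repository is your own repository function
from dagster.core.execution.api import execute_run_iterator
def launch_run(self, instance, run):
    # look up the pipeline this run targets
    pipeline = define_repository().get_pipeline(run.pipeline_name)
    instance.create_run(run)
    # consume the event stream so the run executes to completion in this process
    for _event in execute_run_iterator(pipeline, run, instance):
        pass
    return run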
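The `launch_run` body above executes the run inline, in the calling process. As a rough illustration of handing the work to a child process instead, here is a stand-in launcher built on `subprocess.Popen`. It is not dagster's `RunLauncher` API: `SubprocessLauncher`, `FakeRun`, and `wait` are hypothetical, and the child only prints what it would have run where a real launcher would load the repository and execute the pipeline.

```python
import subprocess
import sys
from collections import namedtuple

# Hypothetical stand-in for a dagster run object; only the fields used here.
FakeRun = namedtuple("FakeRun", ["run_id", "pipeline_name"])


class SubprocessLauncher:
    """Hypothetical launcher: starts each run in a separate Python process
    and returns immediately instead of blocking until the run finishes."""

    def __init__(self):
        self.processes = {}

    def launch_run(self, run):
        # A real launcher's child would execute the pipeline; this one just reports it.
        code = "import sys; print('executing', sys.argv[1], sys.argv[2])"
        proc = subprocess.Popen(
            [sys.executable, "-c", code, run.pipeline_name, run.run_id],
            stdout=subprocess.PIPE,
            universal_newlines=True,
        )
        self.processes[run.run_id] = proc
        return run

    def wait(self, run_id):
        # Block until the child exits and return its captured output.
        out, _ = self.processes[run_id].communicate()
        return out.strip()
```

The design point is that `launch_run` returns as soon as the child is started, so a backfill can launch many runs without waiting on each one.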
you just need to decide if you want that code to run in a subprocess / whatever
Auster Cid
03/04/2020, 10:23 PM
dagster pipeline backfill log_daily_stats --from 20191101
but if I try to use that date format, it throws this error:
Error: invalid value 20200216 for from
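The backfill `--from` value has to match a partition name exactly, and the date partitions in this thread are named with `%Y-%m-%d`, so a compact value like `20191101` matches nothing. The lookup can be sketched as below; `select_backfill` and the sample `NAMES` list are hypothetical and only mimic the shape of the CLI's check, not its actual code.

```python
def select_backfill(partition_names, from_name):
    """Hypothetical sketch: return the partitions from `from_name` onward,
    failing the way the CLI does when no partition has that exact name."""
    if from_name not in partition_names:
        raise ValueError("invalid value {} for from".format(from_name))
    start = partition_names.index(from_name)
    return partition_names[start:]


# Sample names as a daily partition set formatted with %Y-%m-%d would produce them.
NAMES = ["2019-11-01", "2019-11-02", "2019-11-03"]
```

`select_backfill(NAMES, "2019-11-02")` returns the last two names, while `select_backfill(NAMES, "20191101")` raises because no partition carries that name.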
max
03/04/2020, 10:24 PM
Auster Cid
03/04/2020, 10:28 PM
alex
03/04/2020, 10:32 PM
`should_execute` on `ScheduleDefinition` that you could use
Auster Cid
03/04/2020, 10:33 PM
Auster Cid
03/04/2020, 10:33 PM
Traceback (most recent call last):
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/bin/dagster", line 8, in <module>
    sys.exit(main())
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/dagster/cli/__init__.py", line 38, in main
    cli(obj={}) # pylint:disable=E1123
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/dagster/cli/pipeline.py", line 506, in pipeline_backfill_command
    execute_backfill_command(kwargs, click.echo)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/dagster/cli/pipeline.py", line 595, in execute_backfill_command
    instance.launch_run(run)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/dagster/core/instance/__init__.py", line 451, in launch_run
    return self._run_launcher.launch_run(self, run)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/dagster_graphql/launcher/__init__.py", line 79, in launch_run
    self.validate()
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/dagster_graphql/launcher/__init__.py", line 74, in validate
    host=self._address
dagster.core.errors.DagsterLaunchFailedError: Host http://localhost:3333 failed sanity check. It is not a dagit server.
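The sanity check fails when whatever answers at the configured `address` is not dagit; one common cause is dagit listening on a different port than the one in `dagster.yaml`. A quick stdlib check for which ports on a host accept TCP connections can help narrow that down. `is_port_open` and `first_open_port` are hypothetical helpers, and an open port only shows that something is listening, not that it is dagit.

```python
import socket


def is_port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def first_open_port(host, ports):
    # Return the first port in `ports` that accepts connections, or None.
    for port in ports:
        if is_port_open(host, port):
            return port
    return None
```

For example, `first_open_port("localhost", [3000, 3333])` reports which of the two candidate ports has a listener, which you can then put in the run launcher's `address`.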
alex
03/04/2020, 10:33 PM
`dagit` running locally on port 3333?
Auster Cid
03/04/2020, 10:34 PM
alex
03/04/2020, 10:34 PM
3000 instead of 3333 (in `dagster.yaml`)
Auster Cid
03/04/2020, 10:35 PM
alex
03/04/2020, 10:35 PM
prha
03/04/2020, 10:42 PM
prha
03/04/2020, 10:43 PM
The `invalid value 20200216 for from` error just means that it cannot find a partition defined with that name.
Auster Cid
03/05/2020, 1:16 PM
import datetime
from dateutil.relativedelta import relativedelta
from dagster import PartitionSetDefinition, repository_partitions
from dagster.utils.partitions import date_partition_range
def environment_dict_fn_for_canceled_flights(partition):
    date = partition.value
    base_url = r"https://some.url/action?start={}&end={}"
    return {
        'execution': {
            'in_process': None
        },
        'solids': {
            'download_file': {
                'config': {
                    'url': base_url.format(date.strftime('%Y-%m-%d'), date.strftime('%Y-%m-%d')),
                    'file_path': 'canceled_flights.csv'
                }
            },
            'file_handle_to_s3': {
                'config': {
                    'Bucket': 'some-bucket',
                    'Key': 'canceled_flights/{}_canceled_flights.csv'.format(date.strftime('%Y-%m-%d'))
                }
            }
        },
        'resources': {
            's3': {
                'config': {
                    'region_name': "us-west-1"
                }
            }
        }
    }
canceled_flights_dates = PartitionSetDefinition(
    name="canceled_flights_dates",
    pipeline_name="upload_canceled_flights_to_s3",
    partition_fn=date_partition_range(
        start=datetime.datetime.today() - relativedelta(years=1),
        end=datetime.datetime.today() + relativedelta(days=1),
        delta=relativedelta(days=1),
    ),
    environment_dict_fn_for_partition=environment_dict_fn_for_canceled_flights,
)
@repository_partitions
def define_partitions():
    return [canceled_flights_dates]
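To get a feel for which names a daily partition set like `canceled_flights_dates` exposes to `--from`, here is a stdlib-only approximation of a daily date range rendered as `%Y-%m-%d` strings. It steps with `datetime.timedelta` instead of `relativedelta` and treats `end` as exclusive; both are assumptions about `date_partition_range`'s behaviour rather than a description of its code, and `daily_partition_names` is a hypothetical name.

```python
import datetime


def daily_partition_names(start, end, fmt="%Y-%m-%d"):
    """Hypothetical approximation of a daily date partition range:
    one name per day from `start` up to, but not including, `end`."""
    names = []
    current = start
    while current < end:
        names.append(current.strftime(fmt))
        current += datetime.timedelta(days=1)
    return names
```

`daily_partition_names(datetime.date(2019, 10, 30), datetime.date(2019, 11, 2))` yields `["2019-10-30", "2019-10-31", "2019-11-01"]`; a compact string such as `"20191101"` never appears in that list.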
Auster Cid
03/05/2020, 1:16 PM
`date_partition_range`, yes?
sashank
03/06/2020, 8:21 PM
`%Y-%m-%d`.
So your command would be:
dagster pipeline backfill log_daily_stats --from 2019-11-01
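If your dates arrive in compact `YYYYMMDD` form, as in the original `--from 20191101` attempt, a small helper can rewrite them into `%Y-%m-%d` partition names before calling the CLI. `to_partition_name` and `backfill_command` are hypothetical helpers, not part of dagster; `datetime.strptime` also rejects impossible dates such as `20190231`.

```python
import datetime


def to_partition_name(compact, in_fmt="%Y%m%d", out_fmt="%Y-%m-%d"):
    """Hypothetical helper: convert e.g. '20191101' to '2019-11-01'.
    strptime raises ValueError for malformed or impossible dates."""
    return datetime.datetime.strptime(compact, in_fmt).strftime(out_fmt)


def backfill_command(pipeline_name, from_compact):
    # Build the backfill invocation shown in this thread as an argument list.
    return ["dagster", "pipeline", "backfill", pipeline_name,
            "--from", to_partition_name(from_compact)]
```

The resulting list can be passed to `subprocess.run` as-is, which avoids shell quoting issues.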
sashank
03/06/2020, 8:22 PM
`dagster pipeline backfill` to interactively select a pipeline and see what partitions are available