# announcements
a
Is it possible to run a backfill locally? If so, how do I configure the run launcher in dagster.yaml?
```
Error: A run launcher must be configured before running a backfill. You can configure a run launcher (e.g. dagster_graphql.launcher.RemoteDagitRunLauncher) in your instance `dagster.yaml` settings. See https://dagster.readthedocs.io/en/latest/sections/deploying/instance.html for moreinformation.
```
m
ah, we don't have an example of the run launcher configured in that link
```yaml
run_launcher:
  module: dagster_graphql.launcher
  class: RemoteDagitRunLauncher
  config:
    address: http://localhost:3333
```
a
So one option is to set up a “dagit” run launcher that sends runs to a running dagit
```yaml
run_launcher:
  module: dagster_graphql.launcher
  class: RemoteDagitRunLauncher
  config:
    address: http://localhost:3333
```
If you go down this path, you may want to stop dagit from running them all at the same time by also putting this in your `dagster.yaml`:
```yaml
dagit:
  execution_manager:
    max_concurrent_runs: <some number>
```
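To picture what a cap like `max_concurrent_runs` does, here is a plain-Python sketch of the same idea: a fixed-size worker pool where extra runs wait in a queue until a slot frees up. It assumes queue-and-wait semantics and is only an illustration; it is not how dagit implements the setting, and `run_with_cap` is a hypothetical name.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with_cap(jobs, max_concurrent_runs):
    """Run each callable in jobs with at most max_concurrent_runs in flight.

    Returns the peak number of jobs observed running at the same time.
    """
    lock = threading.Lock()
    running = 0
    peak = 0

    def wrapped(job):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        try:
            job()
        finally:
            with lock:
                running -= 1

    # A pool with max_workers=N never runs more than N jobs at once;
    # any extra submissions wait in the pool's queue.
    with ThreadPoolExecutor(max_workers=max_concurrent_runs) as pool:
        list(pool.map(wrapped, jobs))
    return peak


# Six "runs" that each take ~50 ms, capped at two at a time.
peak = run_with_cap([lambda: time.sleep(0.05)] * 6, max_concurrent_runs=2)
```

With a cap of 2, the six runs finish in roughly three waves instead of all starting together.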
a
cool, thanks guys
a
Another path would be to write your own `RunLauncher` if you are feeling zesty. It's a very simple interface: https://github.com/dagster-io/dagster/blob/fb85c9f7b4f99bcebcfbc3591fae68d4c4271434/python_modules/dagster/dagster/core/launcher/__init__.py#L6
the most basic impl is:
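```python
# Method on a RunLauncher subclass; define_repository() is your own repository
# function, and execute_run_iterator comes from Dagster's execution API.
def launch_run(self, instance, run):
    pipeline = define_repository().get_pipeline(run.pipeline_name)
    instance.create_run(run)
    # Drain the event iterator so the run actually executes.
    for _event in execute_run_iterator(pipeline, run, instance):
        pass
    return run
```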
you just need to decide whether you want that code to run in a subprocess or somewhere else
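As one way to handle the choice of where that code runs, here is a minimal sketch that keeps the `launch_run(instance, run)` shape of the base implementation above but does the work on a background thread, so the caller is not blocked. `BackgroundRunLauncher` and its `execute` callable are hypothetical: a real launcher would subclass Dagster's `RunLauncher`, and `execute` would drain `execute_run_iterator` for the run's pipeline. A thread keeps the sketch simple; a subprocess would isolate runs more strongly.

```python
import threading
from typing import Any, Callable, List


class BackgroundRunLauncher:
    """Hypothetical launcher that mirrors launch_run(instance, run) but
    executes each run on a daemon thread instead of blocking the caller."""

    def __init__(self, execute: Callable[[Any, Any], None]):
        # execute(instance, run) does the actual work; in a real launcher it
        # would iterate execute_run_iterator(pipeline, run, instance).
        self._execute = execute
        self._threads: List[threading.Thread] = []

    def launch_run(self, instance, run):
        instance.create_run(run)  # record the run before it starts
        thread = threading.Thread(target=self._execute, args=(instance, run), daemon=True)
        thread.start()
        self._threads.append(thread)
        return run  # returns immediately; the run keeps going in the background

    def join_all(self, timeout=None):
        """Wait for launched runs to finish (handy in scripts and tests)."""
        for thread in self._threads:
            thread.join(timeout)


# Usage with stand-ins for the instance and the execution step:
class _FakeInstance:
    def __init__(self):
        self.created = []

    def create_run(self, run):
        self.created.append(run)


finished = []
launcher = BackgroundRunLauncher(lambda instance, run: finished.append(run))
fake_instance = _FakeInstance()
returned = launcher.launch_run(fake_instance, "run-1")
launcher.join_all(timeout=5)
```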
a
btw guys, the help message for `dagster pipeline backfill` says
`dagster pipeline backfill log_daily_stats --from 20191101`
but if I try to use that date format, it throws this error:
`Error: invalid value 20200216 for from`
m
@sashank @prha ^^
a
is there any way to configure concurrent runs at the pipeline level? Some of my pipelines will need to run on short schedules, like every 10 minutes, and it's very likely that their runtime will occasionally be longer than that window
a
We have `should_execute` on `ScheduleDefinition` that you could use
a
got a traceback trying to run the backfill now
```
Traceback (most recent call last):
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/bin/dagster", line 8, in <module>
    sys.exit(main())
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/dagster/cli/__init__.py", line 38, in main
    cli(obj={})  # pylint:disable=E1123
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/click/core.py", line 764, in __call__
    return self.main(*args, **kwargs)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/click/core.py", line 717, in main
    rv = self.invoke(ctx)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/click/core.py", line 1137, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/click/core.py", line 956, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/click/core.py", line 555, in invoke
    return callback(*args, **kwargs)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/dagster/cli/pipeline.py", line 506, in pipeline_backfill_command
    execute_backfill_command(kwargs, click.echo)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/dagster/cli/pipeline.py", line 595, in execute_backfill_command
    instance.launch_run(run)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/dagster/core/instance/__init__.py", line 451, in launch_run
    return self._run_launcher.launch_run(self, run)
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/dagster_graphql/launcher/__init__.py", line 79, in launch_run
    self.validate()
  File "/root/.local/share/virtualenvs/dagster-test-ajKJ0Bj2/lib/python3.6/site-packages/dagster_graphql/launcher/__init__.py", line 74, in validate
    host=self._address
dagster.core.errors.DagsterLaunchFailedError: Host http://localhost:3333 failed sanity check. It is not a dagit server.
```
a
Do you have `dagit` running locally on port `3333`?
a
hmm, should it be the same port as the web UI? cos mine is running on 3000
a
ya, so just update the config to point to `3000` instead of `3333` (in `dagster.yaml`)
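Before editing `dagster.yaml`, one quick way to check which port actually has something listening is a plain TCP connection attempt. This is a minimal stdlib-only sketch with hypothetical helper names; a `True` only means some process accepts connections there. It is not the sanity check `RemoteDagitRunLauncher` performs, so it cannot tell you the process is dagit.

```python
import socket
from urllib.parse import urlparse


def host_port(address: str) -> tuple:
    """Split an address like http://localhost:3000 into ("localhost", 3000)."""
    parsed = urlparse(address)
    host = parsed.hostname or "localhost"
    # Fall back to the scheme's default port when none is given.
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    return host, port


def is_listening(address: str, timeout: float = 1.0) -> bool:
    """Return True if something accepts TCP connections at the address."""
    try:
        with socket.create_connection(host_port(address), timeout=timeout):
            return True
    except OSError:
        # Connection refused, unreachable host, or timeout.
        return False
```

In this thread, `is_listening("http://localhost:3000")` would be expected to return `True` while `is_listening("http://localhost:3333")` returns `False`, which points the config at 3000.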
a
ok, and thanks for being patient with me 😛
a
no worries! You are helping flesh out some stuff we haven’t had much user feedback on, so this is incredibly valuable
p
@Auster Cid can you paste me the partition function you’re using for the pipeline you’re trying to backfill?
The `invalid value 20200216 for from` error just means that it cannot find a partition defined with that name
a
```python
import datetime

from dateutil.relativedelta import relativedelta

from dagster import PartitionSetDefinition, repository_partitions
from dagster.utils.partitions import date_partition_range

def environment_dict_fn_for_canceled_flights(partition):
    date = partition.value
    base_url = r"https://some.url/action?start={}&end={}"
    return {
        'execution': {
            'in_process': None
        },
        'solids': {
            'download_file': {
                'config': {
                    'url': base_url.format(date.strftime('%Y-%m-%d'), date.strftime('%Y-%m-%d')),
                    'file_path': 'canceled_flights.csv'
                }
            },
            'file_handle_to_s3': {
                'config': {
                    'Bucket': 'some-bucket',
                    'Key': 'canceled_flights/{}_canceled_flights.csv'.format(date.strftime('%Y-%m-%d'))
                }
            }
        },
        'resources': {
            's3': {
                'config': {
                    'region_name': "us-west-1"
                }
            }
        }
    }


canceled_flights_dates = PartitionSetDefinition(
    name="canceled_flights_dates",
    pipeline_name="upload_canceled_flights_to_s3",
    partition_fn=date_partition_range(
        start=datetime.datetime.today() - relativedelta(years=1),
        end=datetime.datetime.today() + relativedelta(days=1),
        delta=relativedelta(days=1),
    ),
    environment_dict_fn_for_partition=environment_dict_fn_for_canceled_flights,
)


@repository_partitions
def define_partitions():
    return [canceled_flights_dates]
```
I think this is being defined by `date_partition_range`, yes?
s
Hey @Auster Cid, the date syntax you want to use is `%Y-%m-%d`. So your command would be:
```shell
dagster pipeline backfill log_daily_stats --from 2019-11-01
```
Btw, you can run just `dagster pipeline backfill` to interactively select a pipeline and see which partitions are available
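To see why `--from 20191101` matches nothing, here is a stdlib-only sketch that builds day names in the `%Y-%m-%d` format described above and converts the help text's compact date into that format. The helpers are hypothetical and only mimic the naming; they do not call Dagster, and this sketch treats `end` as exclusive, which may differ from `date_partition_range`.

```python
import datetime


def to_partition_name(compact: str) -> str:
    """Turn the help text's 20191101 style into the 2019-11-01 style."""
    return datetime.datetime.strptime(compact, "%Y%m%d").strftime("%Y-%m-%d")


def daily_partition_names(start: datetime.date, end: datetime.date) -> list:
    """Hypothetical helper: one %Y-%m-%d name per day from start up to
    (but not including) end."""
    names = []
    current = start
    while current < end:
        names.append(current.strftime("%Y-%m-%d"))
        current += datetime.timedelta(days=1)
    return names


names = daily_partition_names(datetime.date(2019, 11, 1), datetime.date(2019, 11, 4))
```

Here `"20191101"` is not among `names`, but `to_partition_name("20191101")` gives `"2019-11-01"`, which is.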