Somasundaram Sekar
06/24/2021, 6:13 PM
I have a pipeline with a schedule

@schedule(cron_schedule="* * * * *", pipeline_name="my_pipeline", execution_timezone="XXXXX", mode="local")

and I run dagit and dagster-daemon run, but even after the schedule is turned off and dagit is killed, the scheduler still executes the pipeline once a minute. I'm honestly lost on any logical explanation of what is happening.

Serge Smertin
06/24/2021, 7:35 PM
@solid
def just_run(_):
    _.log.info('Just run....')

@pipeline(mode_defs=[ModeDefinition(
    resource_defs={'io_manager': fs_io_manager},
    executor_defs=[multiprocess_executor])])
def dummy():
    just_run()

@daily_schedule(
    pipeline_name=dummy.__name__,
    start_date=datetime.datetime(2021, 6, 24),
    execution_time=datetime.time(21, 32),
    should_execute=lambda _: True,
    # should_execute=lambda _: datetime.today().weekday() < 5,
    execution_timezone='Europe/Amsterdam')
def daily(date):
    print(f'Daily stuff... {date}')
    return {}

The latest tick is always skipped and the daemon is telling me

2021-06-24 21:32:29 - SchedulerDaemon - INFO - Evaluating schedule `daily` at 2021-06-24 21:32:00+0200
2021-06-24 21:32:29 - SchedulerDaemon - INFO - No run requests returned for daily, skipping

What is wrong?...

Sarah Gruskin
06/24/2021, 9:16 PM
import traceback

from dagster import HookContext, failure_hook

@failure_hook
def my_failure_hook(context: HookContext):
    solid_exception: BaseException = context.solid_exception
    # print stack trace of exception
    traceback.print_tb(solid_exception.__traceback__)

@failure_hook(required_resource_keys={"slack"})
def slack_message_on_failure(context: HookContext):
    message = f"Solid {context.solid.name} failed"
    context.resources.slack.chat.post_message(channel="#de-alerts", text=message)

and an init file like this

...
@pipeline(
    mode_defs=[
        ModeDefinition(
            name="prod",
            resource_defs={
                "redacted": redacted,
                "api_call": api_call,
                "slack": slack_resource
            }
        ),
        ModeDefinition(
            name="unit_test",
            resource_defs={
                "redacted": redacted,
                "api_call": api_call_test,
                "slack": ResourceDefinition.hardcoded_resource(
                    slack_resource_mock,
                    "do not send messages in dev"
                )
            }
        )
    ],
    tags={
        'dagster-k8s/config': {
            'pod_template_spec_metadata': {
                'labels': {'my_labels': 'redacted'}
            }
        }
    }
)
def my_pipeline():
    solid_one(
        solid_two(
            solid_three(
                fetch_from_redacted()
            )
        )
    )

@daily_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime(2021, 4, 1),
    execution_time=time(hour=8, minute=15),
    execution_timezone="UTC",
)
def my_schedule(date):
    return {
        "solids": {
            "fetch_from_redacted": {
                "config": {
                    "date": date.strftime("%Y-%m-%d")
                }
            }
        }
    }

@slack_message_on_failure
@pipeline(mode_defs=mode_defs)
def notif_all():
    # the hook "slack_message_on_failure" is applied to every solid instance within this pipeline
    solid_one()
    solid_two()
    fetch_from_redacted()
    solid_three()

workspace.yaml

# Yaml for loading our single repository from our repos.py file:
load_from:
  - python_file: repos.py
resources:
  slack:
    config:
      token: "secret"

The resources section doesn't seem to be correct, since I'm receiving the following error:

"slack": slack_resource
docker_example_pipelines | NameError: name 'slack_resource' is not defined

I'm unsure how all this ties together.
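For what it's worth, that NameError usually just means slack_resource was never imported into the module that defines the pipeline; a minimal sketch, assuming the dagster-slack package is installed:

# assumption: dagster-slack is installed; the import must live in the
# same module where the ModeDefinition references slack_resource
from dagster_slack import slack_resource

Note also that workspace.yaml only tells Dagit where to load code from; a resources block with the token belongs in run config (or a preset), not in workspace.yaml.

Daniel Michaelis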
06/25/2021, 8:52 AM
@solid(input*defs=[InputDefinition("data", dagster_type=str, root_manager_key="my_root")])

I haven't encountered the usage of an asterisk as part of an input variable name, as in input*defs, and I couldn't find what it does. In the rest of the code I've only seen it written as input_defs. Is this just a typo, or could someone clarify what function the asterisk has here?

Giacomo D
06/25/2021, 12:44 PM

Caio Rogério Silva dos Santos
06/25/2021, 1:13 PM
I have a daily schedule that uses the date context variable that it passes to the solids, so as to update some table properly. Problem is, I am unsure about what happens when running backfills: is context.solid_config['date'] the partition date? If not, how can I access the partition date when running backfills without breaking my code that's supposed to run daily? Thanks in advance!
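For reference, a minimal sketch of the mechanics, assuming the standard @daily_schedule setup: the decorated function receives the partition date for scheduled runs and backfills alike, so whatever it writes into solid config is the partition date in both cases:

from datetime import datetime, time
from dagster import daily_schedule

@daily_schedule(
    pipeline_name="my_pipeline",
    start_date=datetime(2021, 1, 1),
    execution_time=time(hour=8),
)
def my_schedule(date):
    # called with the partition date for scheduled runs and for backfills,
    # so context.solid_config["date"] downstream is always the partition date
    return {"solids": {"my_solid": {"config": {"date": date.strftime("%Y-%m-%d")}}}}

Vince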
06/25/2021, 2:06 PM

jurelou
06/25/2021, 2:50 PM
I'm trying to do conditional branching with the is_required=False parameter

@solid(
    output_defs=[
        OutputDefinition(name="a", is_required=False),
        OutputDefinition(name="b", is_required=False),
        OutputDefinition(name="c", is_required=False)
    ]
)
def generate_branches(context, file_path: str):
    if 1 == 1:
        yield Output(None, output_name="a")
    else:
        yield Output(None, output_name="b")

@solid
def proc_file(context, file_path):
    branches = generate_branches(file_path=file_path)

@pipeline()
def my_pipeline():
    input_files = gather_files()
    output_files = input_files.map(proc_file)
    end(output_files.collect())
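For comparison, a hedged sketch of how this kind of branching is usually wired up inside the pipeline definition rather than by calling generate_branches from another solid's body (solid bodies can't invoke solids); process_a and process_b are hypothetical downstream solids:

from dagster import Output, OutputDefinition, pipeline, solid

@solid(output_defs=[
    OutputDefinition(name="a", is_required=False),
    OutputDefinition(name="b", is_required=False),
])
def generate_branches(context, file_path: str):
    # yield only the branch that applies; the other output is skipped,
    # which in turn skips everything downstream of it
    if file_path.endswith(".csv"):
        yield Output(file_path, output_name="a")
    else:
        yield Output(file_path, output_name="b")

@solid
def process_a(context, path: str):  # hypothetical branch-a solid
    context.log.info(f"branch a: {path}")

@solid
def process_b(context, path: str):  # hypothetical branch-b solid
    context.log.info(f"branch b: {path}")

@pipeline
def branching_pipeline():
    a, b = generate_branches()
    process_a(a)
    process_b(b)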
Jenny Webster
06/25/2021, 5:28 PM
dagster.core.errors.DagsterInvalidDefinitionError: Input "df" in solid "generate_speed_solid" is not connected to the output of a previous solid and can not be loaded from configuration, creating an impossible to execute pipeline. Possible solutions are:
E * add a dagster_type_loader for the type "DataFrame"
E * connect "df" to the output of another solid

where "generate_speed_solid" is one of the standard solids inside the composite solid. I have tested the individual solids and they work in a pipeline outside of the composite solid. We had been using type casting in our function definitions instead of explicitly calling InputDefinition and OutputDefinition in the solid wrappers. I can't seem to find the documentation about how this needs to change. Can someone point me to the right resource? Thanks!
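A minimal sketch of the composite wiring that avoids this error, with hypothetical names: the composite has to declare df as its own input and map it through, so the inner solid's input is connected:

from dagster import composite_solid, solid

@solid
def generate_speed_solid(context, df):
    ...

@composite_solid
def speed_composite(df):
    # mapping the composite's own "df" input onto the inner solid
    # connects it, instead of leaving it dangling inside the composite
    return generate_speed_solid(df)

Josh Lloyd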
06/25/2021, 6:02 PM
dagster api grpc -h 0.0.0.0 -p 4000 --python-file dagster_repos.py

but that just confuses Dagit, which constantly switches between the new pipeline file and the old one. Seems like I need to stop the first grpc process first.

Vitor Baptista
06/26/2021, 2:43 PM
My solid yields both an AssetMaterialization,

yield AssetMaterialization(asset_key="gmail_messages_ids", partition=partition_date_str)

and the Output. How can I access the AssetMaterialization's partition field inside the IOManager? I need this field to ensure that if I run the solid twice for the same partition date, it won't create duplicated rows in the table, but will update the data from the last time it ran on the same partition.
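One workaround sketch (not necessarily the idiomatic answer), assuming the partition date is threaded through the io_manager's resource config instead of being read off the AssetMaterialization; the schedule or backfill can fill it in via run config:

from dagster import IOManager, io_manager

class UpsertIOManager(IOManager):
    """Hypothetical manager that upserts rows keyed by partition date."""

    def __init__(self, partition: str):
        self.partition = partition

    def handle_output(self, context, obj):
        # delete rows previously written for self.partition, then insert,
        # so re-running the same partition updates instead of duplicating
        ...

    def load_input(self, context):
        ...

@io_manager(config_schema={"partition": str})
def upsert_io_manager(init_context):
    return UpsertIOManager(init_context.resource_config["partition"])

Vitor Baptista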
06/26/2021, 2:52 PM
I'm trying to yield a variable number of outputs:

for i, message in enumerate(messages):
    yield Output(message, output_name=f"download_messages_ids_{i}")

and the error is:

dagster.core.errors.DagsterInvariantViolationError: Core compute for solid "download_messages_ids_in_day" returned an output "download_messages_ids_0" that does not exist. The available outputs are ['result']

I suppose I could create multiple optional outputs like download_message_ids_0 to download_message_ids_100, but that seems too hackish. Is there a better way?
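A sketch of dynamic outputs, which avoid declaring one OutputDefinition per message (depending on the dagster version, these may need to be imported from dagster.experimental instead):

from dagster import DynamicOutput, DynamicOutputDefinition, solid

@solid(output_defs=[DynamicOutputDefinition()])
def download_messages_ids_in_day(context, messages):
    # one DynamicOutput per message; mapping_key identifies each copy
    for i, message in enumerate(messages):
        yield DynamicOutput(message, mapping_key=str(i))

Downstream solids can then consume them with .map() and .collect(), as in the pipeline shown earlier.

Jonathan Mak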
06/28/2021, 12:05 PM
What is the relationship between DbtResult and NodeResult? It seems like NodeResult's error, fail, skip and status fields do not reflect the dbt node's actual run result: whether it failed, errored or succeeded, these values are all null. Can anyone confirm if this is a bug or not?

Scott Peters
06/28/2021, 4:20 PM
1. Is there a dagster client (maybe the graphql client could be this?) for interacting with dagster (submit, get output) without having direct local access to dagit? (Currently, the graphql client is fairly limited: https://docs.dagster.io/concepts/dagit/graphql-client)
2. (related) What is the appropriate way to get output from a pipeline to other non-dagster python tools? (aka: the pipeline creates an s3 bucket, and another script needs to send that s3 path back to the user.)
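For reference, a minimal sketch of what the graphql client does cover today, with hypothetical host and repository names:

from dagster_graphql import DagsterGraphQLClient

client = DagsterGraphQLClient("dagit.internal.example", port_number=3000)

run_id = client.submit_pipeline_execution(
    pipeline_name="my_pipeline",
    repository_location_name="my_location",  # hypothetical location name
    repository_name="my_repo",               # hypothetical repo name
    run_config={},
    mode="default",
)
status = client.get_run_status(run_id)

Scott Peters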
06/28/2021, 4:25 PM
We are looking at dagster for our asset management and validation pipelines, but several of our users are artists, and therefore will require a more didactic front end than dagit (simple buttons and forms) for submitting jobs and getting feedback / results. It seems that once you are in the dagster eco-system, it is fairly insular? Any perspective is appreciated.

Andrew Parsons
06/28/2021, 6:32 PM
The segmentizer is simply a spacy.language.Language. Here are some relevant excerpts from my configuration:

playground yaml:

execution:
  multiprocess:
    config:
      max_concurrent: 14

dagster.yaml:

run_launcher:
  module: dagster.core.launcher
  class: DefaultRunLauncher

Any ideas? Thanks in advance!

Sarah Gruskin
06/28/2021, 8:52 PM
I have a pipeline scheduled with the daily_schedule decorator. I'm currently running it locally and kicked off a backfill. The issue is it's been stuck in the 'Starting...' state and I don't know how to debug why. I've downloaded the logs from the 3 dots menu but am unsure what I should be looking for. What would cause a pipeline to hang like this?

Scott Peters
06/28/2021, 10:29 PM
A colleague of mine has been using dagster for the last year or so. As I started to look into it, he mentioned that it is great as long as you are doing all the execution on a single node, but that scaling to multiple compute nodes gets infinitely more complex. Can someone briefly explain how to run dagster pipelines across multiple nodes (async or otherwise)?
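For what it's worth, a hedged sketch of the usual approach: the pipeline code stays the same and only the executor is swapped, e.g. dagster-celery to farm steps out to workers on other machines (the broker URL is a placeholder, and celery_executor must be added to the mode's executor_defs):

execution:
  celery:
    config:
      broker: "pyamqp://guest@rabbitmq:5672//"  # placeholder broker URL

There are similar executors / run launchers for Kubernetes (dagster-k8s) and Dask (dagster-dask).

Scott Peters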
06/29/2021, 1:33 AM
I'm trying some of the graphql examples from the docs and getting an error:

Scott Peters
06/29/2021, 1:33 AM

Simon Späti
06/29/2021, 8:43 AM

Alex
06/29/2021, 11:49 AM

Xiaosu
06/29/2021, 1:23 PM

Michel Rouly
06/29/2021, 3:07 PM
Message: Attempted to deserialize class "EventLogEntry" which is not in the whitelist. This error can occur due to version skew, verify processes are running expected versions.
Michel Rouly
06/29/2021, 4:21 PM
If I have solids a and b that feed into solid fan_in, and b fails, I'd still like fan_in to run with b excluded from its input array.
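A related pattern, hedged (it covers upstream solids that skip rather than hard-fail, and assumes skipped optional outputs are dropped from fan-in lists): catch the failure, skip the optional output, and fan the survivors into a list input:

from dagster import Output, OutputDefinition, pipeline, solid

@solid(output_defs=[OutputDefinition(is_required=False)])
def maybe_produce(context):
    # swallow the failure and skip the output instead of raising
    try:
        yield Output(compute_value())  # compute_value is a hypothetical helper
    except Exception:
        context.log.warning("branch failed; skipping its output")

@solid
def fan_in(context, values):
    context.log.info(f"got {len(values)} values")

@pipeline
def partial_fan_in():
    fan_in([maybe_produce.alias("a")(), maybe_produce.alias("b")()])

orenl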
06/29/2021, 6:24 PM
Is there a way to dynamically set the dagster-k8s/config of a pipeline? My pipeline needs to handle inputs of varying sizes, and I'm looking for a way to dynamically set the resources based on the input size.
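One hedged idea, assuming run-level tags are honored by the K8s run launcher the same way pipeline tags are: compute the size in a sensor and attach the tag per run (estimate_input_size and the numbers are hypothetical):

from dagster import RunRequest, sensor

@sensor(pipeline_name="my_pipeline")
def sized_run_sensor(context):
    size_gb = estimate_input_size()  # hypothetical sizing helper
    yield RunRequest(
        run_key=None,
        run_config={},
        tags={
            "dagster-k8s/config": {
                "container_config": {
                    "resources": {"requests": {"memory": f"{2 * size_gb}Gi"}}
                }
            }
        },
    )

mrdavidlaing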
06/29/2021, 6:31 PM
When testing a solid with execute_solid(), I'm finding that resources required by the solid are initialised twice: once during the execute_solid() call and then again when I grab the result of the execution with result.output_value(). Is this expected behaviour? If yes, can I get the output_value without triggering a re-init of the resources?
Details & logs in the 🧵

orenl
06/29/2021, 7:08 PM
@solid(config_schema={})
def consume_all_mem(context):
    a = []
    while True:
        a.append(' ' * 10**6)

When I run it in a pipeline, the underlying k8s job shows a BackoffLimitExceeded warning, and the pod's status is "OOMKilled with exit code 137". Ideally, I'd like to see that the job failed because it ran out of memory, and create a sensor that re-submits it with higher resource allocation.

Michel Rouly
06/29/2021, 7:27 PM
Is it possible to re-execute a pipeline that failed halfway through, but with a modified run_config? e.g., if I tweak one parameter, it won't impact the other solids, but it would turn this halfway solid from failure to success on retry?
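A sketch of one way to do this from Python, assuming reexecute_pipeline's step selection works as documented (the pipeline object, new_run_config and the solid name are placeholders):

from dagster import DagsterInstance, reexecute_pipeline

result = reexecute_pipeline(
    my_pipeline,                         # the @pipeline object
    parent_run_id="<failed run id>",     # placeholder run id
    run_config=new_run_config,           # the tweaked config
    step_selection=["halfway_solid*"],   # this solid and everything downstream
    instance=DagsterInstance.get(),
)

adamd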
06/29/2021, 7:33 PM