Yasin Zaehringer
05/24/2021, 11:01 AM
enableSubchart to decouple the deployment of dagster from deploying the pipelines, but I haven't found an example yet. Can the user deployments be in different namespaces?
• One has to secure dagit; the standard approach here seems to be via network policies, i.e. one needs Calico.
• Using HashiCorp Vault to inject secrets does not look straightforward. We use Vault's Kubernetes authenticator, which nicely ties the secrets to the namespace + pod identity. It might be possible to recreate this setup with dagster, since every user deployment/repository can have its own service account, but I don't see any examples out there.
• Can I use Vault to manage the postgres/S3 secrets?
• Does one have to secure the communication (e.g. gRPC) between the different dagster services using credentials/TLS/network policies?
• Anything I forgot?
It would be amazing if somebody who went down this route could give their thoughts around this endeavour!
- Best, Yasin
Seth Miller
05/24/2021, 10:51 PM
paul.q
05/25/2021, 6:58 AM
@solid(
    tags={"friendly": "extract tickets"},
It's more appropriate to tag them at the pipeline level via:
var_a = solid_a.tag({"friendly": "extract tickets"})(...)
but this becomes rather verbose rather quickly, esp if we nest our dependency chain to some degree.
I was hoping I could provide tags via the run_config through something like:
solids:
  solid_a:
    tags:
      friendly: "extract_tickets"
But this isn't allowed. Is this just a deficiency in run_config synthesis or have I got something fundamentally wrong?
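(For reference, a minimal sketch of the invocation-time tagging pattern mentioned above; the solid and pipeline names here are placeholders:)

from dagster import pipeline, solid

@solid
def solid_a(context):
    ...

@pipeline
def extract_pipeline():
    # tags applied at invocation time rather than on the solid definition
    solid_a.tag({"friendly": "extract tickets"})()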
Dalibor Novak
05/25/2021, 11:30 AM
dagster.core.errors.DagsterInvalidDefinitionError: Resource "adls2" is required by solid def store_on_azure, but is not provided by mode "default".
I think I am adding the resource to the default mode by doing
@pipeline(mode_defs=[ModeDefinition(name="default", resource_defs={"adsl2": adls2_resource})])
Should I be doing something else instead?
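(A minimal sketch of wiring an ADLS2 resource into a mode; the key in resource_defs has to match the key in required_resource_keys exactly, and the pipeline name here is a placeholder:)

from dagster import ModeDefinition, pipeline, solid
from dagster_azure.adls2 import adls2_resource

@solid(required_resource_keys={"adls2"})
def store_on_azure(context):
    ...

# "adls2" here must match the key the solid asks for (note the spelling)
@pipeline(mode_defs=[ModeDefinition(name="default", resource_defs={"adls2": adls2_resource})])
def azure_pipeline():
    store_on_azure()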
Matt Callaway
05/25/2021, 8:31 PM
Eduardo Santizo
05/26/2021, 1:28 AM
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "DNS resolution failed for service: pipelines:4000" debug_error_string = "{"created":"@1621991631.236838909","description":"Resolver transient failure","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":1360,"referenced_errors":[{"created":"@1621991631.236837716","description":"DNS resolution failed for service: pipelines:4000","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc","file_line":361,"grpc_status":14,"referenced_errors":[{"created":"@1621991631.236821297","description":"C-ares status is not ARES_SUCCESS qtype=A name=pipelines is_balancer=0: Domain name not found","file":"src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc","file_line":724}]}]}" >
How do I modify the workspace.yaml file to get the daemon and dagit to connect to the gRPC server?
VxD
05/26/2021, 4:39 AM
  B → C
 ↗     ↘
A       F
 ↘     ↗
  D → X
Now imagine the A, B, C and D solids were successful, but "X" has failed for whatever reason, preventing the pipeline from completing.
I am looking to programmatically trigger a re-run of X in the same pipeline context (with the same run id), potentially with (user-editable) different arguments. If it passes this time, I would expect the F solid to be started, and the pipeline to be eventually marked as successful.
Is there a way to achieve this? The documentation describes how to re-execute a full pipeline, but what I am looking for is a way to selectively re-run the solids that have failed and have the existing pipeline pick up from there, to save valuable computation resources and allow our users to track the result of the originally-started run.
Thanks in advance!
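(A hedged sketch of what re-executing only the failed step could look like with the Python API, as far as I understand it; the pipeline object, run id and run config below are placeholders, and note this produces a new run linked to the parent rather than literally reusing the same run id:)

from dagster import DagsterInstance, reexecute_pipeline

# placeholders: id of the original, partially failed run and the adjusted arguments
failed_run_id = "11111111-2222-3333-4444-555555555555"
adjusted_run_config = {"solids": {"solid_x": {"config": {"some_arg": "new_value"}}}}

result = reexecute_pipeline(
    my_pipeline,                     # hypothetical pipeline containing A, B, C, D, X and F
    parent_run_id=failed_run_id,
    step_selection=["solid_x*"],     # the failed solid plus everything downstream of it
    run_config=adjusted_run_config,
    instance=DagsterInstance.get(),
)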
Gabriel Simmons
05/26/2021, 6:32 AM
Is there a way to specify an IOManager for the output of a composite solid? A toy version of my scenario is below. I would like to be able to use func_b as part of several composite solids, and I don't know that I always want to use my_custom_io_manager with func_b. I do know that I always want to use my_custom_io_manager with the composite solid func_b_a. With the setup below, running func_b_a_pipeline does not use my_custom_io_manager and uses the default in-memory manager instead. When I move the OutputDefinition to the decorator for func_b, the custom io manager is used, but this means that I'm always using this io manager whenever I use func_b, which is not desirable for me. I'm new to Dagster; any ideas on workarounds are appreciated.
@solid
def func_a() -> ReturnsSomething:
    ...

@solid
def func_b(input) -> AlsoReturnsSomething:
    ...

@composite_solid(
    output_defs=[OutputDefinition(io_manager_key="key_for_my_custom_io_manager")]
)
def func_b_a():
    return func_b(func_a())

@pipeline
def func_b_a_pipeline():
    func_b_a()
Rubén Lopez Lozoya
05/26/2021, 8:18 AM
mrdavidlaing
05/26/2021, 11:54 AM
EventMetadataEntry.md(df.head(5).to_markdown(), "head(5)")
This used to render as a table in Dagit in the Logs and in the Asset viewer, but now it just renders as unprocessed markdown.
Is this intended behaviour?
Jonathan Mak
05/26/2021, 12:56 PM
How do you do a git pull to refresh your dbt models before doing a dbt run? Do you manage Dagster in a repo separate from dbt?
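(One plain-subprocess way to sequence a repository refresh before dbt runs, sketched here under the assumption that the dbt project lives at a known path on the worker; the path and solid names are placeholders, not an official dagster-dbt pattern:)

import subprocess

from dagster import pipeline, solid

DBT_PROJECT_DIR = "/path/to/dbt_project"  # placeholder

@solid
def pull_dbt_models(context):
    # refresh the dbt models checked out on the worker before running dbt
    subprocess.run(["git", "pull"], cwd=DBT_PROJECT_DIR, check=True)

@solid
def run_dbt(context, start):
    # "start" only exists to force ordering after the git pull
    subprocess.run(["dbt", "run"], cwd=DBT_PROJECT_DIR, check=True)

@pipeline
def dbt_refresh_pipeline():
    run_dbt(pull_dbt_models())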
Denis Rakhimov
05/26/2021, 6:22 PM
Arun Kumar
05/26/2021, 7:45 PM
WebSocket connection to 'wss://dagit.doorcrawl.com/graphql' failed:
in the dev console of the UI. I learned that our infra team does not currently support websocket connections yet. My question is: does the entire UI depend on websocket connections, or only the live view? Currently I am not able to even see the pipelines in dagit.
Oliver
05/27/2021, 3:02 AM
logger_config = {
    'loggers': {
        'json': {"config": {"log_level": "DEBUG"}}
    }
}
all of the dagster engine events are coming through correctly, but anything I'm manually logging does not show up,
i.e. a line like this inside a solid:
context.log.info("index", total=len(records))
the logs are, however, showing up in dagit
any hints appreciated! thanks
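(For reference, a minimal sketch of how a custom JSON-ish logger is typically defined and attached to a mode; whether the "json" logger above is built this way is an assumption, the point is only that context.log calls are routed through loggers attached to the active mode:)

import logging

from dagster import ModeDefinition, logger, pipeline, solid

@logger({"log_level": str})
def json_console_logger(init_context):
    # build a stdlib logger; dagster routes context.log.* calls through it
    level = init_context.logger_config["log_level"]
    log = logging.Logger("json_console_logger", level=level)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('{"level": "%(levelname)s", "message": "%(message)s"}'))
    log.addHandler(handler)
    return log

@solid
def index_records(context):
    context.log.info("index")

@pipeline(mode_defs=[ModeDefinition(logger_defs={"json": json_console_logger})])
def logging_pipeline():
    index_records()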
szalai1
05/27/2021, 9:36 AM
szalai1
05/27/2021, 9:37 AM
I found get_output_asset_key, but when I implemented it I got this error:
Both the OutputDefinition and the IOManager of output "result" on solid "pull_tweets" associate it with an asset. Either remove the asset_key parameter on the OutputDefinition or use an IOManager that does not specify an AssetKey in its get_output_asset_key() function.
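(As the error suggests, the asset key should come from only one place. A minimal sketch of keeping it on the IO manager side and dropping asset_key from the OutputDefinition; the class, key names and asset key are placeholders:)

from dagster import AssetKey, IOManager, io_manager

class TweetsIOManager(IOManager):
    def handle_output(self, context, obj):
        ...  # persist obj somewhere

    def load_input(self, context):
        ...  # load it back

    def get_output_asset_key(self, context):
        # the IO manager alone decides the asset key; the OutputDefinition
        # on pull_tweets should no longer pass asset_key=...
        return AssetKey(["tweets", context.step_key])

@io_manager
def tweets_io_manager(_):
    return TweetsIOManager()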
Willie Mccoy
05/27/2021, 10:11 AM
Martim Passos
05/27/2021, 2:18 PM
Can a sensor 1. yield RunRequests for more than one pipeline and 2. define a solid_selection with logic inside the sensor (all solids if the file does not exist, merge only if it exists), rather than passing it as an argument to the sensor definition? Or do I need to define an extra pipeline that does all this?
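(For context, the basic single-pipeline sensor shape looks roughly like this; the path, pipeline and solid names are placeholders, and as far as I know a sensor in this version is bound to a single pipeline_name, which is what makes the two questions above non-obvious:)

import os

from dagster import RunRequest, sensor

@sensor(pipeline_name="my_pipeline")  # one sensor targets one pipeline here
def file_sensor(context):
    path = "/data/incoming/file.csv"  # placeholder path
    if os.path.exists(path):
        yield RunRequest(
            run_key=path,
            run_config={"solids": {"merge": {"config": {"path": path}}}},
        )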
Laura Moraes
05/27/2021, 3:27 PM
2021-05-27 15:26:00 - SensorDaemon - ERROR - Sensor daemon caught an error for sensor ftps_sensor : grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.DEADLINE_EXCEEDED
details = "Deadline Exceeded"
debug_error_string = "{"created":"@1622129159.300676954","description":"Error received from peer ipv4:172.18.0.2:4000","file":"src/core/lib/surface/call.cc","file_line":1066,"grpc_message":"Deadline Exceeded","grpc_status":4}"
>
Stack Trace:
File "/usr/local/lib/python3.8/site-packages/dagster/daemon/sensor.py", line 224, in execute_sensor_iteration
yield from _evaluate_sensor(
File "/usr/local/lib/python3.8/site-packages/dagster/daemon/sensor.py", line 254, in _evaluate_sensor
sensor_runtime_data = repo_location.get_external_sensor_execution_data(
File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/repository_location.py", line 699, in get_external_sensor_execution_data
return sync_get_external_sensor_execution_data_grpc(
File "/usr/local/lib/python3.8/site-packages/dagster/api/snapshot_sensor.py", line 40, in sync_get_external_sensor_execution_data_grpc
api_client.external_sensor_execution(
File "/usr/local/lib/python3.8/site-packages/dagster/grpc/client.py", line 288, in external_sensor_execution
chunks = list(
File "/usr/local/lib/python3.8/site-packages/dagster/grpc/client.py", line 97, in _streaming_query
yield from response_stream
File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 426, in __next__
return self._next()
File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 826, in _next
raise self
My sensor connects to an FTP server and iterates through the folders. Do you think the error could be that it takes too long to finish the scan?
Martim Passos
05/27/2021, 5:13 PM
2021-05-27 13:55:14 - dagster-daemon - INFO - instance is configured with the following daemons: ['BackfillDaemon', 'SchedulerDaemon', 'SensorDaemon']
2021-05-27 13:55:14 - SensorDaemon - INFO - Not checking for any runs since no sensors have been started.
2021-05-27 13:55:14 - SchedulerDaemon - INFO - Not checking for any runs since no schedules have been started.
2021-05-27 13:55:14 - BackfillDaemon - INFO - No backfill jobs requested.
and Dagit says no daemons are running. I have two sensors and a schedule that show fine in the UI and are turned on.
Jean-Pierre M
05/27/2021, 8:04 PM
dagster.check.CheckError: Failure condition: Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.
File "/disk/users/jpmiren/envs/dagster/lib/python3.7/site-packages/dagster/grpc/impl.py", line 349, in get_external_execution_plan_snapshot
known_state=args.known_state,
File "/disk/users/jpmiren/envs/dagster/lib/python3.7/site-packages/dagster/core/execution/api.py", line 740, in create_execution_plan
known_state=known_state,
File "/disk/users/jpmiren/envs/dagster/lib/python3.7/site-packages/dagster/core/execution/plan/plan.py", line 736, in build
return plan_builder.build()
File "/disk/users/jpmiren/envs/dagster/lib/python3.7/site-packages/dagster/core/execution/plan/plan.py", line 162, in build
pipeline_def.dependency_structure,
File "/disk/users/jpmiren/envs/dagster/lib/python3.7/site-packages/dagster/core/execution/plan/plan.py", line 308, in _build_from_sorted_solids
parent_step_inputs=step_inputs,
File "/disk/users/jpmiren/envs/dagster/lib/python3.7/site-packages/dagster/core/execution/plan/plan.py", line 308, in _build_from_sorted_solids
parent_step_inputs=step_inputs,
File "/disk/users/jpmiren/envs/dagster/lib/python3.7/site-packages/dagster/core/execution/plan/plan.py", line 308, in _build_from_sorted_solids
parent_step_inputs=step_inputs,
File "/disk/users/jpmiren/envs/dagster/lib/python3.7/site-packages/dagster/core/execution/plan/plan.py", line 223, in _build_from_sorted_solids
parent_step_inputs,
File "/disk/users/jpmiren/envs/dagster/lib/python3.7/site-packages/dagster/core/execution/plan/plan.py", line 412, in get_step_input_source
"Unexpected dynamic output dependency in regular fan in, "
File "/disk/users/jpmiren/envs/dagster/lib/python3.7/site-packages/dagster/check/__init__.py", line 116, in failed
raise CheckError("Failure condition: {desc}".format(desc=desc))
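(The error above is the one raised when a dynamic output feeds a regular fan-in input directly. For reference, a hedged sketch of the map/collect pattern dynamic outputs expect; the solid names are placeholders:)

from dagster import DynamicOutput, DynamicOutputDefinition, pipeline, solid

@solid(output_defs=[DynamicOutputDefinition()])
def emit_chunks(context):
    for i in range(3):
        yield DynamicOutput(i, mapping_key=str(i))

@solid
def process(context, chunk):
    return chunk * 2

@solid
def merge(context, results):
    return sum(results)

@pipeline
def dynamic_pipeline():
    results = emit_chunks().map(process)
    # collect() gathers the mapped results into a single fan-in input;
    # passing the dynamic output into merge() directly raises the error above
    merge(results.collect())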
Dan Corbiani
05/27/2021, 8:07 PM
from dagster.core.storage.tags import MEMOIZED_RUN_TAG

@solid(version="qwerty", output_defs=[OutputDefinition(io_manager_key="io_manager")], required_resource_keys={"pyspark"})
def basic_data_generation(context):
    spark_session = context.resources.pyspark.spark_session
    pdf = pd.DataFrame({"val": range(100)})
    df = spark_session.createDataFrame(pdf)
    return df

@solid(version="asdf", input_defs=[InputDefinition("df", DSparkDataFrame)], output_defs=[OutputDefinition(io_manager_key="io_manager")], config_schema={"rows": int})
def filter_data(context: SolidExecutionContext, df: SparkDataFrame):
    num_rows = int(context.solid_config["rows"])
    df = df.limit(num_rows)
    return df

@pipeline(mode_defs=[local_mode],
          tags={MEMOIZED_RUN_TAG: "true"})
def custom_data_w_mode():
    population_data_df = basic_data_generation()
    hashed_data_df = filter_data(population_data_df)

try:
    hash_pipeline_result = execute_pipeline(custom_data_w_mode, run_config={'solids': {'filter_data': {'config': {'rows': 5}}}}, mode="local")
except:
    ...

hash_df = hash_pipeline_result.output_for_solid("filter_data", "result")
display(hash_df)
Jonathan Mak
05/28/2021, 5:18 AM
wangm23456
05/28/2021, 7:04 AM
wangm23456
05/28/2021, 7:05 AM
wangm23456
05/28/2021, 9:32 AM
Error 1: Invalid scalar at path root:max_concurrent_runs. Value "{'env': 'MAX_RUNS'}" of type "<class 'dict'>" is not valid for expected type "Int".
wangm23456
05/28/2021, 9:33 AM
wangm23456
05/28/2021, 9:35 AM
# as always, some or all of these values can be obtained from environment variables:
wangm23456
06/01/2021, 7:13 AM
wangm23456
06/01/2021, 7:15 AM