Francis Niu
09/27/2021, 2:30 AM
Nilesh Pandey
09/27/2021, 8:51 AM
Say we have two domains, X and Y. The products of domain X are [x1, x2, x3] and those of domain Y are [y1, y2, y3]. Pipeline names will be based on the products of each domain. Normally we would create two repositories named X and Y, with the pipelines created under their respective repositories. However, if you go to the Dagster UI, a person from domain X can see what is in domain Y and vice versa. That is what we want to avoid: a person from domain X should only see pipelines related to X and not Y, and vice versa.
sourabh upadhye
09/27/2021, 9:27 AM
ModeDefinition("prod", resource_defs={"slack": slack_resource})
The above mode def is not allowed, since resource_defs requires a ResourceDefinition. This is the same line as in the documentation example for success_hooks.
https://docs.dagster.io/concepts/solids-pipelines/solid-hooks#environment-specific-hooks-using-modes
ModeDefinition("prod", resource_defs={"slack": ResourceDefinition(resource_fn=example_resource)})
The above mode_def with ResourceDefinition also raises a DagsterInvalidDefinitionError.
Thank you!
Vladislav Ladenkov
09/27/2021, 10:21 AM
Alexander Shirokov
09/27/2021, 1:48 PM
chrispc
09/27/2021, 3:16 PM
Hebo Yang
09/27/2021, 10:41 PM
Cleared cursor state for sensor xxx. However, my sensors are still getting old cursor data. Did I miss something?
jeremy
09/28/2021, 1:35 AM
@pipeline
def pipeline():
    one, two = solid_1()
    solid_4(solid_2(one))
    solid_4(solid_3(two))
Can it be changed to something like:
@pipeline
def pipeline():
    one, two = solid_1()
    solid_4([solid_2(one), solid_3(two)])
Assuming only one of solid_2 or solid_3 will execute?
Vax Thurai
09/28/2021, 4:55 AM
event_date
)
dbt_run_models = dbt_cli_run.configured(
    name="dbt_run_models",
    config_or_config_fn={
        "project-dir": PROJECT_DIR,
        "profiles-dir": PROFILES_DIR,
        "models": MODELS,
        "target": TARGET,
        "vars": '{event_date: ' + event_date + '}'
    }
)
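The `vars` entry above builds a YAML string by hand. A minimal sketch of a sturdier way to build it, assuming `event_date` is already a plain string (for example "2021-09-28") at the point where `configured` is called, is to serialize a dict with `json.dumps`; JSON is valid YAML, so dbt's `--vars` option should accept it. `build_dbt_vars` is a hypothetical helper, not part of dagster-dbt.

```python
import json


def build_dbt_vars(event_date: str) -> str:
    """Serialize dbt vars as JSON (hypothetical helper); JSON is also valid YAML."""
    # json.dumps quotes and escapes the value, so the string stays well-formed
    # and the date is passed as quoted text instead of a bare YAML scalar.
    return json.dumps({"event_date": event_date})


print(build_dbt_vars("2021-09-28"))  # {"event_date": "2021-09-28"}
```

In the config dict this would read `"vars": build_dbt_vars(event_date)`. Because `configured` is evaluated when the module is imported, `event_date` has to be known at that point; a value produced by an upstream solid at run time is not available there.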
Alexander Shirokov
09/28/2021, 11:00 AM
import dagster as d
@d.solid
def get_data():
    try:
        load_data()
        yield d.Output(True)  # Return True Output - everything loaded OK
    except Exception as ex:
        raise d.RetryRequested(max_retries=3)
        if max_retries == 3:  # If it fails 3 times, then return a False output
            yield d.Output(False)
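The intended control flow of the snippet above (retry up to 3 times, then emit False instead of failing) can be sketched in plain Python. This is a framework-agnostic illustration, not Dagster's API; `load_with_fallback` is a hypothetical helper name and `load_data` stands for the asker's loader.

```python
from typing import Callable


def load_with_fallback(load_data: Callable[[], None], max_retries: int = 3) -> bool:
    """Call load_data, retrying up to max_retries times; return False instead of raising."""
    attempts = max_retries + 1  # the initial attempt plus the retries
    for _ in range(attempts):
        try:
            load_data()
            return True  # everything loaded OK
        except Exception:
            continue  # swallow the error and try again while attempts remain
    return False  # every attempt failed: report failure as a value


if __name__ == "__main__":
    calls = []

    def always_fails() -> None:
        calls.append(1)
        raise RuntimeError("source unavailable")

    print(load_with_fallback(always_fails), len(calls))  # False 4
```

Inside a Dagster 0.12 solid, the loop would be replaced by the framework's own retry counter. I believe the solid context exposes it as `context.retry_number` (0 on the first attempt), so the condition would look roughly like `if context.retry_number < 3: raise d.RetryRequested(max_retries=3)` followed by `yield d.Output(False)`, with `get_data` taking a `context` argument. Treat that attribute name as an assumption to verify against the docs for your version.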
How can I write the last condition in Dagster terminology?
Rubén Lopez Lozoya
09/28/2021, 12:50 PM
Chris Chan
09/28/2021, 6:59 PM
Jazzy
09/29/2021, 7:50 AM
@solid(
    output_defs=[OutputDefinition(dagster_type=List)]
)
Koby Kilimnik
09/29/2021, 12:06 PM
Rubén Lopez Lozoya
09/29/2021, 12:06 PM
execute_pipeline but I am not calling it that way, so I don't know where to put the config object with the credentials 😞
Matthew Smicker
09/29/2021, 4:11 PM
Gabriel Vieira
09/29/2021, 8:34 PM
auto_remove:true in dagster.yaml, I still have multiple residual containers after execution. Could anyone help me? I'm using Dagster version 0.12.1.
Alexander Vandenberg-Rodes
09/29/2021, 11:50 PM
Arun Kumar
09/30/2021, 11:10 AM
DefaultRunLauncher and K8sRunLauncher for different pipelines? We currently use K8sRunLauncher by default. However, certain pipelines are very cheap (e.g. they just trigger an API) and could easily be executed as a local process using DefaultRunLauncher.
sourabh upadhye
09/30/2021, 12:22 PM
Gautam B
09/30/2021, 12:23 PM
solid we need to run a Python file using subprocess.Popen:
import os
import subprocess
from dagster import solid
@solid
def start_prodigy(context):
    # Inherit the current environment and enable verbose Prodigy logging
    process_env = {**os.environ, 'PRODIGY_LOGGING': 'verbose'}
    p1 = subprocess.Popen(["prodigy", "stream_from_s3", "bucket", "customer1/", "-F", "s3_loader.py"],
                          env=process_env, stdout=subprocess.PIPE)
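The Popen call above passes a relative `s3_loader.py`, which only resolves if the process happens to start in the right directory. One common way to remove that dependence is to ship s3_loader.py alongside the module that defines the solid (same package, checkout, or container image) and pass Prodigy an absolute path computed from that module's location. A minimal sketch, assuming that packaging; `resolve_loader_path` and `prodigy_command` are hypothetical helper names.

```python
import os
from typing import List


def resolve_loader_path(module_file: str, script_name: str = "s3_loader.py") -> str:
    """Return the absolute path of script_name in the same directory as module_file."""
    module_dir = os.path.dirname(os.path.abspath(module_file))
    return os.path.join(module_dir, script_name)


def prodigy_command(loader_path: str) -> List[str]:
    """Build the argv list from the question, pointing -F at an absolute loader path."""
    return ["prodigy", "stream_from_s3", "bucket", "customer1/", "-F", loader_path]


# Inside the solid (hypothetical wiring), pass the module's own file path:
#   cmd = prodigy_command(resolve_loader_path(__file__))
#   p1 = subprocess.Popen(cmd, env=process_env, stdout=subprocess.PIPE)
```

The script still has to be deployed to wherever Dagster executes the solid; this only stops the command from depending on the working directory.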
The problem is: how do we upload s3_loader.py from our current working directory to wherever Dagster will run it?
Martim Passos
09/30/2021, 12:59 PM
Navneet Sajwan
09/30/2021, 1:16 PM
jay
09/30/2021, 4:54 PM
dagit
Error log:
Exception: Timed out waiting for gRPC server to start with arguments:
/bin/python -m dagster.grpc --lazy-load-user-code --socket /var/folders/mj/97j2r6pn6d95fcbtjc5kdvrm0000gn/T/tmpm6if675b --heartbeat --heartbeat-timeout 30 --fixed-server-id 56b10928
Stack Trace:
python3.9/site-packages/dagster/core/host_representation/grpc_server_registry.py", line 177, in _get_grpc_endpoint
STEP_FAILURE - Execution of step failed.
dagster.core.errors.DagsterExecutionInterruptedError
Stack Trace:
python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 195, in _dagster_event_sequence_for_step
I am using Dagster version 0.12.11.
sourabh upadhye
10/01/2021, 6:08 AM
@asset_sensor(asset_key=AssetKey("my_table"), pipeline_name="my_pipeline")
def my_asset_sensor(context, asset_event):
    yield RunRequest(
        run_key=context.cursor,
        run_config={
            "solids": {
                "read_materialization": {
                    "config": {
                        "asset_key": asset_event.asset_key.path,
                        "pipeline": asset_event.pipeline_name,
                    }
                }
            }
        },
    )
The above is the asset_sensor example from the Dagster docs. During execution, an error occurs saying that 'EventLogEntry' object has no attribute 'asset_key'. What is the type of asset_event here?
sourabh upadhye
10/01/2021, 6:36 AM
Rubén Lopez Lozoya
10/01/2021, 1:52 PM
Kirk Stennett
10/01/2021, 5:24 PM
An exception was thrown during execution that is likely a framework error, rather than an error in user code.
dagster.check.CheckError: Invariant failed. Description: Pipeline run cleanup (e44bf39c-e57e-4318-ac8f-1f7c62667a40) in state PipelineRunStatus.STARTED, expected NOT_STARTED or STARTING.
Any idea what could be causing this? I'm running this on a k8s cluster made up of spot instances, and from what I've seen, nodes going down frequently might be a cause. But on 0.11.12 it was failing less often. I also switched from celery_k8s_exec to plain k8s_job_exec during this time.
jay
10/01/2021, 6:18 PM
run_config. Dagster cannot serialize the DataFrame, so I converted it to JSON, but then I get this error: `OverflowError: string longer than INT_MAX bytes`. I tried to compress the string using zlib, but then I get `TypeError: Object of type bytes is not JSON serializable`.
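The TypeError above comes from the json module, which encodes str but not bytes, while zlib.compress returns bytes. A minimal sketch of one workaround is to base64-encode the compressed bytes into an ASCII str; `pack_text` and `unpack_text` are hypothetical helper names.

```python
import base64
import json
import zlib


def pack_text(text: str) -> str:
    """zlib-compress text, then base64-encode it so the result is a JSON-safe str."""
    compressed = zlib.compress(text.encode("utf-8"))
    return base64.b64encode(compressed).decode("ascii")


def unpack_text(packed: str) -> str:
    """Reverse pack_text: base64-decode, zlib-decompress, and decode UTF-8."""
    return zlib.decompress(base64.b64decode(packed)).decode("utf-8")


payload = '{"col": [1, 2, 3]}'  # stand-in for df.to_json()
config_json = json.dumps({"data": pack_text(payload)})  # no TypeError: value is a str
print(unpack_text(json.loads(config_json)["data"]) == payload)  # True
```

This only fixes the serialization error. Compression shrinks the payload, but run_config is still a poor carrier for large data; writing the DataFrame to a file or object store and passing its path in the config is a common alternative.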
Has anyone encountered this?
Benoit Perigaud
10/02/2021, 9:53 PM
dagster-daemon run command is always eating 100% of one of my CPUs.
Here is the error in journalctl (I'm on dagster 0.12.12):
Oct 03 08:49:03 raspberrypi bash[10714]: 2021-10-03 08:49:03 - dagster-daemon - ERROR - Thread for SCHEDULER did not shut down gracefully
Oct 03 08:49:03 raspberrypi bash[10714]: Traceback (most recent call last):
Oct 03 08:49:03 raspberrypi bash[10714]: File "/home/pi/.envs/dagster/bin/dagster-daemon", line 8, in <module>
Oct 03 08:49:03 raspberrypi bash[10714]: sys.exit(main())
Oct 03 08:49:03 raspberrypi bash[10714]: File "/home/pi/.envs/dagster/lib/python3.7/site-packages/dagster/daemon/cli/__init__.py", line 135, in main
Oct 03 08:49:03 raspberrypi bash[10714]: cli(obj={}) # pylint:disable=E1123
Oct 03 08:49:03 raspberrypi bash[10714]: File "/home/pi/.envs/dagster/lib/python3.7/site-packages/click/core.py", line 829, in __call__
Oct 03 08:49:03 raspberrypi bash[10714]: return self.main(*args, **kwargs)
Oct 03 08:49:03 raspberrypi bash[10714]: File "/home/pi/.envs/dagster/lib/python3.7/site-packages/click/core.py", line 782, in main
Oct 03 08:49:03 raspberrypi bash[10714]: rv = self.invoke(ctx)
Oct 03 08:49:03 raspberrypi bash[10714]: File "/home/pi/.envs/dagster/lib/python3.7/site-packages/click/core.py", line 1259, in invoke
Oct 03 08:49:03 raspberrypi bash[10714]: return _process_result(sub_ctx.command.invoke(sub_ctx))
Oct 03 08:49:03 raspberrypi bash[10714]: File "/home/pi/.envs/dagster/lib/python3.7/site-packages/click/core.py", line 1066, in invoke
Oct 03 08:49:03 raspberrypi bash[10714]: return ctx.invoke(self.callback, **ctx.params)
Oct 03 08:49:03 raspberrypi bash[10714]: File "/home/pi/.envs/dagster/lib/python3.7/site-packages/click/core.py", line 610, in invoke
Oct 03 08:49:03 raspberrypi bash[10714]: return callback(*args, **kwargs)
Oct 03 08:49:03 raspberrypi bash[10714]: File "/home/pi/.envs/dagster/lib/python3.7/site-packages/dagster/daemon/cli/__init__.py", line 48, in run_command
Oct 03 08:49:03 raspberrypi bash[10714]: controller.check_daemon_loop()
Oct 03 08:49:03 raspberrypi bash[10714]: File "/home/pi/.envs/dagster/lib/python3.7/site-packages/dagster/daemon/controller.py", line 237, in check_daemon_loop
Oct 03 08:49:03 raspberrypi bash[10714]: self.check_daemon_heartbeats()
Oct 03 08:49:03 raspberrypi bash[10714]: File "/home/pi/.envs/dagster/lib/python3.7/site-packages/dagster/daemon/controller.py", line 212, in check_daemon_heartbeats
Oct 03 08:49:03 raspberrypi bash[10714]: failed_daemons=failed_daemons
Oct 03 08:49:03 raspberrypi bash[10714]: Exception: Stopping dagster-daemon process since the following threads are no longer sending heartbeats: ['SCHEDULER']
Oct 03 08:49:04 raspberrypi systemd[1]: dagster-daemon.service: Main process exited, code=exited, status=1/FAILURE
Oct 03 08:49:04 raspberrypi systemd[1]: dagster-daemon.service: Failed with result 'exit-code'.
Oct 03 08:49:04 raspberrypi systemd[1]: dagster-daemon.service: Service RestartSec=100ms expired, scheduling restart.
Oct 03 08:49:04 raspberrypi systemd[1]: dagster-daemon.service: Scheduled restart job, restart counter is at 6.
Oct 03 08:49:04 raspberrypi systemd[1]: Stopped Daemon for dagster.
Oct 03 08:49:04 raspberrypi systemd[1]: Started Daemon for dagster.
The health page tells me: "Not running - No recent heartbeat"