# announcements
b
Hi, I have dagster deployed and running on version 0.10.4. Everything runs fine, except the scheduler seems to continually shut down after about 2-3 hours with the following error (pasted below). It seems that I have to restart the daemon continually to address this. Is this normal? Is there a way to suppress these errors? I'm invoking dagster-daemon via supervisord with the simple run command.
dagster.serdes.ipc.DagsterIPCProtocolError: Timeout: read stream has not received any data in 15 seconds
  File "/usr/local/lib/python3.8/site-packages/dagster/scheduler/scheduler.py", line 86, in launch_scheduled_runs
    with RepositoryLocationHandle.create_from_repository_location_origin(
  File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 57, in create_from_repository_location_origin
    return ManagedGrpcPythonEnvRepositoryLocationHandle(repo_location_origin)
  File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 192, in __init__
    self.grpc_server_process = GrpcServerProcess(
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 1037, in __init__
    self.server_process = open_server_process(
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 942, in open_server_process
    wait_for_grpc_server(server_process, output_file)
  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 878, in wait_for_grpc_server
    event = read_unary_response(ipc_output_file, timeout=timeout, ipc_process=server_process)
  File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 39, in read_unary_response
    messages = list(ipc_read_event_stream(output_file, timeout=timeout, ipc_process=ipc_process))
  File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 152, in ipc_read_event_stream
    raise DagsterIPCProtocolError(
It's kind of odd, it actually seems like the scheduler is still running but the UI makes it seem like it has stopped:
any insight here? this error seems to happen with every deploy. it almost seems as if there's an intermittent connection timeout, the process just fails, and then that's it, your scheduler is toast.
d
Hi, is it possible to post logs from a period of time just before this error as well? I think something might be causing a process to not be able to start up earlier.
b
sure, i can attempt to do so. as I said, it seems to happen after a couple of hours of uptime
and then the Status window is seemingly stuck like this, even though runs are still being triggered...
here are the logs
2021-02-10 00:41:32 - SchedulerDaemon - INFO - Checking for new runs for the following schedules: dbt_run_all, mysql_drupal_to_psql_warehouse_all_else, mysql_drupal_to_psql_warehouse_commerce_fields, mysql_drupal_to_psql_warehouse_commerce_core, mysql_drupal_to_psql_warehouse_ioby_sf, mysql_drupal_to_psql_warehouse_commerce_donations, mysql_drupal_to_psql_warehouse_match_programs, mysql_drupal_to_psql_warehouse_node, mysql_drupal_to_psql_warehouse_people, mysql_drupal_to_psql_warehouse_projects, mysql_drupal_to_psql_warehouse_revisions
ioby-data | 2021-02-09 19:41:38 2021-02-10 00:41:37 - dagster - INFO - system - 5ae2890f-3653-4763-bbf2-89f4f196936e - copy_mysql_drupal_tables_to_psql_warehouse - WRITING MYSQL ioby_sf_opportunities TO tmp_ioby_data_pipelines_etl_mysql_drupal_to_psql_warehouse_1326a.ioby_sf_opportunities IN WAREHOUSE
ioby-data | 2021-02-09 19:41:48 2021-02-10 00:41:48 - SchedulerDaemon - ERROR - Scheduler failed for dbt_run_all : dagster.serdes.ipc.DagsterIPCProtocolError: Timeout: read stream has not received any data in 15 seconds
ioby-data | 2021-02-09 19:41:48 
ioby-data | 2021-02-09 19:41:48 Stack Trace:
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/scheduler/scheduler.py", line 86, in launch_scheduled_runs
ioby-data | 2021-02-09 19:41:48     with RepositoryLocationHandle.create_from_repository_location_origin(
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 57, in create_from_repository_location_origin
ioby-data | 2021-02-09 19:41:48     return ManagedGrpcPythonEnvRepositoryLocationHandle(repo_location_origin)
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 192, in __init__
ioby-data | 2021-02-09 19:41:48     self.grpc_server_process = GrpcServerProcess(
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 1037, in __init__
ioby-data | 2021-02-09 19:41:48     self.server_process = open_server_process(
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 942, in open_server_process
ioby-data | 2021-02-09 19:41:48     wait_for_grpc_server(server_process, output_file)
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 878, in wait_for_grpc_server
ioby-data | 2021-02-09 19:41:48     event = read_unary_response(ipc_output_file, timeout=timeout, ipc_process=server_process)
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 39, in read_unary_response
ioby-data | 2021-02-09 19:41:48     messages = list(ipc_read_event_stream(output_file, timeout=timeout, ipc_process=ipc_process))
ioby-data | 2021-02-09 19:41:48   File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 152, in ipc_read_event_stream
ioby-data | 2021-02-09 19:41:48     raise DagsterIPCProtocolError(
ioby-data | 2021-02-09 19:41:48
i guess this is also the combined logs of dagit + dagster-daemon, since i'm running both via supervisord
d
It’s also possible that the error is indicative of a larger problem (e.g. an out of memory error?) because I wouldn’t expect that error alone to shut down the whole scheduler process - there’s a catch around that codepath and it would normally try again a few seconds later
b
it seems like it does try again, but that dagit gets stuck in an error state
jobs are still actively running but the UI looks like this:
refreshing the repo doesn't do anything
it has the same error
d
I’ll be able to take a closer look at this in an hour or two. In the meantime if it’s possible to check if you’re close to any memory limits when this is happening, that would help rule things out
b
it doesn't look like i am (it's deployed via DigitalOcean App Platform), but maybe there are some secondary limits i'm not aware of.
the CPU is high... i'm running everything on a single node without celery etc, so jobs are executed on the same instance that dagit is running on
thanks for your help!
d
Refreshing the repo giving the same error is a very useful clue - that means something about your current system state is making it impossible to launch the process that serves the repository information (in both dagit and the daemon) - that’s what leads me to believe it’s some kind of memory or other resource issue. Are there any useful errors or other logs in the command line output of your dagit process when you try to refresh the repo and it fails?
b
just that timeout error.
I also get this warning:
OpenBLAS WARNING - could not determine the L2 cache size on this system, assuming 256k
d
I wonder if it could be so CPU capped that it’s taking a really long time to spin up a server and hits the timeout?
If there’s any way to temporarily bring the CPU usage down a bit and see if the problem persists, that would help rule that out
b
will try now.
okay i scaled up the server and redeployed... from googling, that OpenBLAS warning seems to be associated with OOM errors in other cases, but i can't be sure since some things continue to run?
i left it running overnight; oddly, 1/2 of the jobs still got triggered by the scheduler even though it was in an "error" state according to dagit.
d
Some jobs starting and some jobs failing is consistent with the node being overloaded - processes are probably sporadically getting shut down as they run out of resources
b
okay, yeah. runs seem to be triggering normally now
they're also going much quicker, another sign that the CPU being pinned at 100 was the root issue
i'm assuming best practice is to isolate the scheduler in its own container for this reason.
okay, to summarize: both dagit and dagster-daemon communicate with your dagster code via an RPC process which, when your node is overwhelmed, can take a long time to respond or start up, and throws this error?
dagster.serdes.ipc.DagsterIPCProtocolError: Timeout: read stream has not received any data in 15 seconds
d
Right, they both spin up subprocesses to load your code (unless you have set up your own gRPC server to do it and specified that in your workspace.yaml). The error message here could definitely be clearer though. And yeah, putting the scheduler and Dagit in their own containers separately from the CPU-bound workers would likely help with this.
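(For reference, a minimal sketch of the workspace.yaml alternative mentioned above, assuming Dagster 0.10.x syntax; the file name, module path, host, and port are illustrative placeholders, not taken from this thread. Running the gRPC server yourself, e.g. in its own container, means dagit and the daemon connect to it rather than each spawning their own subprocess per repository location.)
load_from:
  # Default style: dagit / dagster-daemon spawn a local gRPC subprocess to load the code, e.g.
  #   - python_file: repo.py   # illustrative
  # Alternative: connect to a gRPC server you run yourself, started with something like
  #   dagster api grpc --module-name ioby_data.repository --host 0.0.0.0 --port 4266
  # (module path and port are illustrative)
  - grpc_server:
      host: user-code-host   # hostname of the container running the gRPC server (illustrative)
      port: 4266             # illustrative port
      location_name: ioby_data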
b
I suppose I might also implement the QueuedRunCoordinator to ensure that there are never too many jobs running at once?
actually, it happened again. even with 2X the CPU.
d
That would also likely help, yeah - if you're running into issues when launching a bunch of runs
b
so basically if the CPU ever hits 100% on the node that the scheduler is running on, you're toast?
it would be preferable, i think, for the scheduler to exit with an error when it clearly can't do its job. at least that way i could dynamically restart it using something like supervisor. but right now i basically have to redeploy the entire image because it gets stuck in this error state.
d
Hm, I'd need more information to make that conclusion (re: CPU ever hitting 100%). Certainly if your system is in a state where spinning up a new process takes more than 15 seconds, Dagster is going to run into trouble. I like the idea of adding some monitoring to try to identify that the node is getting overloaded though - and agree we probably shouldn't just keep retrying forever if something fundamental like being unable to spin up a subprocess keeps failing repeatedly.
b
i'm still struggling with this... i was really hoping to avoid having to do a multi-container deploy, but even when i implemented the QueuedRunCoordinator, i still got CPU spikes and the scheduler failed as before. i'm now seeing whether maybe supervisor is the culprit. if not that, i may just try the CronScheduling option...
d
Is there any way to see which process the spikes are coming from? I'm not sure the cron scheduler is going to be any better unless the spikes are specifically caused by the daemon somehow - the cron scheduler also spins up a subprocess.
b
the error seems to be occurring now even when there aren't CPU spikes
Timeout: read stream has not received any data in 15 seconds
ioby-data | 2021-02-10 17:33:15 
ioby-data | 2021-02-10 17:33:15 Stack Trace:
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/scheduler/scheduler.py", line 86, in launch_scheduled_runs
ioby-data | 2021-02-10 17:33:15     with RepositoryLocationHandle.create_from_repository_location_origin(
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 57, in create_from_repository_location_origin
ioby-data | 2021-02-10 17:33:15     return ManagedGrpcPythonEnvRepositoryLocationHandle(repo_location_origin)
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 192, in __init__
ioby-data | 2021-02-10 17:33:15     self.grpc_server_process = GrpcServerProcess(
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 1037, in __init__
ioby-data | 2021-02-10 17:33:15     self.server_process = open_server_process(
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 942, in open_server_process
ioby-data | 2021-02-10 17:33:15     wait_for_grpc_server(server_process, output_file)
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 878, in wait_for_grpc_server
ioby-data | 2021-02-10 17:33:15     event = read_unary_response(ipc_output_file, timeout=timeout, ipc_process=server_process)
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 39, in read_unary_response
ioby-data | 2021-02-10 17:33:15     messages = list(ipc_read_event_stream(output_file, timeout=timeout, ipc_process=ipc_process))
ioby-data | 2021-02-10 17:33:15   File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 152, in ipc_read_event_stream
ioby-data | 2021-02-10 17:33:15     raise DagsterIPCProtocolError(
ioby-data | 2021-02-10 17:33:15
d
Got it. And no memory pressure? Just asking because you earlier mentioned the other non-Dagster log output that you saw was associated with OOM issues
b
no, memory pressure no
the queued run coordinator is throwing the same error:
6 2021-02-10 22:35:16 - dagster-daemon - ERROR - Caught error in DaemonType.QUEUED_RUN_COORDINATOR:
ioby-data | 2021-02-10 17:35:16 SerializableErrorInfo(message='dagster.serdes.ipc.DagsterIPCProtocolError: Timeout: read stream has not received any data in 15 seconds\n', stack=['  File "/usr/local/lib/python3.8/site-packages/dagster/daemon/controller.py", line 117, in run_iteration\n    error_info = check.opt_inst(next(generator), SerializableErrorInfo)\n', '  File "/usr/local/lib/python3.8/site-packages/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py", line 172, in run_iteration\n    self._dequeue_run(run, location_manager)\n', '  File "/usr/local/lib/python3.8/site-packages/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py", line 206, in _dequeue_run\n    external_pipeline = location_manager.get_external_pipeline_from_run(run)\n', '  File "/usr/local/lib/python3.8/site-packages/dagster/daemon/run_coordinator/queued_run_coordinator_daemon.py", line 97, in get_external_pipeline_from_run\n    ] = RepositoryLocationHandle.create_from_repository_location_origin(\n', '  File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 57, in create_from_repository_location_origin\n    return ManagedGrpcPythonEnvRepositoryLocationHandle(repo_location_origin)\n', '  File "/usr/local/lib/python3.8/site-packages/dagster/core/host_representation/handle.py", line 192, in __init__\n    self.grpc_server_process = GrpcServerProcess(\n', '  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 1037, in __init__\n    self.server_process = open_server_process(\n', '  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 942, in open_server_process\n    wait_for_grpc_server(server_process, output_file)\n', '  File "/usr/local/lib/python3.8/site-packages/dagster/grpc/server.py", line 878, in wait_for_grpc_server\n    event = read_unary_response(ipc_output_file, timeout=timeout, ipc_process=server_process)\n', '  File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 39, in read_unary_response\n    messages = list(ipc_read_event_stream(output_file, timeout=timeout, ipc_process=ipc_process))\n', '  File "/usr/local/lib/python3.8/site-packages/dagster/serdes/ipc.py", line 152, in ipc_read_event_stream\n    raise DagsterIPCProtocolError(\n'], cls_name='DagsterIPCProtocolError', cause=None)
dagit can still access the repo though, so i'm confused why the daemon cannot.
d
If you refresh the repo in dagit, does it run into the same error?
b
nope
d
but yesterday it was right?
b
yeah, actually now it is down. it seemed to take a while.
interestingly, this time there was no CPU spike... no jobs were even running.
the scheduler just ran for a while and then shut down, and then everything else shut down.
d
Yeah, everything we're seeing so far is consistent with the system being overloaded enough that processes are randomly getting shut down and/or unable to start. But if it's not CPU and it's not memory (and presumably not disk space)... are other non-dagster processes struggling too when this is happening? The process that dagster is trying to spin up is a pretty lightweight gRPC server, and it's failing right away, before it loads any non-dagster code... so if that's failing i'd expect lots of other processes to be struggling as well.
is there anything that has reliably fixed the issue so far?
Or maybe if you run 'ps aux' is there anything surprising, like lots of hanging python processes, anything like that?
b
no. the only reliable thing is that it fails. it doesn't happen when i run the docker container locally, so it is probably some undocumented limit in DigitalOcean. maybe there is some limit on the amount of RAM a given process can consume and it's silently killing things.
d
that would make sense - I'm not aware of any other reports of these symptoms, so could definitely be something unique about the execution environment. Sorry for the frustration :(
b
do you know of any users that run single-node setups? is that actually advised?
the output from htop on the node seems to indicate that all dagster-related python processes are running at 800% CPU
d
hm, that's no good. Let me see if that's something we can reproduce on our side.
hmm, are there any non-dagster processes running on the node, and are they running at more reasonable CPU levels? I noticed that the htop process is also at 800%, which I wouldn't expect to be the case
b
the only other things that are running are supervisor and nginx, and those are running at 0% CPU
d
got it - is getting profiling information from py-spy an option here?
b
i can certainly try!
so i'd point it at the pid of the grpc processes? e.g.:
py-spy top --pid 12345
d
honestly they're all pretty mysterious to me, but dagit and dagster-daemon are the most mysterious since those aren't even running user code
but the grpc ones would also be interesting
b
okay give me a minute. i'll try to get it setup
d
thanks! Very curious to see what could be maxing out dagit CPU..
b
maybe this is my lack of understanding of htop, but why are there so many entries for each process?
d
I'm not an htop expert either, but if you're using the default run launcher those could be subprocesses? The gRPC server spawns a subprocess to carry out each launched run. That's the most likely explanation for the ones with identical arguments
Dagit and the daemon will also have their own gRPC process for each of the repository locations (this is a place where we could do more to optimize for single-node deployments)
b
here's the output from ps aux, which is different
USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root         1  0.0  0.3  13540  7032 ?        Ss   16:03   0:00 /bin/bash /opt/dagster/ops/entrypoint.sh
root      1088  6.4  9.5 1301460 199884 ?      Sl   16:06   0:15 /usr/local/bin/python -m dagster.grpc --socket /tmp/tmpasi8
root        11  0.4  1.2  34632 25512 ?        S    16:03   0:01 /usr/bin/python2 /usr/bin/supervisord -c /etc/supervisor/co
root      1162  0.1  0.6  21480 14076 ?        S    16:06   0:00 /usr/local/bin/python -c from multiprocessing.resource_trac
root      1163  107 25.8 1165684 542872 ?      Rl   16:06   4:04 /usr/local/bin/python -c from multiprocessing.spawn import 
root      1261  0.2  0.2  12132  4404 ?        S    16:07   0:00 tail -F -c +0 /opt/dagster/storage/4551b2d6-bc6f-4965-b587-
root      1262  0.2  0.5  19060 11648 ?        S    16:07   0:00 /usr/local/bin/python /usr/local/lib/python3.8/site-package
root      1263  0.2  0.2  12132  4528 ?        S    16:07   0:00 tail -F -c +0 /opt/dagster/storage/4551b2d6-bc6f-4965-b587-
root      1265  0.3  0.6  19060 12624 ?        S    16:07   0:00 /usr/local/bin/python /usr/local/lib/python3.8/site-package
root        14  0.2  3.1 118892 65436 ?        S    16:03   0:00 nginx: master process /usr/sbin/nginx -g daemon off;
root        15  0.0  0.2  13540  6040 ?        S    16:03   0:00 /bin/bash ops/start-dagit.sh
root      1503  0.2  0.2  13804  5852 ?        Ss   16:07   0:00 bash
root        16  0.0  0.3  13540  6912 ?        S    16:03   0:00 /bin/bash ops/start-dagster-daemon.sh
root        17 10.1  6.0 683080 126244 ?       Sl   16:03   0:41 /usr/local/bin/python /usr/local/bin/dagit -h 0.0.0.0 -p 30
root        18  9.4  4.9 624816 104088 ?       Sl   16:03   0:38 /usr/local/bin/python /usr/local/bin/dagster-daemon run
root        19  0.0  2.2 119236 47208 ?        S    16:03   0:00 nginx: worker process
root        20  0.0  2.2 119236 47208 ?        S    16:03   0:00 nginx: worker process
root        21  0.0  2.2 119236 47208 ?        S    16:03   0:00 nginx: worker process
root        22  0.0  2.2 119236 47208 ?        S    16:03   0:00 nginx: worker process
root        23  0.0  2.2 119236 47208 ?        S    16:03   0:00 nginx: worker process
root      2351  2.3  0.3  13804  7396 ?        Ss   16:10   0:00 bash
root        24  0.0  2.2 119236 47148 ?        S    16:03   0:00 nginx: worker process
root        25  0.1  2.2 119236 47208 ?        S    16:03   0:00 nginx: worker process
root        26  0.0  2.2 119236 47144 ?        S    16:03   0:00 nginx: worker process
root      2660  0.0  0.0      0     0 ?        Z    16:10   0:19 [python] <defunct>
root      2853  0.0  9.4 1312028 198292 ?      Sl   16:11   0:12 /usr/local/bin/python -m dagster.grpc --socket /tmp/tmpd6mm
root      2926  0.0  0.3  17444  7448 ?        R    16:11   0:00 ps aux
root        35  7.6 10.0 1407660 210488 ?      Sl   16:04   0:30 /usr/local/bin/python -m dagster.grpc --socket /tmp/tmph9_l
root        48  0.0  0.3  13804  6944 ?        Ss   16:04   0:00 bash
root       725 14.6  0.2  13196  4676 ?        R    16:05   0:42 htop
that's showing the culprit to be:
/usr/local/bin/python -c from multiprocessing.spawn import
d
ah that makes more sense than every process at 800
b
for some reason py-spy can't find these PIDs, it returns
Error: No such file or directory (os error 2)
it seems that the PIDs are continually changing, which might be consistent with what you described: a process attempting to spawn, failing to allocate the necessary CPU/RAM, and then continually retrying
d
that would fit, yeah
b
i can reproduce the CPU spikes in my local Docker instance but i don't think it causes the outage like it does on App Platform. this is the process:
root      8924 46.5 11.8 861680 241936 ?       Sl   16:16   2:32 /usr/local/bin/python -c from multiprocessing.spawn import spawn_main; spawn_main(tracker_fd=14, pipe_handle=16) --multiprocessing-fork
that's also continually changing its PID, though.
d
A short spike when it's first starting up isn't totally unexpected
b
yeah. as i said before, it seems to run fine on my local docker host.
got this response from DigitalOcean support, do you think this could be related? my dags don't write anything to tmpfile, but i know dagster does. i don't really pass a lot of data between dags either
Could you please mention what would be size of temporary files that Dagster writes to disk. There is a 2GiB limit on ephemeral disk storage within the App container.  If it's filling up the temporary files the App might be restarted.
could it just be that logs are clogging up the disk? i thought those were written to postgres
that would be in line with the pattern of it taking 1-2 hours (basically a few jobs need to run) before it shuts down. i do have my log level set to INFO and print out a lot of information.
d
Is it possible to share your instance config? This could potentially be inputs/outputs from solids if they're going to the filesystem, depending on how that is set up
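(Side note on where things end up on disk, since that's the question here: with Postgres event log storage, the structured logs don't touch local disk, but by default two other things can land under $DAGSTER_HOME on the container: the raw stdout/stderr compute logs captured for each step, and any solid inputs/outputs that are persisted to the local filesystem, depending on how the pipeline's IO is set up. A hedged sketch of pointing those at a larger mounted volume, assuming 0.10.x dagster.yaml keys; the paths are illustrative:)
local_artifact_storage:
  module: dagster.core.storage.root
  class: LocalArtifactStorage
  config:
    base_dir: /mnt/dagster-storage        # illustrative path on a bigger volume

compute_logs:
  # or swap in NoOpComputeLogManager (dagster.core.storage.noop_compute_log_manager)
  # to skip capturing stdout/stderr entirely
  module: dagster.core.storage.local_compute_log_manager
  class: LocalComputeLogManager
  config:
    base_dir: /mnt/dagster-compute-logs   # illustrative path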
b
this is the pipeline code. i only pass a list of tables between each solid
from dagster import pipeline, solid, Output, OutputDefinition, InputDefinition

from ioby_data import modes, solids
from ioby_data.utils import sql

PIPELINE = __name__.replace(".", "_").lower()


@solid(
    required_resource_keys={"mysql_drupal"},
    config_schema={
        "from_schema": str,
        "exclude": list,
        "include": list,
    },
    output_defs=[
        OutputDefinition(
            dagster_type=dict,
            name="table_info",
            description="Information about the mysql tables selected",
        )
    ],
)
def get_mysql_drupal_tables_to_sync(context):
    """
    Fetch a list of table names inside a schema, optionally excluding some
    """

    # build exclude table clauses
    exclude = context.solid_config["exclude"]
    if len(exclude):
        exclude_table_clauses = "\n AND ".join(
            [f"table_name NOT LIKE '{exc}'" for exc in exclude if exc.strip() != ""]
        )
    else:
        exclude_table_clauses = "1=1"

    # build include table clauses
    include = context.solid_config["include"]
    if len(include):
        include_table_clauses = "\n OR ".join(
            [
                f"table_name LIKE '{inc}'"
                for inc in context.solid_config["include"]
                if inc.strip() != ""
            ]
        )
    else:
        include_table_clauses = "1=1"

    sql = f"""
        SELECT 
            table_name,
            table_rows
        FROM 
            information_schema.tables 
        WHERE
            table_rows > 0
            AND table_schema='{context.solid_config['from_schema']}'
            AND {exclude_table_clauses}
            AND (
                {include_table_clauses}
            )
                
        ORDER BY RAND()
    """
    context.log.info(f"Running query: {sql}")
    df = context.resources.mysql_drupal.df_from_query(sql)
    mysql_tables = {
        row.table_name: {"num_rows": row.table_rows, "part": (idx % 10) + 1}
        for idx, row in df.iterrows()
    }
    context.log.info(f"RETRIEVED {len(mysql_tables.keys())} MYSQL TABLES")
    table_info = {
        "from_schema": context.solid_config["from_schema"],
        "tables": mysql_tables,
    }
    yield Output(table_info, "table_info")


@solid(
    required_resource_keys={"psql_warehouse", "mysql_drupal"},
    config_schema={
        "limit": int,
    },
    input_defs=[
        InputDefinition(
            dagster_type=dict,
            name="table_info",
            description="Information about the mysql tables selected",
        )
    ],
    output_defs=[
        OutputDefinition(
            dagster_type=list,
            name="tables",
            description="A list of temp tables created in the warehouse",
        )
    ],
)
def copy_mysql_drupal_tables_to_psql_warehouse(context, table_info):

    # setup warehouse access
    wh = context.resources.psql_warehouse
    mysql = context.resources.mysql_drupal

    # setup schema to copy the table to
    from_schema = table_info["from_schema"]
    input_tables = table_info["tables"]
    tmp_schema = sql.gen_temp_schema(PIPELINE)
    context.log.info(f"CREATING TMP SCHEMA: {tmp_schema}")
    wh.create_schema(tmp_schema)

    # copy the tables
    output_tables = []

    for table, table_info in input_tables.items():
        context.log.info(f"FETCHING SCHEMA FOR {table}")
        column_schema = mysql.get_table_column_schema(table)
        create_table_stmt = wh.create_table(table, tmp_schema, column_schema)
        context.log.info(f"SUCCESSFULLY RAN IN WAREHOUSE:\n{create_table_stmt}")
        context.log.info(f"LOADING MYSQL TABLE {table} INTO WAREHOUSE")
        # export mysql table to csv
        limit_stmt = ""
        if context.solid_config["limit"] > 0:
            limit_stmt = f"LIMIT {context.solid_config['limit']}"
        rows = mysql.execute(
            f"""
        SELECT * FROM {table} {limit_stmt}
        """
        )
        context.log.info(f"WRITING MYSQL {table} TO {tmp_schema}.{table} IN WAREHOUSE")
        wh.insert_rows_to_table(rows, table, tmp_schema)
        context.log.info(f"FINISHED LOADING TABLE {table}")
        output_tables.append(f"{tmp_schema}.{table}")

    yield Output(output_tables, "tables")


@solid(
    required_resource_keys={"psql_warehouse"},
    config_schema={
        "to_schema": str,
    },
    input_defs=[
        InputDefinition(
            dagster_type=list,
            name="tables",
            description="A list of temp tables created in the warehouse",
        )
    ],
)
def replace_existing_tables_with_new_tables(context, tables):

    # setup warehouse access
    wh = context.resources.psql_warehouse
    dest_schema = context.solid_config["to_schema"]
    for src_table in tables:

        # format src/ dest table names and schema
        src_schema, table_name = src_table.split(".")
        dest_table = f"{dest_schema}.{table_name}"
        context.log.info(f"Replacing {dest_table} with {src_table}")
        # swap table in a single transaction
        wh.swap_table(src_table, dest_table)

    # drop the temp schema for this operation.
    wh.drop_schema(src_schema)


@pipeline(mode_defs=[modes.DEFAULT])
def mysql_drupal_to_psql_warehouse():
    table_info = get_mysql_drupal_tables_to_sync()
    tables = copy_mysql_drupal_tables_to_psql_warehouse(table_info)
    replace_existing_tables_with_new_tables(tables)
and my dagster.yaml
# ==================================================================================================
# Run Storage
# ==================================================================================================
# Controls how the history of runs is persisted. Can be set to SqliteRunStorage (default) or
# PostgresRunStorage.
run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      username:
        env: IOBY_DAGSTER_DB_USERNAME
      password:
        env: IOBY_DAGSTER_DB_PASSWORD
      hostname:
        env: IOBY_DAGSTER_DB_HOST
      db_name:
        env: IOBY_DAGSTER_DB_NAME
      port:
        env: IOBY_DAGSTER_DB_PORT

# ==================================================================================================
# Event Log Storage
# ==================================================================================================
# Controls how the structured event logs produced by each run are persisted. Can be set to
# SqliteEventLogStorage (default) or PostgresEventLogStorage.
event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db:
      username:
        env: IOBY_DAGSTER_DB_USERNAME
      password:
        env: IOBY_DAGSTER_DB_PASSWORD
      hostname:
        env: IOBY_DAGSTER_DB_HOST
      db_name:
        env: IOBY_DAGSTER_DB_NAME
      port:
        env: IOBY_DAGSTER_DB_PORT

# ==================================================================================================
# Scheduler
# ==================================================================================================
# Provides an optional scheduler which controls execution of pipeline runs at regular intervals.
# We recommend using the default DagsterDaemonScheduler - SystemCronScheduler and K8sScheduler are
# also available but are deprecated.
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler

# ==================================================================================================
# Schedule Storage
# ==================================================================================================
# Controls the backing storage used by the scheduler to manage the state of schedules and persist
# records of attempts.
schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      username:
        env: IOBY_DAGSTER_DB_USERNAME
      password:
        env: IOBY_DAGSTER_DB_PASSWORD
      hostname:
        env: IOBY_DAGSTER_DB_HOST
      db_name:
        env: IOBY_DAGSTER_DB_NAME
      port:
        env: IOBY_DAGSTER_DB_PORT

# ==================================================================================================
# Run Launcher
# ==================================================================================================
# Component that determines where runs are executed.
run_launcher:
  module: dagster.core.launcher
  class: DefaultRunLauncher

# ==================================================================================================
# Run Coordinator
# ==================================================================================================
# Determines the policy used to determine the prioritization rules and concurrency limits for runs.
# Can be set to DefaultRunCoordinator (default) or QueuedRunCoordinator when you want to maintain
# limits on the number of runs that can be executing at once.
run_coordinator:
  module: dagster.core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 2

telemetry:
  enabled: false
to be completely honest, i can't tell if it's really affecting the production env. basically the CPU spikes when the job runs, dagit reports that the daemon and repository are inaccessible, the job finishes, and then eventually the status page shows everything being healthy again.
setting limits on the number of concurrent jobs seemed to help but i don't feel particularly confident in its continued reliability.