s
@max still getting resource unavailable
Copy code
compute_logs:
  module: dagster.core.storage.local_compute_log_manager
  class: NoOpComputeLogManager
  config:
    base_dir: /tmp/datafarm
a
hmm, based on the stack trace it looks like the config update wasn't respected. how is the instance grabbed in datafarm/utils/pipeline_partition_runner.py and what is the lifecycle of the process calling it?
s
instance = DagsterInstance.get() with DAGSTER_HOME set
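(As a rough sketch of that load path, assuming the 0.7.x-era API and not the exact code from this thread: DagsterInstance.get() builds the instance from $DAGSTER_HOME/dagster.yaml, so the compute_logs block above only applies to processes that had DAGSTER_HOME exported before the call; printing the manager type is one way to confirm the config was actually picked up.)
Copy code
# Illustrative check: confirm which compute log manager the instance
# actually loaded from $DAGSTER_HOME/dagster.yaml.
import os

from dagster import DagsterInstance

assert os.getenv('DAGSTER_HOME'), 'DAGSTER_HOME must point at the dir containing dagster.yaml'

instance = DagsterInstance.get()
# If the yaml above was respected, this should print NoOpComputeLogManager.
print(type(instance.compute_log_manager).__name__)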
a
messages are pickled to disk and the pipeline is called
how is the pipeline called?
like is it happening in a new process, thread, in line?
s
single thread/in line
a
hmm ok - and did the process that spawns the thread restart when you deployed the new config?
if you could share a code snippet for how the thread for pipeline execution is spun up, that would be useful
s
like this?
Copy code
ingestion_pipeline = ExecutionTargetHandle.for_pipeline_module(
    'datafarm.pipelines',
    pipeline_config.name,
).build_pipeline_definition()

environment_dict = pipeline_config.pipeline_partition.environment_dict_for_partition(partition)
tags = pipeline_config.pipeline_partition.tags_for_partition(partition)
instance = DagsterInstance.get()

logger.info(f'{pipeline_config.short_name} Started')

execute_pipeline(
    ingestion_pipeline,
    environment_dict=environment_dict,
    tags=tags,
    mode=EP_ENV,
    instance=instance,
)
a
i mean the code that sets up the thread where i assume this code above is invoked
for context - we had an issue like this internally due to a dictionary of thread objects that we were failing to delete, and since we held that reference all of the files in the thread were held open indefinitely
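(for illustration only, with hypothetical names - the pattern described above looks roughly like this: as long as the dict keeps a reference to each finished Thread, anything the thread still references, including open files, stays alive)
Copy code
import threading


class Runner:
    def __init__(self):
        # The leak: finished Thread objects (and everything they reference)
        # are kept in this dict forever unless something deletes them.
        self._threads = {}

    def start_job(self, job_id, target):
        thread = threading.Thread(target=target, name=f'job-{job_id}')
        self._threads[job_id] = thread
        thread.start()

    def reap(self, job_id):
        # The missing step in the bug described above: drop the reference
        # once the thread has finished so its resources can be released.
        thread = self._threads.get(job_id)
        if thread is not None and not thread.is_alive():
            thread.join()
            del self._threads[job_id]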
s
Copy code
while self._keep_running:
    with self._statsd.timer('extract_kafka_messages'):
        start = time.time()
        kafka_messages = list()
        while (
                len(kafka_messages) <= max_messages_per_run
                and
                time.time() - start < consumer_poll_duration_ms / 1000
                and
                self._keep_running
        ):
            message_set = consumer.poll(
                timeout_ms=poll_timeout_ms,
            )
            for _, messages in message_set.items():
                if not self._keep_running:
                    # dont get more messages if
                    # shutdown signal received
                    break

                for message in messages:
                    kafka_messages.append(message)

    if not self._keep_running:
        # dont run pipeline if shutdown signal received
        break

    if kafka_messages:
        self.run_pipeline(kafka_messages)
        with self._statsd.timer('commit_kafka_messages'):
            consumer.commit()
Copy code
def run_pipeline(self, kafka_messages):
    with open(self._save_path, 'wb') as f:
        pickle.dump(kafka_messages, f)

    PipelinePartitionRunner.run_pipeline(
        partition=self._partition,
        pipeline_config=self._pipeline_config,
        logger=self._logger,
        sentry=self._sentry,
        statsd=self._statsd,
    )
I don't use any threads
the process is invoked via python and then runs in line
m
and the instance is retrieved when you call PipelinePartitionRunner.run_pipeline?
s
yes
should i only be instantiating that once?
a
shouldn't matter. it is odd that it didn't seem to pick up the new config
the process is invoked via python
what is the deploy setup? do you have ssh access to the machine?
s
yup
a process starts the consumer script and it's long-lived
a
can you verify that only one copy of this process is running on the machine (assuming that's expected) and then look at the open file descriptors for the pid
ls /proc/$pid/fd
s
Copy code
ls /proc/112763/fd
0    104  110  117  123  13   136  142  149  155  161  168  174  180  187  193  2    205  211  218  224  24  30  37  43  5   56  62  69  75  81  88  94
1    105  111  118  124  130  137  143  15   156  162  169  175  181  188  194  20   206  212  219  225  25  31  38  44  50  57  63  7   76  82  89  95
10   106  112  119  125  131  138  144  150  157  163  17   176  182  189  195  200  207  213  22   226  26  32  39  45  51  58  64  70  77  83  9   96
100  107  113  12   126  132  139  145  151  158  164  170  177  183  19   196  201  208  214  220  227  27  33  4   46  52  59  65  71  78  84  90  97
101  108  114  120  127  133  14   146  152  159  165  171  178  184  190  197  202  209  215  221  228  28  34  40  47  53  6   66  72  79  85  91  98
102  109  115  121  128  134  140  147  153  16   166  172  179  185  191  198  203  21   216  222  229  29  35  41  48  54  60  67  73  8   86  92  99
103  11   116  122  129  135  141  148  154  160  167  173  18   186  192  199  204  210  217  223  23   3   36  42  49  55  61  68  74  80  87  93
a
lol that's not useful - uhhh
lsof -p 112763
s
pretty big
a
that seems reasonable - no red flags there
well there does seem to be a lot of open sockets
batch22sj.prod.easypo.net:50112->klogs1sj.prod.easypo.net:40172
s
one thing to note, yesterday i was getting both the BlockingIO and the can't start new thread error
now i'm only getting the BlockingIO error
a
but 455 shouldn't get us close to the limit of 1024
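(the arithmetic here, sketched for a Linux box: count the entries under /proc/<pid>/fd - the same listing as above - and compare with the process's soft RLIMIT_NOFILE, which is commonly 1024)
Copy code
import os
import resource

# Open descriptors for this process vs. its soft/hard fd limits.
open_fds = len(os.listdir('/proc/self/fd'))
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print(f'{open_fds} open fds, soft limit {soft}, hard limit {hard}')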
s
speak of the devil and it shall appear, just got the can't start new thread error
a
what's the trace for the new thread error?
never mind, found it in the other message
I still don’t understand why the NoOpComputeLogManager doesn’t seem to be getting used
worth double checking $DAGSTER_HOME - you should have storage and history directories in there full of stuff if it's working as expected
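(the same check, sketched in Python; if local_artifact_storage points somewhere other than $DAGSTER_HOME in dagster.yaml, check that directory instead)
Copy code
import os

# Look for the directories a working instance is expected to create.
root = os.environ['DAGSTER_HOME']
for name in ('storage', 'history'):
    path = os.path.join(root, name)
    print(path, 'exists' if os.path.isdir(path) else 'MISSING')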
s
Copy code
echo $DAGSTER_HOME
/srv/datafarm
and dagster.yaml is in /srv/datafarm
would the storage and history be the local_artifact_storage?
a
what else do you have configured in the dagster.yaml?
s
a
ah, you redirect to /tmp/datafarm, but that directory is full of stuff
s
i have to cause we're only allowed to write to /tmp in prod
a
ya that's cool
alright what else can we check
pstree
s
but there's no storage or history in /tmp/datafarm
a
pstree -p 112763
pstree 112763
s
looks like that one got restarted
Copy code
pstree -p 92629
ingest(92629)─┬─stdin2epilog(92635)
              ├─stdin2epilog(92636)
              └─{ingest}(92658)
a
pstree -s python
s
what is -s trying to do?
Copy code
pstree -s python
pstree: invalid option -- 's'
Usage: pstree [ -a ] [ -c ] [ -h | -H PID ] [ -l ] [ -n ] [ -p ] [ -u ]
              [ -A | -G | -U ] [ PID | USER ]
       pstree -V
Display a tree of processes.

    -a     show command line arguments
    -A     use ASCII line drawing characters
    -c     don't compact identical subtrees
    -h     highlight current process and its ancestors
    -H PID highlight this process and its ancestors
    -G     use VT100 line drawing characters
    -l     don't truncate long lines
    -n     sort output by PID
    -p     show PIDs; implies -c
    -u     show uid transitions
    -U     use UTF-8 (Unicode) line drawing characters
    -V     display version information
    -Z     show SELinux security contexts
    PID    start at this PID; default is 1 (init)
    USER   show only trees rooted at processes of this user
a
man, alright, i'm running out of ideas
i guess if you are only allowed to write to /tmp, there could be other strict restrictions on the number of threads per process or the number of file descriptors
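(a sketch of how those limits could be inspected from inside the consumer process on Linux - "can't start new thread" typically means the kernel refused a new task, which is governed by RLIMIT_NPROC or a cgroup pid limit rather than anything Dagster-specific)
Copy code
import resource
import threading

# Soft/hard caps that could explain the errors above, plus the current
# Python thread count for reference.
nofile = resource.getrlimit(resource.RLIMIT_NOFILE)   # open file descriptors
nproc = resource.getrlimit(resource.RLIMIT_NPROC)     # processes/threads per user
print(f'RLIMIT_NOFILE={nofile} RLIMIT_NPROC={nproc} '
      f'live_python_threads={threading.active_count()}')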
given the code you’ve shared - it looks like the whole program will stop when execute_pipeline throws these errors?
s
yes, they bubble to the top
why is the NoOpComputeLogManager trying to start new threads/subprocesses?
a
it shouldn’t be
i have no idea how you are getting to dagster/core/storage/compute_log_manager.py, line 57
it should bail at line 52
s
and we know the dagster.yaml file is being picked up properly as it is writing the runs to /tmp/datafarm
a
ya that and i assume you are seeing new runs and stuff in the database you are pointing at
s
yup, it's chugging along great for the most part. guess i'll just pass on these two exceptions and retry ¯\_(ツ)_/¯
a
cc @prha
i am guessing you will get stuck with repeat failures if you try that
what would be good is to try to capture some of the information like the above when the exception happens
open file descriptors and threads - even though the local compute log manager shouldn't be on, I also see no reason for us to be exhausting these resources unless we are leaking something
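(one way to wire that capture in, sketched with hypothetical names around the execute_pipeline call shown earlier - the lsof subprocess, logger, and exception types are assumptions, not the exact code used here; requires Python 3.7+ for capture_output)
Copy code
import os
import subprocess
import threading

def snapshot_process_state():
    # Grab the info suggested above at the moment the exception happens.
    pid = os.getpid()
    state = {
        'open_fds': len(os.listdir(f'/proc/{pid}/fd')),
        'live_threads': threading.active_count(),
    }
    try:
        # Best effort: lsof may not be installed or may time out.
        state['lsof'] = subprocess.run(
            ['lsof', '-p', str(pid)],
            capture_output=True, text=True, timeout=30,
        ).stdout
    except (OSError, subprocess.TimeoutExpired):
        state['lsof'] = '<lsof unavailable>'
    return state

# Hypothetical usage around the earlier snippet:
# try:
#     execute_pipeline(ingestion_pipeline, environment_dict=environment_dict,
#                      tags=tags, mode=EP_ENV, instance=instance)
# except (BlockingIOError, RuntimeError):
#     logger.error('pipeline failed: %s', snapshot_process_state())
#     raise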
s
kk, i'll add the lsof output to the exceptions logged to sentry
i can sidestep all of this if i need to by running in ephemeral mode (not passing a dagster instance), right?
a
we were able to repro the NoOpComputeLogManager bug and I think you would still hit that in “ephemeral” mode since that's the one it uses
on track to have a fix out today in 0.7.16
s
sweeeet
a
still want to know why we're exhausting resources… BUT this should get you back in a good state
s
yeah, i added an lsof subprocess call on those two exceptions
a
awesome send over any info if you hit it
s
will do
a
the bug only affects loading from the config path, so you could do an ephemeral instance if that is helpful for the next handful of hours
s
this is all i got from lsof
p
@Sam Rausser Just released 0.7.16, which should resolve the issue of NoOpComputeLogManager behaving like the default compute log manager…. The config is slightly changed, as you no longer need to specify a base_dir in the config.
s
awesome, i'll give it a spin
what did i do wrong?