Sam Rausser
06/04/2020, 2:39 PM
compute_logs:
  module: dagster.core.storage.local_compute_log_manager
  class: NoOpComputeLogManager
  config:
    base_dir: /tmp/datafarm
alex
06/04/2020, 2:46 PM
datafarm/utils/pipeline_partition_runner.py
and what is the lifecycle of the process calling it?
Sam Rausser
06/04/2020, 2:47 PM
instance = DagsterInstance.get()
Sam Rausser
06/04/2020, 2:48 PM
DAGSTER_HOME set
alex
06/04/2020, 2:49 PM
messages are pickled to disk and the pipeline is called
how is the pipeline called?
alex
06/04/2020, 2:49 PM
Sam Rausser
06/04/2020, 2:50 PM
alex
06/04/2020, 2:52 PM
alex
06/04/2020, 2:52 PM
Sam Rausser
06/04/2020, 2:56 PM
ingestion_pipeline = ExecutionTargetHandle.for_pipeline_module(
    'datafarm.pipelines',
    pipeline_config.name,
).build_pipeline_definition()
environment_dict = pipeline_config.pipeline_partition.environment_dict_for_partition(partition)
tags = pipeline_config.pipeline_partition.tags_for_partition(partition)
instance = DagsterInstance.get()
logger.info(f'{pipeline_config.short_name} Started')
execute_pipeline(
    ingestion_pipeline,
    environment_dict=environment_dict,
    tags=tags,
    mode=EP_ENV,
    instance=instance,
)
alex
06/04/2020, 2:57 PM
alex
06/04/2020, 2:58 PM
thread objects that we were failing to delete and since we held that reference all of the files in the thread were held open indefinitely
Sam Rausser
06/04/2020, 2:58 PM
while self._keep_running:
    with self._statsd.timer('extract_kafka_messages'):
        start = time.time()
        kafka_messages = list()
        while (
            len(kafka_messages) <= max_messages_per_run
            and
            time.time() - start < consumer_poll_duration_ms / 1000
            and
            self._keep_running
        ):
            message_set = consumer.poll(
                timeout_ms=poll_timeout_ms,
            )
            for _, messages in message_set.items():
                if not self._keep_running:
                    # dont get more messages if
                    # shutdown signal received
                    break
                for message in messages:
                    kafka_messages.append(message)
    if not self._keep_running:
        # dont run pipeline if shutdown signal received
        break
    if kafka_messages:
        self.run_pipeline(kafka_messages)
        with self._statsd.timer('commit_kafka_messages'):
            consumer.commit()

def run_pipeline(self, kafka_messages):
    with open(self._save_path, 'wb') as f:
        pickle.dump(kafka_messages, f)
    PipelinePartitionRunner.run_pipeline(
        partition=self._partition,
        pipeline_config=self._pipeline_config,
        logger=self._logger,
        sentry=self._sentry,
        statsd=self._statsd,
    )
I dont use any threads
Sam Rausser
06/04/2020, 2:59 PM
max
06/04/2020, 2:59 PM
PipelinePartitionRunner.run_pipeline?
Sam Rausser
06/04/2020, 3:00 PM
Sam Rausser
06/04/2020, 3:00 PM
alex
06/04/2020, 3:07 PM
alex
06/04/2020, 3:08 PM
the process is invoked via python
what is the deploy set up? do you have ssh access to the machine?
Sam Rausser
06/04/2020, 3:09 PM
Sam Rausser
06/04/2020, 3:09 PM
alex
06/04/2020, 3:11 PM
alex
06/04/2020, 3:11 PM
ls /proc/$pid/fd
Sam Rausser
06/04/2020, 3:13 PM
ls /proc/112763/fd
0 104 110 117 123 13 136 142 149 155 161 168 174 180 187 193 2 205 211 218 224 24 30 37 43 5 56 62 69 75 81 88 94
1 105 111 118 124 130 137 143 15 156 162 169 175 181 188 194 20 206 212 219 225 25 31 38 44 50 57 63 7 76 82 89 95
10 106 112 119 125 131 138 144 150 157 163 17 176 182 189 195 200 207 213 22 226 26 32 39 45 51 58 64 70 77 83 9 96
100 107 113 12 126 132 139 145 151 158 164 170 177 183 19 196 201 208 214 220 227 27 33 4 46 52 59 65 71 78 84 90 97
101 108 114 120 127 133 14 146 152 159 165 171 178 184 190 197 202 209 215 221 228 28 34 40 47 53 6 66 72 79 85 91 98
102 109 115 121 128 134 140 147 153 16 166 172 179 185 191 198 203 21 216 222 229 29 35 41 48 54 60 67 73 8 86 92 99
103 11 116 122 129 135 141 148 154 160 167 173 18 186 192 199 204 210 217 223 23 3 36 42 49 55 61 68 74 80 87 93
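The same descriptor listing can be taken from inside the Python process, which makes it easy to log the count before and after each pipeline run and see whether it grows. This is a minimal sketch that assumes a Linux `/proc` filesystem; `list_open_fds` and `describe_fds` are hypothetical helper names, not part of dagster.

```python
import os


def list_open_fds(pid="self"):
    """Return the open file descriptor numbers of a process (Linux /proc only)."""
    fd_dir = f"/proc/{pid}/fd"
    # Each entry in /proc/<pid>/fd is a symlink named after the descriptor number.
    return sorted(int(name) for name in os.listdir(fd_dir))


def describe_fds(pid="self"):
    """Map each descriptor number to its target (file path, socket, pipe, ...)."""
    targets = {}
    for fd in list_open_fds(pid):
        try:
            targets[fd] = os.readlink(f"/proc/{pid}/fd/{fd}")
        except OSError:
            # The descriptor was closed between listing and reading, for
            # example the one os.listdir itself used to scan the directory.
            targets[fd] = "<closed>"
    return targets


if __name__ == "__main__":
    fds = describe_fds()
    print(f"{len(fds)} open descriptors")
    for fd, target in fds.items():
        print(fd, target)
```

Calling `len(list_open_fds())` once per loop iteration is enough to tell a steady descriptor count from one that climbs with every run.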
alex
06/04/2020, 3:14 PM
lsof -p 112763
Sam Rausser
06/04/2020, 3:16 PM
Sam Rausser
06/04/2020, 3:16 PM
alex
06/04/2020, 3:17 PM
alex
06/04/2020, 3:22 PM
batch22sj.prod.easypo.net:50112->klogs1sj.prod.easypo.net:40172
Sam Rausser
06/04/2020, 3:22 PM
BlockingIO and the can't start new thread error
now i'm only getting the BlockingIO error
alex
06/04/2020, 3:22 PM
Sam Rausser
06/04/2020, 3:23 PM
can't start new thread error
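Python raises `can't start new thread` when the operating system refuses to create another thread, which commonly points at a per-user process/thread limit or memory pressure. The sketch below reads the limits that apply to the current process using the standard `resource` module (Unix only); `thread_and_fd_limits` is a hypothetical helper name, and whether these limits were the cause here is not established by the conversation.

```python
import resource


def thread_and_fd_limits():
    """Return the (soft, hard) resource limits relevant to threads and files."""
    # RLIMIT_NOFILE: maximum number of open file descriptors for this process.
    nofile = resource.getrlimit(resource.RLIMIT_NOFILE)
    # RLIMIT_NPROC: maximum number of processes for the user; on Linux,
    # threads count against this limit as well.
    nproc = resource.getrlimit(resource.RLIMIT_NPROC)
    return {"max_open_files": nofile, "max_processes": nproc}


if __name__ == "__main__":
    for name, (soft, hard) in thread_and_fd_limits().items():
        # resource.RLIM_INFINITY means "no limit".
        print(name, "soft:", soft, "hard:", hard)
```

Comparing the soft `max_open_files` value with the number of entries under `/proc/<pid>/fd` shows how close the process is to its descriptor ceiling.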
alex
06/04/2020, 3:25 PM
alex
06/04/2020, 3:25 PM
alex
06/04/2020, 3:27 PM
alex
06/04/2020, 3:28 PM
$DAGSTER_HOME
alex
06/04/2020, 3:28 PM
storage and history directories in there full of stuff if it's working as expected
Sam Rausser
06/04/2020, 3:29 PM
echo $DAGSTER_HOME
/srv/datafarm
Sam Rausser
06/04/2020, 3:30 PM
dagster.yaml is in /srv/datafarm
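Since `DagsterInstance.get()` picks up its configuration from the `dagster.yaml` in the directory named by `DAGSTER_HOME`, it can help to assert that both exist in the environment of the running process rather than in an interactive shell. A minimal sketch using only the standard library; `find_dagster_yaml` is a hypothetical helper, not a dagster API.

```python
import os


def find_dagster_yaml(environ=os.environ):
    """Return the path of dagster.yaml under DAGSTER_HOME, or raise if missing."""
    home = environ.get("DAGSTER_HOME")
    if not home:
        raise RuntimeError("DAGSTER_HOME is not set for this process")
    config_path = os.path.join(home, "dagster.yaml")
    if not os.path.isfile(config_path):
        raise FileNotFoundError(config_path)
    return config_path


if __name__ == "__main__":
    try:
        print("instance config:", find_dagster_yaml())
    except (RuntimeError, FileNotFoundError) as exc:
        print("instance config problem:", exc)
```

Running this at process start-up, next to the existing `logger.info` call, would record which config file the long-lived consumer actually sees.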
Sam Rausser
06/04/2020, 3:31 PM
local_artifact_storage?
alex
06/04/2020, 3:31 PM
dagster.yaml?
Sam Rausser
06/04/2020, 3:31 PM
alex
06/04/2020, 3:32 PM
/tmp/datform
alex
06/04/2020, 3:32 PM
Sam Rausser
06/04/2020, 3:32 PM
/tmp
Sam Rausser
06/04/2020, 3:33 PM
alex
06/04/2020, 3:33 PM
alex
06/04/2020, 3:33 PM
alex
06/04/2020, 3:33 PM
pstree
Sam Rausser
06/04/2020, 3:33 PM
storage or history in /tmp/datafarm
Sam Rausser
06/04/2020, 3:34 PM
alex
06/04/2020, 3:36 PM
pstree -p 112763
pstree 112763
Sam Rausser
06/04/2020, 3:38 PM
pstree -p 92629
ingest(92629)─┬─stdin2epilog(92635)
              ├─stdin2epilog(92636)
              └─{ingest}(92658)
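pstree conventionally prints threads in curly braces, so `{ingest}(92658)` indicates that the process has one extra thread even though the application code starts none itself. A minimal sketch for listing the threads Python knows about from inside the process; `live_threads` is a hypothetical helper, and threads created directly by C extensions will not appear in `threading.enumerate()`.

```python
import threading


def live_threads():
    """Return (name, native_id, daemon) for every thread known to Python."""
    return [
        # native_id (Python 3.8+) is the kernel thread id, comparable to the
        # number pstree -p shows in parentheses; older versions yield None.
        (t.name, getattr(t, "native_id", None), t.daemon)
        for t in threading.enumerate()
    ]


if __name__ == "__main__":
    for name, native_id, daemon in live_threads():
        print(f"{name} tid={native_id} daemon={daemon}")
```

If the thread id reported by pstree is missing from this list, the thread was most likely started by a library outside the `threading` module.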
alex
06/04/2020, 3:39 PM
pstree -s python
Sam Rausser
06/04/2020, 3:40 PM
-s trying to do?
Sam Rausser
06/04/2020, 3:40 PM
pstree -s python
pstree: invalid option -- 's'
Usage: pstree [ -a ] [ -c ] [ -h | -H PID ] [ -l ] [ -n ] [ -p ] [ -u ]
              [ -A | -G | -U ] [ PID | USER ]
       pstree -V
Display a tree of processes.
  -a     show command line arguments
  -A     use ASCII line drawing characters
  -c     don't compact identical subtrees
  -h     highlight current process and its ancestors
  -H PID highlight this process and its ancestors
  -G     use VT100 line drawing characters
  -l     don't truncate long lines
  -n     sort output by PID
  -p     show PIDs; implies -c
  -u     show uid transitions
  -U     use UTF-8 (Unicode) line drawing characters
  -V     display version information
  -Z     show SELinux security contexts
  PID    start at this PID; default is 1 (init)
  USER   show only trees rooted at processes of this user
alex
06/04/2020, 3:41 PM
alex
06/04/2020, 3:41 PM
/tmp
there could be other strict restrictions on number of threads per process or number of file descriptors
alex
06/04/2020, 3:42 PM
Sam Rausser
06/04/2020, 3:42 PM
Sam Rausser
06/04/2020, 3:44 PM
NoOpComputeLogManager trying to start new threads/subprocesses?
alex
06/04/2020, 3:45 PM
alex
06/04/2020, 3:46 PM
alex
06/04/2020, 3:46 PM
Sam Rausser
06/04/2020, 3:47 PM
dagster.yaml file is being picked up properly as it is writing the runs to /tmp/datafarm
alex
06/04/2020, 3:48 PM
Sam Rausser
06/04/2020, 3:50 PM
alex
06/04/2020, 3:50 PM
alex
06/04/2020, 3:53 PM
alex
06/04/2020, 3:53 PM
alex
06/04/2020, 3:55 PM
Sam Rausser
06/04/2020, 3:58 PM
lsof output to the exceptions logged to sentry
Sam Rausser
06/04/2020, 5:03 PM
alex
06/04/2020, 5:05 PM
alex
06/04/2020, 5:05 PM
0.7.16
Sam Rausser
06/04/2020, 5:05 PM
alex
06/04/2020, 5:06 PM
Sam Rausser
06/04/2020, 5:07 PM
alex
06/04/2020, 5:07 PM
Sam Rausser
06/04/2020, 5:12 PM
alex
06/04/2020, 5:17 PM
Sam Rausser
06/04/2020, 5:17 PM
Sam Rausser
06/04/2020, 5:17 PM
prha
06/04/2020, 11:42 PM
0.7.16 which should resolve the issue of NoOpComputeLogManager behaving like the default compute log manager. The config is slightly changed, as you no longer need to specify a base_dir in the config.
Sam Rausser
06/04/2020, 11:44 PM
Sam Rausser
06/05/2020, 12:10 AM
Sam Rausser
06/05/2020, 12:10 AM
Sam Rausser
06/05/2020, 12:11 AM