Thomas Zhang
06/04/2021, 3:03 PM
schrockn
06/04/2021, 3:05 PM
Jean-Pierre M
06/04/2021, 3:13 PM
Eduardo Santizo
06/04/2021, 6:07 PM
Noel Gomez
06/05/2021, 3:52 AM
assaf
06/06/2021, 7:34 AM
archiving_inventory_parquet_files = databricks_process_inventory()
for pq in archiving_inventory_parquet_files:
    run_archiving_pipeline_dagster(pq)
To better track the progress of the actual archiving, I wanted to use partition sets, where each partition represents a single parquet file. What I did was output the list of parquet files as a CSV, save it as a file next to my pipeline code, and generate a PartitionSetDefinition from that. However, I need to add a new CSV to my code whenever I rerun the Databricks job and generate a new partition set. Ideally, I would like to wrap that Databricks job in a Dagster pipeline and have it add a partition set to the other pipeline when it's done.
My question is: is there a way to dynamically update the partition sets (namely, add a new one) for pipeline B, using output from pipeline A? Maintaining that state in Dagster is not a trivial ask, I understand, but has anybody else come across a similar challenge?
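A minimal sketch of one workaround, assuming the Databricks job writes its parquet inventory somewhere the repository can list at load time (the directory, pipeline name, and solid config names below are hypothetical): because partition_fn is re-evaluated when the repository loads, new files show up as new partitions without checking in a CSV, though it still doesn't add a whole new partition set from pipeline A.

import glob

from dagster import Partition, PartitionSetDefinition

# Hypothetical shared location where the Databricks job drops its inventory.
INVENTORY_DIR = "/mnt/shared/archiving_inventory"

def inventory_partitions():
    # Re-evaluated on repository load, so files from a later Databricks run
    # become partitions without a code change.
    return [Partition(path) for path in sorted(glob.glob(f"{INVENTORY_DIR}/*.parquet"))]

archiving_partition_set = PartitionSetDefinition(
    name="archiving_inventory",
    pipeline_name="run_archiving_pipeline",  # hypothetical pipeline name
    partition_fn=inventory_partitions,
    run_config_fn_for_partition=lambda partition: {
        "solids": {"archive_parquet": {"config": {"parquet_path": partition.value}}}
    },
)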
Chris Le Sueur
06/07/2021, 11:03 AM
dagster_1 | File "/usr/local/lib/python3.8/site-packages/dagster/core/instance/ref.py", line 235, in run_storage
dagster_1 | return self.run_storage_data.rehydrate()
dagster_1 | File "/usr/local/lib/python3.8/site-packages/dagster/serdes/config_class.py", line 85, in rehydrate
dagster_1 | return klass.from_config_value(self, result.value)
dagster_1 | File "/usr/local/lib/python3.8/site-packages/dagster_postgres/run_storage/run_storage.py", line 88, in from_config_value
dagster_1 | return PostgresRunStorage(
dagster_1 | File "/usr/local/lib/python3.8/site-packages/dagster_postgres/run_storage/run_storage.py", line 62, in __init__
dagster_1 | stamp_alembic_rev(pg_alembic_config(__file__), conn)
dagster_1 | File "/usr/local/lib/python3.8/site-packages/dagster/core/storage/sql.py", line 46, in stamp_alembic_rev
dagster_1 | stamp(alembic_config, rev)
...
dagster_1 | cursor.execute(statement, parameters)
dagster_1 | sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "alembic_version_pkc"
dagster_1 | DETAIL: Key (version_num)=(7cba9eeaaf1d) already exists.
So I said intermittent: it seems like this is some kind of race condition, because in other cases this initialisation proceeds and the tests all pass. When this happens, though, postgres still logs uniqueness violations; they're just wrapped with some retry logic in dagster (line 59 of run_storage.py, just above line 62 mentioned in the traceback). I don't understand how the retry actually helps: if you retry something which results in a uniqueness violation, it will just hit the same issue. The wrapped function is just a sqlalchemy metadata's create_all method, which as far as I know doesn't check state before issuing the CREATE TABLE statements. This suggests there's something going on I don't understand. On a "successful" run, the postgres log has 6 error messages from trying to create tables that already exist, and the dagster logs contain several instances of "Retrying failed database creation":
dagster_1 | WARNING:root:Retrying failed database creation
dagster_1 | WARNING:root:Retrying failed database creation
dagster_1 | WARNING:root:Retrying failed database creation
dagster_1 | WARNING:root:Retrying failed database creation
dagster_1 | WARNI [root] Retrying failed database creation
dagster_1 | WARNI [root] Retrying failed database creation
I am fairly sure the numeric correspondence is a coincidence, since the runs schema alone contains several of the 6 tables being duplicated. I don't understand the significance of the different logging formats; I could only work out where one of these was coming from in the dagster codebase.
I should mention that, as you'd expect, we sometimes see slightly different errors - this may happen while trying to initialise the event_log, for example.
I am unfamiliar with driving alembic programmatically, so I don't really understand the significance of stamping the revision here in the dagster code as opposed to letting alembic do it.
If anyone has any idea why this might be happening, we'd be grateful. As an aside, I was wondering why there is an attempt to mitigate race conditions with retry logic here, instead of using SQL transaction logic to serialise the initialisation.
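As an illustration of that aside (not Dagster's actual code): the initialisation could be serialised across processes with a Postgres advisory lock, so concurrent initialisers wait instead of racing and retrying. The DSN and table here are placeholders.

from sqlalchemy import Column, Integer, MetaData, Table, create_engine, text

engine = create_engine("postgresql://user:pass@localhost/dagster")  # placeholder DSN
metadata = MetaData()
example = Table("example_runs", metadata, Column("id", Integer, primary_key=True))

LOCK_KEY = 0x44414753  # arbitrary application-chosen advisory lock id

with engine.connect() as conn:
    conn.execute(text("SELECT pg_advisory_lock(:key)"), {"key": LOCK_KEY})
    try:
        metadata.create_all(conn)  # only one process issues the CREATE TABLEs at a time
    finally:
        conn.execute(text("SELECT pg_advisory_unlock(:key)"), {"key": LOCK_KEY})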
Simon Späti
06/07/2021, 11:08 AM
stdout and stderr implementation with 0.11.11. We use these quite heavily for debugging. I found that the UI has changed and these are better integrated, although I can't now switch to any step other than the first one (e.g. in my case initialize). Or do we need to change some config for the logger with that latest change? 0.11.12 seems not to solve the error.
Anton Friberg
06/07/2021, 12:08 PM
Vlad Dumitrascu
06/07/2021, 5:49 PM
set DAGSTER_PROJ_DIR=%~dp0
ECHO OFF
ECHO Opening Port 3000 in Firewall of local machine, to allow workgroup access by link.
set PORT=3000
set RULE_NAME="Dagster Open Port %PORT%"
netsh advfirewall firewall show rule name=%RULE_NAME% >nul
if not ERRORLEVEL 1 (
rem Rule %RULE_NAME% already exists.
echo A rule already exists with name: %RULE_NAME%
) else (
echo Rule %RULE_NAME% does not exist. Creating...
netsh advfirewall firewall add rule name=%RULE_NAME% dir=in action=allow protocol=TCP localport=%PORT%
)
...but I still have the issue.
I tried searching on this channel and found a brief reply saying to "use 0.0.0.0". How do I implement this with Dagit?
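For reference, dagit's host flag controls the bind address; a hypothetical last line for the batch file above, assuming dagit is on PATH:

rem Launch dagit listening on all interfaces so workgroup machines can reach it.
dagit -h 0.0.0.0 -p %PORT%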
Emily
06/07/2021, 6:25 PM
get_run_status part. This is the code:
from dagster_graphql import DagsterGraphQLClient
from dagster_graphql import DagsterGraphQLClientError
from dagster.core.storage.pipeline_run import PipelineRunStatus
client = DagsterGraphQLClient(hostname='<our_host>')
RUN_ID = '33f2ed15-b2fe-4d37-bac9-adec6b8ebe3b'
try:
    status: PipelineRunStatus = client.get_run_status(RUN_ID)
    if status == PipelineRunStatus.SUCCESS:
        print('yay')
    else:
        print('ugh')
except DagsterGraphQLClientError as exc:
    print('why')
    raise exc
Hebo Yang
06/07/2021, 7:49 PM
paul.q
06/08/2021, 7:52 AM
dagster instance migrate and it reported no errors. However, the dagster-daemon startup reports:
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) relation "bulk_actions" does not exist ...
Looking at our postgres DB, I can see that there is no 'bulk_actions' table.
How can we get this table created properly?
Our dagster.yaml looks like so:
run_storage:
  module: dagster_postgres.run_storage
  class: PostgresRunStorage
  config:
    postgres_db:
      <usual postgres config>
event_log_storage:
  module: dagster_postgres.event_log
  class: PostgresEventLogStorage
  config:
    postgres_db:
      <usual postgres config>
scheduler:
  module: dagster.core.scheduler
  class: DagsterDaemonScheduler
schedule_storage:
  module: dagster_postgres.schedule_storage
  class: PostgresScheduleStorage
  config:
    postgres_db:
      <usual postgres config>
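A diagnostic sketch, assuming direct access to the same database (the DSN is a placeholder): bulk_actions is created by a schema migration, so checking which alembic revision the run storage is stamped with shows whether the migrate actually ran against this DB.

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://user:pass@localhost/dagster")  # placeholder DSN
with engine.connect() as conn:
    # `dagster instance migrate` advances this stamp; if it already reads as the
    # latest revision, the migration step was treated as a no-op.
    print(conn.execute(text("SELECT version_num FROM alembic_version")).scalar())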
Yan
06/08/2021, 10:33 AM
Eduardo Santizo
06/08/2021, 3:56 PM
pachidermus
06/08/2021, 5:06 PM
Makoto
06/08/2021, 5:25 PM
Dan Corbiani
06/08/2021, 5:47 PM
Kirk Stennett
06/08/2021, 5:59 PM
Josh Lloyd
06/08/2021, 7:36 PM
Eduardo Santizo
06/08/2021, 7:37 PM
Oliver
06/08/2021, 9:17 PM
QueuedRunCoordinator in k8s and using an ELK stack to collect logs. The PIPELINE_ENQUEUED events are not making it to Elasticsearch. What service would be logging these events, and how do I get it to send them to stdout/stderr?
Dylan Bienstock
06/08/2021, 10:01 PM
Message: Event logs invalid for run id 8837d735-d0e7-4bdd-9456-cac93541ef3a
Path:
Locations:
Stack Trace:
File "/usr/local/lib/python3.7/site-packages/rx/core/observablebase.py", line 67, in set_disposable
subscriber = self._subscribe_core(auto_detach_observer)
File "/usr/local/lib/python3.7/site-packages/rx/core/anonymousobservable.py", line 20, in _subscribe_core
return self._subscribe(observer)
File "/usr/local/lib/python3.7/site-packages/dagster_graphql/implementation/pipeline_run_storage.py", line 11, in __call__
events = self.instance.logs_after(self.run_id, self.after_cursor)
File "/usr/local/lib/python3.7/site-packages/dagster/core/instance/__init__.py", line 997, in logs_after
return self._event_storage.get_logs_for_run(run_id, cursor=cursor, of_type=of_type)
File "/usr/local/lib/python3.7/site-packages/dagster/core/storage/event_log/sql_event_log.py", line 199, in get_logs_for_run
events_by_id = self.get_logs_for_run_by_log_id(run_id, cursor, of_type)
File "/usr/local/lib/python3.7/site-packages/dagster/core/storage/event_log/sql_event_log.py", line 178, in get_logs_for_run_by_log_id
raise DagsterEventLogInvalidForRun(run_id=run_id) from err
The above exception was the direct cause of the following exception:
Message: Invariant failed. Description: Attempted to deserialize class "ComputeLogsCaptureData" which is not in the whitelist.
Stack Trace:
File "/usr/local/lib/python3.7/site-packages/dagster/core/storage/event_log/sql_event_log.py", line 175, in get_logs_for_run_by_log_id
deserialize_json_to_dagster_namedtuple(json_str), "event", EventRecord
File "/usr/local/lib/python3.7/site-packages/dagster/serdes/serdes.py", line 242, in deserialize_json_to_dagster_namedtuple
check.str_param(json_str, "json_str"), whitelist_map=_WHITELIST_MAP
File "/usr/local/lib/python3.7/site-packages/dagster/serdes/serdes.py", line 252, in _deserialize_json_to_dagster_namedtuple
return _unpack_value(seven.json.loads(json_str), whitelist_map=whitelist_map)
File "/usr/local/lib/python3.7/site-packages/dagster/serdes/serdes.py", line 287, in _unpack_value
unpacked_val = {key: _unpack_value(value, whitelist_map) for key, value in val.items()}
File "/usr/local/lib/python3.7/site-packages/dagster/serdes/serdes.py", line 287, in <dictcomp>
unpacked_val = {key: _unpack_value(value, whitelist_map) for key, value in val.items()}
File "/usr/local/lib/python3.7/site-packages/dagster/serdes/serdes.py", line 287, in _unpack_value
unpacked_val = {key: _unpack_value(value, whitelist_map) for key, value in val.items()}
File "/usr/local/lib/python3.7/site-packages/dagster/serdes/serdes.py", line 287, in <dictcomp>
unpacked_val = {key: _unpack_value(value, whitelist_map) for key, value in val.items()}
File "/usr/local/lib/python3.7/site-packages/dagster/serdes/serdes.py", line 278, in _unpack_value
f'Attempted to deserialize class "{klass_name}" which is not in the whitelist.',
File "/usr/local/lib/python3.7/site-packages/dagster/check/__init__.py", line 167, in invariant
raise CheckError(f"Invariant failed. Description: {desc}")
Abednego Santoso
06/09/2021, 4:42 AM
dagster.core.errors.DagsterExecutionStepExecutionError: Error occurred while executing solid "run_dbt_project":
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_plan.py", line 193, in _dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 306, in core_dagster_event_sequence_for_step
_step_output_error_checked_user_event_sequence(step_context, user_event_sequence)
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 64, in _step_output_error_checked_user_event_sequence
for user_event in user_event_sequence:
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/execute_step.py", line 608, in _user_event_sequence_for_step_compute_fn
gen,
File "/usr/local/lib/python3.7/site-packages/dagster/utils/__init__.py", line 384, in iterate_with_context
return
File "/usr/local/lib/python3.7/contextlib.py", line 130, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/utils.py", line 67, in solid_execution_error_boundary
) from e
The above exception was caused by the following exception:
FileNotFoundError: [Errno 2] No such file or directory: 'dbt': 'dbt'
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/utils.py", line 42, in solid_execution_error_boundary
yield
File "/usr/local/lib/python3.7/site-packages/dagster/utils/__init__.py", line 382, in iterate_with_context
next_output = next(iterator)
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/compute.py", line 126, in execute_core_compute
for step_output in _yield_compute_results(step_context, inputs, compute_fn):
File "/usr/local/lib/python3.7/site-packages/dagster/core/execution/plan/compute.py", line 109, in _yield_compute_results
for event in user_event_generator:
File "/usr/local/lib/python3.7/site-packages/dagster_dbt/cli/solids.py", line 201, in dbt_cli_run
ignore_handled_error=context.solid_config["ignore_handled_error"],
File "/usr/local/lib/python3.7/site-packages/dagster_dbt/cli/utils.py", line 61, in execute_cli
process = subprocess.Popen(command_list, stdout=subprocess.PIPE)
File "/usr/local/lib/python3.7/subprocess.py", line 800, in __init__
restore_signals, start_new_session)
File "/usr/local/lib/python3.7/subprocess.py", line 1551, in _execute_child
raise child_exception_type(errno_num, err_msg, err_filename)
Any advice?
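That FileNotFoundError means the dbt executable isn't visible on the PATH of the process running the solid. A quick check, run from the same environment the pipeline runs in:

import shutil

# None here means the dbt binary isn't on this interpreter's PATH, which is
# exactly what makes subprocess.Popen(["dbt", ...]) raise ENOENT.
print(shutil.which("dbt"))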
David
06/09/2021, 6:30 AM
django-revproxy, and the page mostly loads, but we are unable to fully view any pipelines. On the page we see JSON Parse error: Unrecognized token '<', and on the Django webserver side Connection aborted: BadStatusLine('\x88\x00') on requests to /dagit/graphql.
I've seen previous posts about improvements in behavior when websockets are unavailable, and it's certainly better than in earlier versions (where the page wouldn't load at all). But not being able to view pipelines is a blocker.
Is there any configuration we can do to make it better?
Josh Lloyd
06/09/2021, 3:34 PM
from dagster import InputDefinition, Nothing, OutputDefinition, solid

@solid(
    input_defs=[
        InputDefinition("start_after", Nothing),
        InputDefinition("shell_command", str),
        InputDefinition("env_dict", dict),
    ],
    output_defs=[OutputDefinition(str, "result")],
)
def run_shell_command(context, shell_command, env_dict):
    ...
I’m invoking the solid as follows:
run_shell_command([store_secret_locally()], solid_1(), solid_2())
but I get an error:
dagster.core.errors.DagsterInvalidDefinitionError: Invalid dependencies: for solid "store_secret_locally" input "shell_command", the DagsterType "String" does not support fanning in (MultiDependencyDefinition). Use the List type, since fanning in will result in a list.
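As the error says, only a DagsterType that supports fanning in (such as Nothing or a List type) can receive a list of outputs. A sketch of the same invocation with keyword arguments, so the list unambiguously lands on the Nothing-typed start_after input (solid names as in the snippet above):

run_shell_command(
    start_after=[store_secret_locally()],  # Nothing input: a fan-in list is allowed here
    shell_command=solid_1(),
    env_dict=solid_2(),
)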
Dylan Bienstock
06/09/2021, 6:01 PM
Hei Yiu
06/09/2021, 10:14 PM
Gabriel Chu
06/10/2021, 5:36 AM
typing pkg? Was asking because I see examples of Dagster using its own Dagster types. Apologies if this is the wrong channel for the question.
Tiri Georgiou
06/10/2021, 12:09 PM
@solid(
    required_resource_keys={"s3"},
    config_schema={
        "file_key": Field(str, is_required=True, description="Path from bucket to file i.e. data/raw/smmt_raw.csv"),
        "bucket": Field(str, is_required=True, description="Bucket name i.e. data-team-staging"),
    },
)
def read_from_s3(context) -> pd.DataFrame:
    """read_from_s3 will load csv from s3.

    Configs:
        file_key (str): Path from bucket name to csv.
        bucket (str): Bucket name located in s3.

    Returns:
        (DataFrame): loaded csv as a pandas DataFrame.
    """
    # Get response
    resp = context.resources.s3.get_object(
        Bucket=context.solid_config["bucket"], Key=context.solid_config["file_key"]
    )
    # As dataframe
    df = pd.read_csv(resp['Body'])
    context.log.info(f"Columns of dataframe: {df.columns}")
    return df
It just reads in the csv saved in a bucket, defined by its bucket name and key. It works fine locally. Obviously, locally it's reading from my .aws/credentials, but I suppose the container wouldn't need any of these credentials because it's already got an IAM role? Or is there another config I need to set?
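A sketch of the wiring this relies on (mode and pipeline names are hypothetical): dagster_aws's s3_resource builds its client via boto3, and with no keys in config boto3 falls back to its default credential chain - environment variables, then ~/.aws/credentials, then the attached IAM role - so a container running with a role typically needs no extra credential config.

from dagster import ModeDefinition, pipeline
from dagster_aws.s3 import s3_resource

# No credentials in resource config: boto3 resolves them at runtime, from
# ~/.aws/credentials locally and from the IAM role inside the container.
mode = ModeDefinition(name="default", resource_defs={"s3": s3_resource})

@pipeline(mode_defs=[mode])
def staging_pipeline():
    read_from_s3()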