William
12/05/2022, 4:29 PM

Даниил Конев
12/05/2022, 5:39 PM
storage:
  postgres:
    postgres_db:
      db_name: db
      hostname: localhost
      params:
        options: "-csearch_path=snbx_dagster"
      scheme: postgresql+psycopg2
      username: postgres
      password: ********
      port: 5432
I managed to start the daemon, and the initialization in the database takes place (I can see it), but when I start dagit, the service does not start because it cannot bind to the schema and throws an error:
dagster> dagit -p 3030
2022-12-05 20:31:01 +0300 - dagit - INFO - Serving dagit on http://127.0.0.1:3030 in process 10768
Traceback (most recent call last):
File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\default.py", line 736, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.UndefinedTable: ERROR: relation "instance_info" does not exist
LINE 2: FROM instance_info
^
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Python310\lib\runpy.py", line 196, in _run_module_as_main
return _run_code(code, main_globals, None,
File "C:\Python310\lib\runpy.py", line 86, in _run_code
exec(code, run_globals)
File "C:\dagster\venv\Scripts\dagit.exe\__main__.py", line 7, in <module>
File "C:\dagster\venv\lib\site-packages\dagit\cli.py", line 181, in main
cli(auto_envvar_prefix="DAGIT") # pylint:disable=E1120
File "C:\dagster\venv\lib\site-packages\click\core.py", line 1130, in __call__
return self.main(*args, **kwargs)
File "C:\dagster\venv\lib\site-packages\click\core.py", line 1055, in main
rv = self.invoke(ctx)
File "C:\dagster\venv\lib\site-packages\click\core.py", line 1404, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "C:\dagster\venv\lib\site-packages\click\core.py", line 760, in invoke
return __callback(*args, **kwargs)
File "C:\dagster\venv\lib\site-packages\dagit\cli.py", line 133, in dagit
host_dagit_ui_with_workspace_process_context(
File "C:\dagster\venv\lib\site-packages\dagit\cli.py", line 166, in host_dagit_ui_with_workspace_process_context
log_action(workspace_process_context.instance, START_DAGIT_WEBSERVER)
File "C:\dagster\venv\lib\site-packages\dagster\_core\telemetry.py", line 494, in log_action
(dagster_telemetry_enabled, instance_id, run_storage_id) = _get_instance_telemetry_info(
File "C:\dagster\venv\lib\site-packages\dagster\_core\telemetry.py", line 333, in _get_instance_telemetry_info
run_storage_id = instance.run_storage.get_run_storage_id()
File "C:\dagster\venv\lib\site-packages\dagster\_core\storage\runs\sql_run_storage.py", line 778, in get_run_storage_id
row = self.fetchone(query)
File "C:\dagster\venv\lib\site-packages\dagster\_core\storage\runs\sql_run_storage.py", line 96, in fetchone
result_proxy = conn.execute(query)
File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1380, in execute
return meth(self, multiparams, params, _EMPTY_EXECUTION_OPTS)
File "C:\dagster\venv\lib\site-packages\sqlalchemy\sql\elements.py", line 333, in _execute_on_connection
return connection._execute_clauseelement(
File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1572, in _execute_clauseelement
ret = self._execute_context(
File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1943, in _execute_context
self._handle_dbapi_exception(
File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\base.py", line 2124, in _handle_dbapi_exception
util.raise_(
File "C:\dagster\venv\lib\site-packages\sqlalchemy\util\compat.py", line 208, in raise_
raise exception
File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "C:\dagster\venv\lib\site-packages\sqlalchemy\engine\default.py", line 736, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.UndefinedTable) ERROR: relation "instance_info" does not exist
LINE 2: FROM instance_info
^
[SQL: SELECT instance_info.run_storage_id
FROM instance_info]
Has anyone already experienced this?
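A quick way to narrow this down (a diagnostic sketch reusing the credentials from the dagster.yaml above) is to check what search_path the session actually gets and which schema ended up holding instance_info:

import psycopg2

# connect the same way the dagster.yaml above tells Dagster to connect,
# including the search_path option
conn = psycopg2.connect(
    dbname="db",
    user="postgres",
    password="...",  # redacted above
    host="localhost",
    port=5432,
    options="-csearch_path=snbx_dagster",
)
with conn.cursor() as cur:
    cur.execute("SHOW search_path")
    print(cur.fetchone())  # expected: ('snbx_dagster',)
    cur.execute(
        "SELECT table_schema FROM information_schema.tables "
        "WHERE table_name = 'instance_info'"
    )
    print(cur.fetchall())  # shows where the daemon actually created the tables

If the table is in snbx_dagster but dagit still fails, dagit is probably reading a different dagster.yaml (for example, DAGSTER_HOME not set in the shell that launches it).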
Daniel Mosesson
12/05/2022, 6:22 PM

Abhinav Ayalur
12/05/2022, 6:50 PM

Abhinav Ayalur
12/05/2022, 7:18 PM

Huy Dao
12/05/2022, 7:27 PM
@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=status_reporting_job,
)
def report_status_sensor(context):
    # this condition prevents the sensor from triggering status_reporting_job again after it succeeds
    if context.dagster_run.job_name != status_reporting_job.name:
        run_config = {
            "ops": {
                "status_report": {"config": {"job_name": context.dagster_run.job_name}}
            }
        }
        return RunRequest(run_key=None, run_config=run_config)
    else:
        return SkipReason("Don't report status of status_reporting_job")
Is there any way I can get the run config of the job from context.dagster_run? I want to check the previous run_config before I trigger another job. Many thanks!
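For what it's worth, DagsterRun does carry the submitted run's config, so the sensor can inspect it before issuing a new request. A minimal sketch (status_reporting_job is assumed to be defined as in the snippet above; not verified against every Dagster version):

from dagster import DagsterRunStatus, RunRequest, SkipReason, run_status_sensor

@run_status_sensor(
    run_status=DagsterRunStatus.SUCCESS,
    request_job=status_reporting_job,  # assumed defined elsewhere, as above
)
def report_status_sensor(context):
    if context.dagster_run.job_name == status_reporting_job.name:
        return SkipReason("Don't report status of status_reporting_job")
    # the finished run's config is available as a plain dict
    previous_run_config = context.dagster_run.run_config
    if not previous_run_config:
        return SkipReason("Upstream run had no run_config to inspect")
    return RunRequest(
        run_key=None,
        run_config={
            "ops": {
                "status_report": {"config": {"job_name": context.dagster_run.job_name}}
            }
        },
    )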
Byron Murillo
12/05/2022, 8:59 PM

Yang
12/05/2022, 9:22 PM
from dagster import MultiPartitionKey, build_op_context

context = build_op_context(
    config={"exec_path": ""},
    partition_key=MultiPartitionKey({"fiscal_year": "2021", "dataset": "idealratings"}),
)
Alex Prykhodko
12/06/2022, 12:21 AM
s3_pickle_io_manager. Works as expected when using fs_io_manager (the input upstream argument is a dict with keys as partitions).
Code:
@asset(partitions_def=StaticPartitionsDefinition(get_partitions()))
def sa_metrics_normalized(context: OpExecutionContext, sa_metrics_raw):
    ...

@asset
def sa_metrics_data_frame(context: OpExecutionContext, sa_metrics_normalized):
    ...
Error:
dagster._check.CheckError: Failure condition: Tried to access partition key for input 'sa_metrics_normalized' of step 'sa_metrics_data_frame', but the step input has a partition range: '2014-01' to '2016-12'.
File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/utils.py", line 52, in solid_execution_error_boundary
yield
File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/plan/inputs.py", line 856, in _load_input_with_input_manager
value = input_manager.load_input(context)
File "/usr/local/lib/python3.8/site-packages/dagster_aws/s3/io_manager.py", line 72, in load_input
key = self._get_path(context)
File "/usr/local/lib/python3.8/site-packages/dagster_aws/s3/io_manager.py", line 33, in _get_path
path = context.get_asset_identifier()
File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/input.py", line 409, in get_asset_identifier
return [*self.asset_key.path, self.asset_partition_key]
File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/input.py", line 324, in asset_partition_key
return self.step_context.asset_partition_key_for_input(self.name)
File "/usr/local/lib/python3.8/site-packages/dagster/_core/execution/context/system.py", line 915, in asset_partition_key_for_input
check.failed(
File "/usr/local/lib/python3.8/site-packages/dagster/_check/__init__.py", line 1642, in failed
raise CheckError(f"Failure condition: {desc}")
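The stock s3_pickle_io_manager of that era resolved a single partition key in _get_path, which is why a ranged input fails where fs_io_manager hands back a dict. One possible workaround, sketched under the assumption that asset_partition_keys is available on InputContext in your version (the per-partition loader below is a hypothetical helper, not a dagster-aws API):

from dagster import InputContext
from dagster_aws.s3.io_manager import PickledObjectS3IOManager

class PartitionRangeS3IOManager(PickledObjectS3IOManager):
    """Return {partition_key: value} when the input spans a partition range."""

    def load_input(self, context: InputContext):
        keys = context.asset_partition_keys  # assumed available in your version
        if len(keys) > 1:
            return {key: self._load_partition(context, key) for key in keys}
        return super().load_input(context)

    def _load_partition(self, context: InputContext, key: str):
        # hypothetical helper: build the per-partition S3 path and unpickle it,
        # mirroring what the stock manager does for a single partition key
        ...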
Abhinav Ayalur
12/06/2022, 12:56 AM

Mycchaka Kleinbort
12/06/2022, 12:33 PM
I have an asset that runs on a daily schedule (it pulls the latest user activity data from Snowflake and returns the userId of the most active user - call this mostActiveUserId). Most days the most active user is the same as yesterday. In this specific scenario, I only want to mark the downstream tasks as stale if the mostActiveUserId asset value has changed.
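One angle, sketched against later Dagster releases where data versions can be attached to an Output (the feature was experimental and is not in every 1.x release): derive the data version from the value itself, so downstream staleness only flips when the value actually changes.

from dagster import DataVersion, Output, asset

@asset
def mostActiveUserId() -> Output[str]:
    user_id = ...  # pull the latest user activity from Snowflake
    # same userId as yesterday => same data version => downstream stays fresh
    return Output(user_id, data_version=DataVersion(str(user_id)))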
Tamas Juhasz
12/06/2022, 1:05 PM
Execution of run for "" failed. Execution was interrupted unexpectedly. No user initiated termination request was found, treating as failure.
I've no clue where this termination comes from.
Damian
12/06/2022, 1:08 PM

Casper Weiss Bang
12/06/2022, 1:36 PM
docker.errors.NotFound: 404 Client Error for http+docker://localhost/v1.41/containers/cb639ffa1ed57b0cd6581e81de37bb4395bfb4cb7f398d03b2b15a9eabbcea37/start: Not Found ("network dagster_network not found")
I can see docker-compose gives it another name, username_dagster_network - i.e. the name of my current user plus the network name. You might want to document that somewhere. Or am I doing something wrong?
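If it helps: Compose namespaces network names with the project name (by default the directory name, here apparently the user name) unless the network is given an explicit name. A hedged docker-compose fragment showing that workaround:

# docker-compose.yml (fragment): pin the network's real name so containers
# launched outside compose (e.g. by Dagster's Docker run launcher) can find it
networks:
  dagster_network:
    name: dagster_network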
Mehdi Hasanvandy
12/06/2022, 2:54 PM

Mehdi Hasanvandy
12/06/2022, 2:59 PM

Kirk Stennett
12/06/2022, 4:14 PM

Derek Truong
12/06/2022, 5:33 PM

Zachary Bluhm
12/06/2022, 7:54 PM
dagsterApiGrpcArgs?

nickvazz
12/06/2022, 9:00 PM
assets?
import glob
import os

from dagster import asset, DynamicPartitionsDefinition

def get_partitions(_):
    return map(os.path.basename, glob.glob("/some/path/*"))

@asset(
    group_name='test_group',
    # partitions_def=DynamicPartitionsDefinition(get_partitions),  # this line makes it fail
)
def partitioned_asset(context):
    context.log.info()
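The callable form of DynamicPartitionsDefinition predates partitioned-asset support, which is likely why that one line fails. A sketch against later releases (1.2+), where dynamic partitions are named and registered on the instance; these API names come from those releases, not necessarily the version in this thread:

from dagster import DynamicPartitionsDefinition, asset

files_partitions = DynamicPartitionsDefinition(name="files")

@asset(group_name="test_group", partitions_def=files_partitions)
def partitioned_asset(context):
    context.log.info(context.partition_key)

# partition keys are registered at runtime, e.g. from a sensor or a script:
# instance.add_dynamic_partitions("files", ["file_a", "file_b"])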
Binh Pham
12/06/2022, 9:33 PM
alias config?
dbt model with the file name my_schema___my__table.sql:
{{ config(alias="my_table") }}
SELECT 1
the asset key will be my_schema / my_schema___my__table
downstream asset using the dagster-snowflake io manager:
@asset(
    ins={"my_schema___my__table": AssetIn(key_prefix=["my_schema"])},
    ...
)
def my_asset(my_schema___my__table: DataFrame):
    ...
the io manager would do a select * from my_schema.my_schema___my__table, which doesn't exist
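One possible workaround (a sketch that assumes load_assets_from_dbt_project from dagster-dbt, and that the alias is present on the node info dict; field names may differ across versions): map node info to asset keys yourself so the key uses the alias instead of the file name:

from dagster import AssetKey
from dagster_dbt import load_assets_from_dbt_project

def node_info_to_asset_key(node_info: dict) -> AssetKey:
    # prefer the dbt alias over the model's file name when one is configured
    name = node_info.get("alias") or node_info["name"]
    return AssetKey([node_info["schema"], name])

dbt_assets = load_assets_from_dbt_project(
    project_dir="path/to/dbt/project",  # hypothetical path
    node_info_to_asset_key=node_info_to_asset_key,
)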
Chris Anderson
12/07/2022, 12:16 AM
How can I use .collect() on a dynamic op that's been turned into an asset, from within another asset that has it as a dependency? For example, I have
@op(out=DynamicOut())
def observations(context) -> DynamicOutput[gpd.GeoDataFrame]:
    # yield observations in chunks
    ...
This op gets turned into an asset with its own group later with AssetsDefinition.from_op, and that asset is used across many jobs. In some jobs this dynamic behavior is beneficial and sought after, but in others I'd like to collect all the information before proceeding, like below:
@asset(
    ins={'observations': AssetIn('observations')},
    outs={'collective_analysis': AssetOut()}
)
def collective_analysis(context, observations):
    collected_obs = observations.collect()
    # do analysis with all observations collected into one dataframe
Doing this right now throws the following error and tries to split this collective_analysis asset materialization dynamically according to the original dynamic op:
dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "collective_analysis"::AttributeError: 'GeoDataFrame' object has no attribute 'collect'
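A pattern that may fit (a sketch, on the assumption that the collection happens inside a graph rather than an asset body): .collect() is resolved at composition time, so wrap the dynamic op plus a collecting op in a graph and promote that with AssetsDefinition.from_graph:

from dagster import AssetsDefinition, graph, op

@op
def analyze_collected(context, observations_list):
    # receives the dynamic outputs gathered into a single list
    ...

@graph
def collective_analysis():
    # .collect() works here because the graph body is composition, not runtime code
    return analyze_collected(observations().collect())

collective_analysis_asset = AssetsDefinition.from_graph(collective_analysis)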
Gatsby Lee
12/07/2022, 3:46 AM
dagster._core.errors.DagsterInvalidDefinitionError: "log__<dagster._core.definitions.composition.InvokedSolidOutputHandle object at 0x1088f1f90>" is not a valid name in Dagster. Names must be in regex ^[A-Za-z0-9_]+$.
Here is the sample code.
@op(out={"step_name": Out()})
def op__get_config():
step_name = "hello-dagster"
return step_name
def generate_op(step_name: str):
@op(name=f"log__{step_name}")
def func():
print(f"hello-{step_name}")
return func
@job
def job() -> None:
step_name = op__get_config()
log_op = generate_op(step_name)
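The error comes from calling op__get_config() inside the @job body: at composition time that returns an output handle (the InvokedSolidOutputHandle in the message), not a string, and the handle ends up inside the f-string name. A sketch of the usual shape (op names must be literal strings when the job is built; runtime values travel as inputs instead):

from dagster import In, Out, job, op

@op(out={"step_name": Out()})
def op__get_config():
    return "hello-dagster"

@op(name="log__step", ins={"step_name": In(str)})
def log_op(context, step_name):
    # the runtime value arrives as an input, not as part of the op's name
    context.log.info(f"hello-{step_name}")

@job
def my_job() -> None:
    log_op(op__get_config())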
Gatsby Lee
12/07/2022, 3:47 AM

Gatsby Lee
12/07/2022, 4:45 AM

William
12/07/2022, 7:10 AM
shift+click on asset graph page

William
12/07/2022, 7:25 AM
get_partitions method?

Sơn Lê
12/07/2022, 7:54 AM
When I run dagit, I get: ImportError: cannot import name 'introspection_query' from 'graphql' (/workspace/.pyenv_mirror/user/current/lib/python3.8/site-packages/graphql/__init__.py).
However, according to the tutorial, dagster should run properly. Is there any fix?
William
12/07/2022, 8:05 AM

Vrushank Kenkre
12/07/2022, 10:53 AM
Traceback (most recent call last):
File "/home/hadoop/.local/lib/python3.7/site-packages/dagster/_core/code_pointer.py", line 138, in load_python_module
return importlib.import_module(module_name)
File "/usr/lib64/python3.7/importlib/__init__.py", line 127, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 1006, in _gcd_import
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 965, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'with_pyspark_emr'
dagster._core.errors.DagsterImportError: Encountered ImportError: No module named 'with_pyspark_emr' while importing module with_pyspark_emr. Local modules were resolved using the working directory /home/ec2-user/dagster/my-dagster-project. If another working directory should be used, please explicitly specify the appropriate path using the -d or --working-directory for CLI based targets or the working_directory configuration option for workspace targets.
I have set up dagster on a dev EC2 machine and am trying to run the job on EMR. The module with_pyspark_emr is present in /home/ec2-user/dagster/my-dagster-project, but I am not able to figure out what the issue is. Can someone please help?
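One thing worth checking (a hedged sketch; config field names vary across dagster-aws versions): the traceback resolves the module under /home/hadoop on the EMR node, so the local project is probably not being shipped to the cluster. The EMR step launcher has a flag for deploying the local package:

from dagster_aws.emr import emr_pyspark_step_launcher

# values here are hypothetical except the flag itself; check your dagster-aws
# version for the exact field names
step_launcher = emr_pyspark_step_launcher.configured({
    "cluster_id": "j-XXXXXXXXXXXXX",
    "region_name": "us-east-1",
    "staging_bucket": "my-staging-bucket",
    "deploy_local_pipeline_package": True,  # zip and ship the local module to EMR
})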