Pablo Beltran
10/07/2022, 1:08 AM
peay
10/07/2022, 10:26 AM
context.op_config is None when I try to materialize an asset without providing any configuration override. However, context.op_config is correctly set if I do override at least one field in the materialize launchpad. Is this a known bug? Running Dagster 1.0.6
Archie Kennedy
10/07/2022, 10:30 AM
You could use a Permissive dict with one required field in the op's config_schema.
Example:
@op(
    config_schema={
        "payload": Field(
            Permissive(
                {
                    "URI": Field(str, is_required=True),
                    "Webhook": Field(str, is_required=False),
                },
            )
        )
    }
)
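To illustrate how that schema behaves end to end, here is a self-contained sketch; the op name, job name, and config values are hypothetical, and the extra "retries" key only passes validation because the schema is Permissive:
from dagster import Field, Permissive, job, op

@op(
    config_schema={
        "payload": Field(
            Permissive(
                {
                    "URI": Field(str, is_required=True),
                    "Webhook": Field(str, is_required=False),
                },
            )
        )
    }
)
def send_payload(context):
    # Permissive passes through any extra keys alongside the declared ones
    context.log.info(str(context.op_config["payload"]))

@job
def payload_job():
    send_payload()

if __name__ == "__main__":
    payload_job.execute_in_process(
        run_config={
            "ops": {
                "send_payload": {
                    "config": {
                        "payload": {
                            "URI": "https://example.com/data",
                            # undeclared keys are allowed by Permissive
                            "retries": 3,
                        }
                    }
                }
            }
        }
    )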
Celio de Assis Picanco
10/07/2022, 12:07 PM
import io

import pandas as pd
from dagster import asset

@asset
def raw_users():
    string_data = io.StringIO()  # io.StringIO, not io.String; unused below
    user_data = pd.read_csv(FILE_PATH)
    return user_data
(I haven’t really started playing with the configuration yet, so FILE_PATH is just a hard-coded variable).
When I run it in dagit, dagster creates a folder called storage in the directory I’m running the pipeline from, as well as a file called raw_users in the recently created storage directory.
The problem is: I can’t open the asset I’ve just created. I see that dagster generated a file, but it seems to be a bytes file of a pandas DataFrame that I can’t manage to open.
Which brings me to my questions: how can I customise the storage destination, and how can I open it from a job so that I can access it from an op?
Thanks in advance for any help.
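One common way to handle both questions is sketched below, assuming the default pickling IO manager is in play: fs_io_manager's base_dir controls where the pickled values land, and a downstream asset (or op) receives the un-pickled value simply by declaring it as an input. The base_dir and CSV path here are hypothetical:
import pandas as pd
from dagster import asset, fs_io_manager, repository, with_resources

FILE_PATH = "users.csv"  # hypothetical path

@asset
def raw_users():
    return pd.read_csv(FILE_PATH)

@asset
def user_count(raw_users):
    # the IO manager loads the upstream DataFrame and passes it in
    return len(raw_users)

@repository
def repo():
    return with_resources(
        [raw_users, user_count],
        {"io_manager": fs_io_manager.configured({"base_dir": "/tmp/my_dagster_storage"})},
    )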
Lucas Gabriel
10/07/2022, 12:40 PM
python .\repo.py
Is there a better way?
Tom Reilly
10/07/2022, 3:16 PM
We have an acquisition_url_table that logs files we need to download, and a downloading service will periodically check the table and download new files. Numerous assets across different jobs can write to this table, and it's also possible to write to the table at different steps within the same job. Ideally, we'd like the acquisition_url_table asset to write records whenever any of its inputs are received, and not have to wait for all of its inputs. How would one go about this? Should the assets which create the records also write them to the table (i.e. some_unique_identifier_acquisition_url_records, another_unique_identifier_acquisition_url_records)? The latter seems easy to implement but less desirable, since useful information regarding the underlying table would be scattered across many assets, whereas the former would allow a single asset to hold all metadata/materialization info in one place.
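If the per-asset route is taken, one way to keep the table logic from being scattered is to centralize the write in a shared resource that each producing asset calls as soon as its records exist. A rough sketch, with hypothetical resource and asset names:
from dagster import asset, resource

class AcquisitionTableClient:
    def append(self, records):
        # write the records to the real acquisition_url_table here
        pass

@resource
def acquisition_table(_):
    return AcquisitionTableClient()

@asset(required_resource_keys={"acquisition_table"})
def some_unique_identifier_acquisition_url_records(context):
    records = []  # build the URL records for this source
    context.resources.acquisition_table.append(records)
    return records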
Zach P
10/07/2022, 3:58 PM
Zachary Bluhm
10/07/2022, 4:28 PM
Steven Tran
10/07/2022, 7:13 PM
Meghan Heintz
10/07/2022, 7:29 PM
ERROR: Command errored out with exit status 1: /Users/meghanheintz/miniconda3/bin/python -u -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/private/var/folders/nw/xzhc52090l114y_l297x2fr80000gn/T/pip-install-gqx_efxj/grpcio_4d5a9bdddbb34530a68d96f204966b96/setup.py'"'"'; __file__='"'"'/private/var/folders/nw/xzhc52090l114y_l297x2fr80000gn/T/pip-install-gqx_efxj/grpcio_4d5a9bdddbb34530a68d96f204966b96/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' install --record /private/var/folders/nw/xzhc52090l114y_l297x2fr80000gn/T/pip-record-uof6h8de/install-record.txt --single-version-externally-managed --compile --install-headers /Users/meghanheintz/miniconda3/include/python3.8/grpcio Check the logs for full command output.
Sean Lindo
10/07/2022, 7:46 PM
Stephen Bailey
10/07/2022, 7:53 PM
Giovanni Paolo
10/07/2022, 9:25 PM
Is there something like with_resources but for jobs?
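For jobs, resources can be attached directly via resource_defs. A minimal sketch with hypothetical names:
from dagster import job, op, resource

@resource
def my_client(_):
    return object()  # stand-in for a real client

@op(required_resource_keys={"client"})
def fetch(context):
    context.log.info(str(context.resources.client))

@job(resource_defs={"client": my_client})
def fetch_job():
    fetch()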
Apoorv Yadav
10/07/2022, 9:45 PM
sarah
10/08/2022, 12:00 AM
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.DEADLINE_EXCEEDED
details = "Deadline Exceeded"
debug_error_string = "{"created":"@1665128988.955836327","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":81,"grpc_status":4}"
I believe this sensor (@run_failure_sensor) handles the cursor? We do have a high volume of runs at certain times, so maybe this is causing the timeout. Or maybe there is an issue with the cursor. I do get Slack messages about the same job failure repeatedly: although the sensor times out, a Slack message about the one failed job arrives every 5-20 minutes, so it looks like the cursor doesn't get updated, probably because the sensor fails. Any ideas on how to fix this? I was also unable to monitor the sensors (or turn them off) in Dagit while this was happening. (I turned the sensor off after connecting to the pod running the daemon and using dagster sensor stop …, after which I was able to view sensor information again in dagit.)
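For reference, a minimal run-failure sensor looks like the sketch below (names hypothetical). As far as I know the run_failure_sensor machinery does advance the cursor itself, so keeping the body fast helps the evaluation finish inside the gRPC deadline:
from dagster import RunFailureSensorContext, run_failure_sensor

@run_failure_sensor(minimum_interval_seconds=120)
def notify_on_failure(context: RunFailureSensorContext):
    message = f"Run {context.dagster_run.run_id} failed"
    # post `message` to Slack here; keep this body fast so the
    # evaluation stays well inside the gRPC deadline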
GTC
10/08/2022, 9:01 AM
Spencer Guy
10/08/2022, 3:16 PM
Spencer Guy
10/08/2022, 3:40 PM
Sean Lindo
10/08/2022, 7:59 PM
Simon Vadée
10/10/2022, 9:01 AM
I'm using the dagster_dbt integration with a (rather complex) dbt project with multiple dependencies. I use load_assets_from_dbt_project to generate a graph for all of my models (for my base dbt project and all of its dependencies). When I try to run a materialisation for a single node of the graph, I get
dagster._core.errors.DagsterSubprocessError: During multiprocess execution errors occurred in child processes:
In process 79: dagster._core.errors.DagsterExecutionStepNotFoundError: Can not build subset plan from unknown step: run_dbt_<package>
where <package> is sometimes wrong (i.e. unrelated to my node). I went digging into the code and I think the issue comes from dagster_dbt/asset_defs.py#_dbt_nodes_to_assets, in which the package name is computed from the graph of dependencies but happens to be nondeterministic, hence producing the error I printed above.
My dbt project runs fine when using dbt alone, without dagster. I also tried removing all dependencies but one, and the issue no longer appears.
Can someone help plz 🤓 ?
dagster-dbt==0.16.12
dbt-core==1.2.2
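As a possible workaround while the nondeterministic subsetting is an issue, load_assets_from_dbt_project accepts a select argument, so the load itself can be scoped to the node(s) of interest. A sketch; the paths and model selector are hypothetical:
from dagster_dbt import load_assets_from_dbt_project

dbt_assets = load_assets_from_dbt_project(
    project_dir="path/to/dbt_project",
    profiles_dir="path/to/profiles",
    select="my_model",  # dbt selection syntax scopes which nodes are loaded
)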
Archie Kennedy
10/10/2022, 9:18 AM
I'm using dagster_aws.s3.s3_pickle_io_manager for multiple jobs, using the same bucket. Do I need to use an s3_prefix for each job?
I don't want collisions 🙂
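One way to keep each job's objects cleanly separated is to configure a distinct s3_prefix per job. A sketch; the bucket and prefix names are hypothetical:
from dagster import job, op
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource

@op
def do_work():
    return 1

@job(
    resource_defs={
        "s3": s3_resource,
        "io_manager": s3_pickle_io_manager.configured(
            {"s3_bucket": "my-bucket", "s3_prefix": "job_a"}
        ),
    }
)
def job_a():
    do_work()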
Archie Kennedy
10/10/2022, 1:20 PM
Eugenio Contreras
10/10/2022, 1:47 PM
Giovanni Paolo
10/10/2022, 2:14 PM
Geoffrey Greenleaf
10/10/2022, 2:26 PM
py4j.protocol.Py4JError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/dagster/_core/errors.py", line 184, in user_code_error_boundary
yield
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/dagster/_core/execution/resources_init.py", line 325, in single_resource_event_generator
resource_def.resource_fn(context)
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/dagster_pyspark/resources.py", line 54, in pyspark_resource
return PySparkResource(init_context.resource_config["spark_conf"])
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/dagster_pyspark/resources.py", line 21, in __init__
self._spark_session = spark_session_from_config(spark_conf)
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/dagster_pyspark/resources.py", line 16, in spark_session_from_config
return builder.getOrCreate()
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/pyspark/sql/session.py", line 269, in getOrCreate
sc = SparkContext.getOrCreate(sparkConf)
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/pyspark/context.py", line 483, in getOrCreate
SparkContext(conf=conf or SparkConf())
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/pyspark/context.py", line 197, in __init__
self._do_init(
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/pyspark/context.py", line 282, in _do_init
self._jsc = jsc or self._initialize_context(self._conf._jconf)
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/pyspark/context.py", line 402, in _initialize_context
return self._jvm.JavaSparkContext(jconf)
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/py4j/java_gateway.py", line 1585, in __call__
return_value = get_return_value(
File "/home/leaf/.cache/pypoetry/virtualenvs/astra-zs_wAcX2-py3.8/lib/python3.8/site-packages/py4j/protocol.py", line 334, in get_return_value
raise Py4JError(
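For reference, the resource being initialized in that trace is typically attached like the sketch below; the trace itself shows the config is read from resource_config["spark_conf"], but the specific settings here are hypothetical:
from dagster import job, op
from dagster_pyspark import pyspark_resource

@op(required_resource_keys={"pyspark"})
def spark_op(context):
    # the resource exposes the underlying SparkSession
    spark = context.resources.pyspark.spark_session
    context.log.info(str(spark.version))

@job(
    resource_defs={
        "pyspark": pyspark_resource.configured(
            {"spark_conf": {"spark.executor.memory": "2g"}}
        )
    }
)
def spark_job():
    spark_op()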
James Hale
10/10/2022, 2:34 PM
Explicitly specifying ins resolves the issue:
@asset(
    name="cdrs",
    key_prefix=PREFIX,
    required_resource_keys={"atscall"},
    io_manager_key="atscall_snowflake_io_manager",
    ins={"cdrs_search_start_time": AssetIn(key=AssetKey(PREFIX + ["cdrs_search_start_time"]))},
)
def get_recent_cdrs(context, cdrs_search_start_time):
    # ...
Issac Loo
10/10/2022, 2:38 PM
Davi
10/10/2022, 2:55 PM
Zach
10/10/2022, 5:42 PM
Abhijeet Singh
10/10/2022, 9:46 PM
Abhijeet Singh
10/10/2022, 9:46 PM
I ran dagster-daemon run with an empty dagster.yaml file, and the memory footprint seemed to increase by 100 MB. The increase was an extra 200 MB when I ran dagit. Note that this is without any load or jobs running.
I guess the memory needed by the storage (postgres etc.) used by the daemon will increase this number.
The above numbers seem like a lot for my use case. Is there a way to run jobs with dagster in a more lightweight manner?
Is dagit required in every deployment?
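For what it's worth, neither dagit nor the daemon is needed just to execute a job in-process; the daemon only matters for things like schedules, sensors, and run queueing. A lightweight sketch with hypothetical job/op names:
from dagster import job, op

@op
def say_hello(context):
    context.log.info("hello")

@job
def hello_job():
    say_hello()

if __name__ == "__main__":
    # runs synchronously in this process; no dagit or daemon required
    hello_job.execute_in_process()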
daniel
10/17/2022, 10:14 PM
Abhijeet Singh
10/19/2022, 9:32 PM