Ben Torvaney
04/01/2021, 1:12 PM
Deveshi
04/01/2021, 3:10 PM
sashank
04/01/2021, 3:16 PM
user
04/01/2021, 10:12 PM
dish
04/01/2021, 10:15 PM
Sasha Gorelikov
04/02/2021, 8:42 AM
@solid
def s1(context):
    pass

@solid
def s2(context):
    pass

if _var == 1:
    _run_solid_by_string_name('s1')
else:
    _run_solid_by_string_name('s2')
Thank you
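For reference, a minimal sketch of one way to express this kind of branching in the 0.10/0.11 pipeline API, using optional outputs instead of looking solids up by a string name (the config key and solid names here are illustrative, not from the thread):

from dagster import Output, OutputDefinition, pipeline, solid

@solid(
    config_schema={"var": int},
    output_defs=[
        OutputDefinition(name="branch_1", is_required=False),
        OutputDefinition(name="branch_2", is_required=False),
    ],
)
def branch(context):
    # Yield only one of the two optional outputs; the solid wired to the
    # other output is skipped at runtime.
    if context.solid_config["var"] == 1:
        yield Output(None, "branch_1")
    else:
        yield Output(None, "branch_2")

@solid
def s1(context, _input):
    pass

@solid
def s2(context, _input):
    pass

@pipeline
def branching_pipeline():
    b1, b2 = branch()
    s1(b1)
    s2(b2)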
Andrew Herbst
04/02/2021, 5:13 PM
collect index unexpectedly set twice. So, we’ve introduced an intermediate “fan-in” solid on both branches that collects the results and then hands those off to the downstream post-processing solid. My question: is this a known issue, and is introducing the intermediate fan-in solid the correct approach here?
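For reference only, a rough sketch of the “intermediate fan-in solid” shape in the 0.x pipeline API, shown here with plain list fan-in rather than the dynamic-output collect() that produced the error above (all solid names are illustrative):

from typing import List

from dagster import pipeline, solid

@solid
def branch_a(context):
    return 1

@solid
def branch_b(context):
    return 2

@solid
def fan_in(context, results: List[int]):
    # Gather the per-branch results into a single list for downstream use.
    return results

@solid
def post_process(context, combined: List[int]):
    context.log.info(f"got {len(combined)} results: {combined}")

@pipeline
def fan_in_pipeline():
    post_process(fan_in([branch_a(), branch_b()]))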
Uttasarga Singh
04/03/2021, 1:45 AM
J
04/03/2021, 7:39 PM
deploy_docker example that fails (see: https://github.com/jeremyadamsfisher/dagster-sensor-min-failing)
from dagster import pipeline, repository, schedule, solid, sensor, SkipReason

@solid
def hello(_):
    return 1

@pipeline
def my_pipeline():
    hello()

@schedule(cron_schedule="* * * * *", pipeline_name="my_pipeline", execution_timezone="US/Central")
def my_schedule(_context):
    return {}

@sensor(pipeline_name="my_pipeline")
def always_skips(_context):
    yield SkipReason("I always skip!")

@repository
def deploy_docker_repository():
    return [my_pipeline, my_schedule, always_skips]
This is the traceback:
docker_example_daemon | 2021-04-03 19:36:56 - SensorDaemon - INFO - Checking for new runs for sensor: always_skips
docker_example_daemon | 2021-04-03 19:36:56 - SensorDaemon - ERROR - Error launching sensor run: json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
docker_example_daemon |
docker_example_daemon | Stack Trace:
docker_example_daemon | File "/usr/local/lib/python3.7/site-packages/dagster/daemon/sensor.py", line 230, in execute_sensor_iteration
docker_example_daemon | sensor_debug_crash_flags,
docker_example_daemon | File "/usr/local/lib/python3.7/site-packages/dagster/daemon/sensor.py", line 264, in _evaluate_sensor
docker_example_daemon | job_state.job_specific_data.last_run_key if job_state.job_specific_data else None,
docker_example_daemon | File "/usr/local/lib/python3.7/site-packages/dagster/core/host_representation/repository_location.py", line 448, in get_external_sensor_execution_data
docker_example_daemon | last_run_key,
docker_example_daemon | File "/usr/local/lib/python3.7/site-packages/dagster/api/snapshot_sensor.py", line 41, in sync_get_external_sensor_execution_data_grpc
docker_example_daemon | last_run_key=last_run_key,
docker_example_daemon | File "/usr/local/lib/python3.7/site-packages/dagster/grpc/client.py", line 291, in external_sensor_execution
docker_example_daemon | res.serialized_external_sensor_execution_data_or_external_sensor_execution_error
docker_example_daemon | File "/usr/local/lib/python3.7/site-packages/dagster/serdes/serdes.py", line 236, in deserialize_json_to_dagster_namedtuple
docker_example_daemon | check.str_param(json_str, "json_str"), whitelist_map=_WHITELIST_MAP
docker_example_daemon | File "/usr/local/lib/python3.7/site-packages/dagster/serdes/serdes.py", line 246, in _deserialize_json_to_dagster_namedtuple
docker_example_daemon | return _unpack_value(seven.json.loads(json_str), whitelist_map=whitelist_map)
docker_example_daemon | File "/usr/local/lib/python3.7/json/__init__.py", line 361, in loads
docker_example_daemon | return cls(**kw).decode(s)
docker_example_daemon | File "/usr/local/lib/python3.7/json/decoder.py", line 337, in decode
docker_example_daemon | obj, end = self.raw_decode(s, idx=_w(s, 0).end())
docker_example_daemon | File "/usr/local/lib/python3.7/json/decoder.py", line 355, in raw_decode
docker_example_daemon | raise JSONDecodeError("Expecting value", s, err.value) from None
Am I missing something or should I file a github issue?
Agon Shabi
04/04/2021, 5:58 PM
Ronak Jain
04/05/2021, 4:42 PM
Eduardo Santizo
04/05/2021, 5:09 PM
Chet Lemon
04/05/2021, 7:06 PM
{Solid,CompositeSolid,Pipeline}ExecutionResult objects and solids are contained as attributes in these. A simple example:
import pickle
import dagster

@dagster.solid
def test(context):
    pass

with open('/home/chet/data/testpickle.pkl', 'w') as p:
    pickle.dump(test, p)
---------------------------------------------------------------------------
PicklingError Traceback (most recent call last)
<ipython-input-45-e900432760d1> in <module>
13
14 with open('/home/chet/data/testpickle.pkl', 'w') as p:
---> 15 pickle.dump(test, p)
16
17
PicklingError: Can't pickle <function test at 0x7f9bf70ab9d8>: it's not the same object as __main__.test
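A possible workaround, sketched under the assumption that the goal is to persist results rather than the solid or ExecutionResult objects themselves: pull the plain output value out of the execution result and pickle that (the solid name and file path below are illustrative):

import pickle

from dagster import execute_pipeline, pipeline, solid

@solid
def make_data(context):
    return {"rows": [1, 2, 3]}

@pipeline
def my_pickle_pipeline():
    make_data()

result = execute_pipeline(my_pickle_pipeline)

# Pickle the plain Python value returned by the solid, not the decorated
# solid or the ExecutionResult object (and open the file in binary mode).
value = result.result_for_solid("make_data").output_value()

with open("/tmp/testpickle.pkl", "wb") as f:
    pickle.dump(value, f)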
Michael Lynton
04/05/2021, 7:15 PM
collect and dynamic mapping feature. I wanted to share my use case and make sure it fits.
I have a script that takes a list of IDs and iterates over each ID, hitting an API and downloading a file for that ID. The list of IDs varies per run. I also use concurrent.futures to use multiple threads while downloading.
From looking at the docs and using the test code, I really like how it fans out each interaction, so you can see the dynamic nature of the work the solid is doing. Is collect/dynamic mapping meant to run in serial? I also saw a mention of “retries” in the 0.11.0 blog post - is that an automatic thing?
I’d love any feedback on my use case - whether y’all think it fits and/or any gotchas I might consider. Thx! (Please pardon any incorrect use of technical terminology 🙂)
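For the use case above, a rough sketch of what the 0.11-era dynamic mapping API can look like (the solid names and the hard-coded ID list are illustrative only; on some 0.10/0.11 versions DynamicOutput and DynamicOutputDefinition may need to be imported from dagster.experimental instead):

from dagster import DynamicOutput, DynamicOutputDefinition, pipeline, solid

@solid(output_defs=[DynamicOutputDefinition(str)])
def emit_ids(context):
    # The ID list would normally vary per run (e.g. come from config or an upstream call).
    for id_ in ["a", "b", "c"]:
        yield DynamicOutput(id_, mapping_key=id_)

@solid
def download_file(context, id_: str) -> str:
    # Placeholder for hitting the API and downloading the file for this ID.
    return f"/tmp/{id_}.dat"

@solid
def summarize(context, paths):
    context.log.info(f"downloaded {len(paths)} files")

@pipeline
def download_pipeline():
    paths = emit_ids().map(download_file)
    summarize(paths.collect())

Whether the mapped steps run serially or in parallel depends on the executor: the default in-process executor runs steps one at a time, while the multiprocess (or celery/k8s) executors can run the fanned-out steps concurrently.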
Peter B
04/06/2021, 12:06 AM
Peter B
04/06/2021, 12:11 AM
victor
04/06/2021, 9:57 AM
Kaspars
04/06/2021, 1:15 PM
Nicolas Gaillard
04/06/2021, 3:23 PM
Daniel Michaelis
04/06/2021, 5:02 PM
dagster-pyspark = "==0.10.9" and dagster-celery-k8s = "==0.10.9"). However, when the dependencies are resolved, the 0.10.9 versions of the specified packages are installed, but so are the newest versions (currently 0.11.3) of dependencies such as dagster-spark, dagster-k8s and dagster-celery, which were not explicitly specified. As I understand it, all versions of dagster packages should always be identical. Of course I can add dagster-spark, dagster-k8s and dagster-celery to my requirements file (pyproject.toml in poetry), but I can't foresee future changes or additions in the dependencies of dagster packages, so I'd have to do that every time I add a package or update a version. Could the dagster team make sure these mismatches don't occur, by failing dependency resolution when the versions of two or more dagster packages are not identical or are incompatible, and by locking dependencies to the specified version of the corresponding dagster package (i.e. if I specify dagster-pyspark = "==0.10.9", then dagster-spark = "==0.10.9" is implied)? Or am I missing something?
Eduardo Santizo
04/06/2021, 10:37 PM
Peter B
04/07/2021, 5:46 AM
Yan
04/07/2021, 10:35 AM
Josh Taylor
04/07/2021, 1:30 PM
@schedule(
    pipeline_name=yaml_file["name"],
    name=yaml_file["name"] + "_schedule",
    cron_schedule="* * * * *",
    mode="prod",
    execution_timezone="UTC",
)
def _foobar_schedule(date):
    name = eval(yaml_file["name"])
    return name.get_preset("prod").run_config
but I get an error that NameError: name 'mypipelinename' is not defined
Is it possible to call a dynamic pipeline to execute?
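One way to sidestep the eval/NameError, sketched under the assumption that the pipelines built from the YAML can be kept in a plain dict keyed by name (the stand-in pipeline, preset, and registry below are illustrative, not code from this thread):

from dagster import PresetDefinition, pipeline, schedule, solid

@solid
def do_nothing(context):
    pass

# Stand-in for a pipeline produced by the YAML-driven factory.
@pipeline(preset_defs=[PresetDefinition("prod", run_config={}, mode="default")])
def mypipelinename():
    do_nothing()

# Registry keyed by pipeline name, so the schedule body can look the
# definition up without eval().
pipelines_by_name = {p.name: p for p in [mypipelinename]}

def make_schedule(yaml_file):
    pipeline_def = pipelines_by_name[yaml_file["name"]]

    @schedule(
        pipeline_name=yaml_file["name"],
        name=yaml_file["name"] + "_schedule",
        cron_schedule="* * * * *",
        mode="default",
        execution_timezone="UTC",
    )
    def _dynamic_schedule(_date):
        # Return the preset's run_config instead of eval-ing the pipeline name.
        return pipeline_def.get_preset("prod").run_config

    return _dynamic_schedule

foobar_schedule = make_schedule({"name": "mypipelinename"})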
Josh Taylor
04/07/2021, 1:33 PM
dagster.execute_pipeline?
Daniel Kim
04/07/2021, 3:24 PM
Rubén Lopez Lozoya
04/07/2021, 3:48 PM
Rubén Lopez Lozoya
04/07/2021, 3:49 PM
Rubén Lopez Lozoya
04/07/2021, 6:07 PM
Operation name: PartitionSetLoaderQuery
Message: Unknown fragment "PartitionGraphFragment".
Path:
Locations: [{"line":69,"column":6}]
I am able to run the backfill with no problems, but my screen is full of GraphQL error notifications. Does anyone know why this happens?
Peter B
04/07/2021, 8:58 PM