Eric Cheminot
01/19/2022, 2:29 PM
Miles Barr
01/19/2022, 2:59 PM
dagster.core.errors.DagsterExecutionHandleOutputError: Error occurred while handling output "result" of step "make_people":
File "/Users/miles/.local/share/virtualenvs/dagster-KC6aDfP0/lib/python3.9/site-packages/dagster/core/execution/plan/execute_plan.py", line 222, in _dagster_event_sequence_for_step
for step_event in check.generator(step_events):
File "/Users/miles/.local/share/virtualenvs/dagster-KC6aDfP0/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 328, in core_dagster_event_sequence_for_step
for evt in _type_check_and_store_output(step_context, user_event, input_lineage):
File "/Users/miles/.local/share/virtualenvs/dagster-KC6aDfP0/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 383, in _type_check_and_store_output
for evt in _store_output(step_context, step_output_handle, output, input_lineage):
File "/Users/miles/.local/share/virtualenvs/dagster-KC6aDfP0/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 512, in _store_output
for elt in iterate_with_context(
File "/Users/miles/.local/share/virtualenvs/dagster-KC6aDfP0/lib/python3.9/site-packages/dagster/utils/__init__.py", line 389, in iterate_with_context
return
File "/opt/homebrew/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/contextlib.py", line 137, in __exit__
self.gen.throw(typ, value, traceback)
File "/Users/miles/.local/share/virtualenvs/dagster-KC6aDfP0/lib/python3.9/site-packages/dagster/core/execution/plan/utils.py", line 73, in solid_execution_error_boundary
raise error_cls(
The above exception was caused by the following exception:
TypeError: cannot pickle '_thread.RLock' object
File "/Users/miles/.local/share/virtualenvs/dagster-KC6aDfP0/lib/python3.9/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
yield
File "/Users/miles/.local/share/virtualenvs/dagster-KC6aDfP0/lib/python3.9/site-packages/dagster/utils/__init__.py", line 387, in iterate_with_context
next_output = next(iterator)
File "/Users/miles/.local/share/virtualenvs/dagster-KC6aDfP0/lib/python3.9/site-packages/dagster/core/execution/plan/execute_step.py", line 504, in _gen_fn
gen_output = output_manager.handle_output(output_context, output.value)
File "/Users/miles/.local/share/virtualenvs/dagster-KC6aDfP0/lib/python3.9/site-packages/dagster/core/storage/fs_io_manager.py", line 122, in handle_output
pickle.dump(obj, write_obj, PICKLE_PROTOCOL)
Any ideas of what's going wrong? The failing function is making a data frame from static data, so I'm struggling to see where the problem is.
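The TypeError at the bottom of the chain is the real problem: the default fs_io_manager pickles every op output, and something reachable from the value make_people returns is holding a thread lock (clients, loggers, and connections are the usual stowaways). A hypothetical repro outside Dagster, with PeopleResult standing in for whatever the op actually returns:

import pickle
import threading

import pandas as pd

class PeopleResult:
    # hypothetical wrapper an op might return alongside its data
    def __init__(self, df: pd.DataFrame):
        self.df = df
        self.lock = threading.RLock()  # the unpicklable stowaway

pickle.dumps(PeopleResult(pd.DataFrame({"name": ["ada", "grace"]})))
# TypeError: cannot pickle '_thread.RLock' object

Calling pickle.dumps on the op's return value in a REPL is a quick way to find the culprit; returning a plain DataFrame, or pointing that output at mem_io_manager for in-process runs, sidesteps it.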
Hugo Pedroso de Lima
01/19/2022, 3:16 PM
Bryan Chavez
01/19/2022, 4:13 PM
import json

from dagster import AssetKey, DagsterEventType, EventRecordsFilter

# snippet from a sensor body: `context` and the bare `return` belong to the enclosing function
# Read the sensor cursor, which stores the last-seen storage_id per key.
cursor_dict = json.loads(context.cursor) if context.cursor else {}
asset_key = context.dagster_run.run_id
asset_key_cursor = cursor_dict.get(asset_key)
cursor_events_map = dict()
# Fetch only materializations recorded after the stored cursor position.
asset_key_event_records = context.instance.get_event_records(
    EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        asset_key=AssetKey(asset_key),
        after_cursor=asset_key_cursor,
    ),
)
cursor_events_map[asset_key] = asset_key_event_records
# Make sure there are events for all cursor keys before advancing.
if not all(bool(v) for v in cursor_events_map.values()):
    return
cursor_update = dict()
for cursor_key, cursor_events in cursor_events_map.items():
    # get_event_records returns newest first, so [0].storage_id is the high-water mark.
    cursor_update[cursor_key] = cursor_events[0].storage_id
context.update_cursor(json.dumps(cursor_update))
Miles Barr
01/19/2022, 4:59 PM
Ryan Riopelle
01/19/2022, 9:00 PM
paul.q
01/19/2022, 10:52 PM
I couldn't use daily_partitioned_config and had to resort to using a dynamic_partitioned_config. When the job is run via this it conveniently leaves a partition tag against the run. But normally the job is launched via a schedule. Using the @schedule decorator I can specify a cron expression that supports my weekday requirement. The schedule looks something like this, where daily_run_config is a convenience function that provides the run_config I need:
@schedule(cron_schedule="0 3 * * 1-5", execution_timezone='Australia/Melbourne', job=Jobs.daily_positions_upload)
def upload_schedule(context):
    effective_date = (context.scheduled_execution_time - BDay(2)).strftime("%Y-%m-%d")
    config = daily_run_config(effective_date)
    return config
However, I would prefer for the schedule to use the dynamic_partitioned_config I've already got. This would leave the partition tag against the run, whereas the current scheduled run doesn't. But I can't return a dynamic_partitioned_config from a schedule-decorated function - I get this error:
dagster.check.CheckError: Object DynamicPartitionsDefinition(partition_fn=<function get_effective_dates1 at 0x000001F13297B798>) is not a TimeWindowPartitionsDefinition. Got DynamicPartitionsDefinition
Thanks
Paul
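One workaround sketch, reusing Jobs.daily_positions_upload and daily_run_config from the message above: a schedule can return a RunRequest instead of bare run_config, and setting the dagster/partition tag by hand gives the scheduled run the same partition tag the partitioned launches get (assuming that tag key matches what the dynamic partition set writes):

from dagster import RunRequest, schedule
from pandas.tseries.offsets import BDay

@schedule(cron_schedule="0 3 * * 1-5", execution_timezone="Australia/Melbourne", job=Jobs.daily_positions_upload)
def upload_schedule(context):
    effective_date = (context.scheduled_execution_time - BDay(2)).strftime("%Y-%m-%d")
    # "dagster/partition" is the tag partitioned runs carry, so the run
    # shows up against that partition even though it came from a schedule.
    return RunRequest(
        run_key=None,
        run_config=daily_run_config(effective_date),
        tags={"dagster/partition": effective_date},
    )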
Marcel M
01/20/2022, 11:53 AM
Thomas
01/20/2022, 12:49 PM
George Pearse
01/20/2022, 1:25 PM
Alessandro Marrella
01/20/2022, 2:54 PM
Roel Hogervorst
01/20/2022, 3:00 PM
required_resource_keys is not one of ['frozenset', 'set']. Got ['myconnection'] which is type <class 'list'>
What I'd like to know is: what does this mean, how do I fix it, and what is best practice? (And I'd like to see an example in the docs.)
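The check error means required_resource_keys must be a set (or frozenset) of resource key strings, and a list was passed. A minimal sketch of the fix, with the op body assumed:

from dagster import op

@op(required_resource_keys={"myconnection"})  # set literal, not ["myconnection"]
def my_op(context):
    # the declared resource is then available as an attribute:
    return context.resources.myconnection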
Charles Leung
01/20/2022, 3:35 PM
Charles Leung
01/20/2022, 3:47 PM
Roel Hogervorst
01/20/2022, 4:59 PM
My op's return type is annotated with -> pd.DataFrame: but when I'm running a test I see the error: Type check failed for op "nameofop" output "result" - expected type "DataFrame". Description: Value of type <class 'function'>. ... expected value to be of Python type pandas.core.frame.DataFrame
I thought it was checked when I return the dataframe?
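Value of type <class 'function'> almost always means the function object itself was returned rather than its result. A hypothetical sketch of the mistake, with make_df standing in for whatever builds the frame:

import pandas as pd
from dagster import op

def make_df():
    return pd.DataFrame({"a": [1, 2]})

@op
def nameofop() -> pd.DataFrame:
    return make_df    # bug: hands back the function -> <class 'function'>
    # return make_df()  # fix: call it so the op returns a DataFrame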
Tobias Macey
01/20/2022, 6:54 PM
I have a list of PresetDefinition. The examples show how to handle that in the case that you have one being passed to preset_defs in the @pipeline decorator, but not a good example of handling a list of them. It seems that the answer is that I have to create multiple instances of the job to be able to pass it into the schedule with the appropriate config?
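One pattern that avoids duplicating the pipeline itself, sketched against the legacy preset API (names here are assumptions): build one ScheduleDefinition per preset, each pinning that preset's run_config and mode.

from dagster import ScheduleDefinition

# hypothetical: presets is the existing list of PresetDefinition objects
schedules = [
    ScheduleDefinition(
        name=f"nightly_{preset.name}",
        cron_schedule="0 2 * * *",      # assumed cadence
        pipeline_name="my_pipeline",    # assumed pipeline name
        run_config=preset.run_config,
        mode=preset.mode,
    )
    for preset in presets
]

The repository can then return the pipeline once plus all of the schedules, so only the schedule objects multiply, not the pipeline.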
Alex Service
01/20/2022, 10:43 PM
If I call a resource method (e.g. MyResource.fetch(…)), would it be reasonable if I reference the resource from within a threadpool, or would there be known issues with that kind of behavior?
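A sketch of the pattern, assuming a hypothetical my_resource whose underlying client is itself thread-safe; resolving the resource once on the main thread and handing the resolved object to the workers keeps Dagster out of the threading entirely:

from concurrent.futures import ThreadPoolExecutor

from dagster import op

@op(required_resource_keys={"my_resource"})
def fetch_many(context, keys: list):
    resource = context.resources.my_resource  # resolve once, outside the pool
    with ThreadPoolExecutor(max_workers=8) as pool:
        # Workers only touch the already-initialized resource object; whether
        # that is safe depends on the client the resource wraps, not on Dagster.
        return list(pool.map(resource.fetch, keys))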
Bryan Chavez
01/20/2022, 10:57 PM
Andy Chen
01/20/2022, 11:00 PM
RESOURCE_KEY = "some_key"
context.resources[RESOURCE_KEY]
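For what it's worth, context.resources exposes resources as attributes rather than as a mapping in most versions, so a dynamic lookup by string usually needs getattr (a sketch against the snippet above):

RESOURCE_KEY = "some_key"
resource = getattr(context.resources, RESOURCE_KEY)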
paul.q
01/20/2022, 11:09 PM
Ghanashyam Alakke
01/21/2022, 12:38 AM
HardSchrockCafe
01/21/2022, 12:48 AM
import json

from dagster import get_dagster_logger, job, op
from kafka import KafkaProducer

@op
def push_to_kafka(parsed_feed, topic):
    producer = KafkaProducer(bootstrap_servers='XXXXXXX',
                             key_serializer=lambda x: x.encode('utf8'),
                             value_serializer=lambda x: json.dumps(x).encode('utf8'))
    for entry in parsed_feed:
        producer.send(topic,
                      key=entry['id'],
                      value=entry)
    # send() is asynchronous; flush so everything is delivered before the op returns
    producer.flush()
    get_dagster_logger().info(f"Successfully pushed to Kafka {topic}")

# parse_rss_feed, download_rss_feed, download_article_content are ops defined elsewhere
@job
def download_hacker_news_job():
    metadata = parse_rss_feed(download_rss_feed())
    push_to_kafka(metadata, 'news-metadata')
    push_to_kafka(download_article_content(metadata), 'news-content')
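One thing that may bite here: inside a @job body, op inputs normally have to come from other ops' outputs or from config, so the bare string topics may be rejected at definition time. A hedged alternative is to move the topic into op config (sketch; the schema name is an assumption):

from dagster import op

@op(config_schema={"topic": str})
def push_to_kafka(context, parsed_feed):
    topic = context.op_config["topic"]
    # ... same KafkaProducer logic as above ...

Each invocation then gets its topic from run config, keyed by the invocation's name (the second use of an op is auto-aliased, e.g. push_to_kafka_2).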
George Pearse
01/21/2022, 8:45 AM
dagster.core.errors.DagsterInvariantViolationError: research_and_development not found at module scope in file repo.py.
Only when my pipeline is triggered by a schedule. It runs fine otherwise. I have absolutely no idea what code change has caused this; it's been working smoothly for months (I am still assuming I've broken something with a code change, of course).
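That error typically means that when the scheduler re-imports repo.py, it looks the target up by name at module scope, so a definition created inside a function or behind a conditional won't be found. A sketch of the shape it expects, using the job API and an assumed repository name:

# repo.py
from dagster import job, repository

@job
def research_and_development():
    ...

@repository
def my_repository():  # hypothetical repository name
    return [research_and_development]

# A definition built inside a function, or guarded by
# `if __name__ == "__main__":`, is not at module scope and
# triggers exactly this error when loaded by name.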
Dylan Hunt
01/21/2022, 1:37 PM
Zach
01/21/2022, 6:29 PM
Nick Dellosa
01/21/2022, 7:43 PM
Cameron Gallivan
01/21/2022, 9:20 PM
1. Have a separate tag for each task:
tags={
'task1': 'taskname1',
'task2': 'taskname2',
'task3': 'taskname3',
'task4': 'taskname4',
...
'task50': 'taskname50',
}
2. Or concatenate the tasknames into a single string:
tags={
'tasks': 'taskname1,taskname2,taskname3,taskname4,...,taskname50'
}
Charles Leung
01/21/2022, 11:51 PM
parenti.daniele
01/22/2022, 12:36 AM
I'm getting an error with the read_files graph in the attached image. Would anyone be able to help me figure out what's causing the error?
Operation name: PipelineExplorerRootQuery
Message: '_TypeHintInferred[pandas.core.frame.DataFrame]'
Path: ["pipelineSnapshotOrError","solidHandles",0,"solid","definition","outputDefinitions",0,"type"]
Locations: [{"line":243,"column":5}]
Stack Trace:
File "C:\ProgramData\Miniconda3\envs\spence\lib\site-packages\graphql\execution\executor.py", line 452, in resolve_or_error
return executor.execute(resolve_fn, source, info, **args)
File "C:\ProgramData\Miniconda3\envs\spence\lib\site-packages\graphql\execution\executors\sync.py", line 16, in execute
return fn(*args, **kwargs)
File "C:\ProgramData\Miniconda3\envs\spence\lib\site-packages\dagster_graphql\schema\solids.py", line 79, in resolve_type
return to_dagster_type(
File "C:\ProgramData\Miniconda3\envs\spence\lib\site-packages\dagster_graphql\schema\dagster_types.py", line 25, in to_dagster_type
dagster_type_meta = pipeline_snapshot.dagster_type_namespace_snapshot.get_dagster_type_snap(
File "C:\ProgramData\Miniconda3\envs\spence\lib\site-packages\dagster\core\snap\dagster_types.py", line 48, in get_dagster_type_snap
return self.all_dagster_type_snaps_by_key[key]
Aleks
01/24/2022, 6:57 AM