Hebo Yang
12/09/2022, 8:00 PM

Alex Kan
12/09/2022, 9:59 PM

Steven Litvack-Winkler
12/10/2022, 1:45 AM

Даниил Конев
12/10/2022, 5:51 PM

Rafael Gomes
12/10/2022, 6:54 PM
Partition Assets or Partition Jobs. What are the best practices?

Mitchell Hynes
12/11/2022, 3:00 AM

Rafael Gomes
12/11/2022, 6:23 PM
MultiPartitionsDefinition? For example, we do have DailyPartitionsDefinition -> daily_partitioned_config, but I didn't find any multi_partitioned_config. Is it something we can achieve with the current multi-partition implementation? I want to create a multi-partitioned job where I can backfill by passing a color and a date range, for example.

geoHeil
12/11/2022, 10:54 PM
pd.to_sql is executed in the notebooks?

Daniel Gafni
12/12/2022, 9:18 AM
AllPartitionMapping during upstream backfills.
He doesn't want to perform a full backfill; instead, he wants to work with, say, one week of partitions from two months ago.
Of course, the AllPartitionMapping doesn't like the fact that not all upstream partitions are materialized.
Meanwhile, my colleague wants it to act like "load all partitions that I materialized previously".
I think it should be possible to do this with a different partition mapping that would query Dagster for partitions that have ever been attempted (successfully or not) and then get the latest partition key from this list.
Any tips on how to implement this?
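The colleague's "load all partitions that I materialized previously" behaviour boils down to filtering the upstream partition keys by what has actually been materialized. A minimal plain-Python sketch of just that selection logic, with hypothetical helper names: it is not a Dagster PartitionMapping, and it assumes the set of materialized (or attempted) keys has already been fetched from the Dagster instance somehow.

```python
from typing import Iterable, List, Optional


def materialized_upstream_keys(
    all_upstream_keys: Iterable[str], materialized_keys: Iterable[str]
) -> List[str]:
    """Hypothetical helper: keep only upstream keys that were materialized,
    preserving the upstream ordering."""
    materialized = set(materialized_keys)
    return [key for key in all_upstream_keys if key in materialized]


def latest_materialized_key(
    all_upstream_keys: Iterable[str], materialized_keys: Iterable[str]
) -> Optional[str]:
    """Hypothetical helper: the last materialized key in upstream order, or None."""
    keys = materialized_upstream_keys(all_upstream_keys, materialized_keys)
    return keys[-1] if keys else None


# Daily upstream partitions for October, of which only one week was backfilled.
upstream = [f"2022-10-{day:02d}" for day in range(1, 32)]
backfilled = [f"2022-10-{day:02d}" for day in range(10, 17)]
print(materialized_upstream_keys(upstream, backfilled))
print(latest_materialized_key(upstream, backfilled))
```

Passing the attempted keys instead of the materialized ones gives the variant proposed in the message, with latest_materialized_key returning the most recent one.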
Edit: I realized the downstream asset should probably just be partitioned too.

andi.d
12/12/2022, 9:21 AM

iskander
12/12/2022, 11:47 AM
from dagster import io_manager


@io_manager(required_resource_keys={"k8s_pg"})
def my_iomanager(init_context):
    # ioClass is the user's own IO manager class (definition not shown)
    return ioClass(
        resource=init_context.resources.k8s_pg
    )
Where ioClass has an __init__ method that sets self.resource = resource. The problem I'm having is that the resource inside ioClass is initialized without the env vars, although the hard-coded parameters of the resource are there. I observe this by logging from inside ioClass with logging.critical(self.resource.__dict__): the resource params read with os.getenv('xxx') are None. I've also verified that all the required env vars exist inside my user deployment container by opening a shell in it and running printenv. The code works if I execute it from localhost with dagster job execute .... I've also tried adding executor_def=in_process_executor to my job decorator, which didn't change anything. Any help would be appreciated!

Vrushank Kenkre
12/12/2022, 12:35 PM
static_partitioned_config or dynamic_partitioned_config, but currently the _run_config_wrapper for these decorators only accepts partition_key as input. Is there a way to access config_mapping params in this function, so we can add other config values in run_config_for_partition_fn in addition to partition_key? If this is not possible, what is the best way to set up the job? Thanks,

Olivier Girardot
12/12/2022, 1:09 PM
tag_concurrency_limit available on Runs, but for Ops, in order to limit only part of a Run in terms of concurrency (our use case has a DAG with DBT assets, which need no limit, and Python API calls that would require concurrency-based rate limiting). Are there any pointers on how this could be achieved, or where to discuss this kind of architectural concern?

Daniel Galea
12/12/2022, 3:18 PM
Exception ignored in: <generator object build_resources at 0x7f5bfcd93df0>
Traceback (most recent call last):
  File ".venv/lib/python3.10/site-packages/dagster/_core/execution/build_resources.py", line 114, in build_resources
    list(resources_manager.generate_teardown_events())
  File ".venv/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 533, in generate_teardown_events
    yield from self.generator
  File ".venv/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 279, in resource_initialization_event_generator
    yield from manager.generate_teardown_events()
  File ".venv/lib/python3.10/site-packages/dagster/_utils/__init__.py", line 533, in generate_teardown_events
    yield from self.generator
  File ".venv/lib/python3.10/site-packages/dagster/_core/execution/resources_init.py", line 351, in single_resource_event_generator
    with user_code_error_boundary(DagsterResourceFunctionError, msg_fn, log_manager=context.log):
  File "/usr/lib/python3.10/contextlib.py", line 142, in __exit__
    next(self.gen)
  File ".venv/lib/python3.10/site-packages/dagster/_core/errors.py", line 184, in user_code_error_boundary
    with raise_execution_interrupts():
  File "/usr/lib/python3.10/contextlib.py", line 142, in __exit__
    next(self.gen)
  File ".venv/lib/python3.10/site-packages/dagster/_core/errors.py", line 154, in raise_execution_interrupts
    with raise_interrupts_as(DagsterExecutionInterruptedError):
  File "/usr/lib/python3.10/contextlib.py", line 142, in __exit__
    next(self.gen)
  File ".venv/lib/python3.10/site-packages/dagster/_utils/interrupts.py", line 90, in raise_interrupts_as
    _replace_interrupt_signal(original_signal_handler)
  File ".venv/lib/python3.10/site-packages/dagster/_utils/interrupts.py", line 19, in _replace_interrupt_signal
    signal.signal(signal.SIGINT, new_signal_handler)
  File "/usr/lib/python3.10/signal.py", line 56, in signal
    handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
TypeError: signal handler must be signal.SIG_IGN, signal.SIG_DFL, or a callable object
For reference, these are my tests:
I am using pytest and pytest-mock to write my tests.
Test 1:
from dagster import build_op_context
from dagster_aws.emr import EmrError
from pytest_mock import MockerFixture

# launch_emr_cluster is the op under test; its import isn't shown in the original.


def test_launch_cluster_cluster_exists(mocker: MockerFixture):
    emr_mock = mocker.patch("dagster_aws.emr.emr.EmrJobRunner")
    emr_mock.run_job_flow.return_value = "CLUSTER-ID"
    emr_mock.cluster_id_from_name.return_value = ""
    with mocker.patch('dagster_aws.emr.emr.EmrJobRunner.cluster_id_from_name', side_effect=EmrError()):
        context = build_op_context(
            resources={
                "emr_job_runner": emr_mock
            },
            config={
                "emr_release_label": "emr-6.8.0",
                "cluster_name": "TestCluster",
                "master_node_instance_type": "m5.xlarge",
                "worker_node_instance_type": "m5.xlarge",
                "worker_node_instance_count": 2,
                "ec2_subnet_id": "subnet-1293819381907",
                "worker_node_spot_bid_price": "0.9"
            }
        )
        cluster_id = launch_emr_cluster(context=context)
        assert cluster_id == "CLUSTER-ID"
Test 2:
def test_launch_cluster_not_cluster_exists(mocker: MockerFixture):
    emr_mock = mocker.patch("dagster_aws.emr.emr.EmrJobRunner")
    emr_mock.run_job_flow.return_value = ""
    emr_mock.cluster_id_from_name.return_value = "EXISTING-CLUSTER-ID"
    context = build_op_context(
        resources={
            "emr_job_runner": emr_mock
        },
        config={
            "emr_release_label": "emr-6.8.0",
            "cluster_name": "TestCluster",
            "master_node_instance_type": "m5.xlarge",
            "worker_node_instance_type": "m5.xlarge",
            "worker_node_instance_count": 2,
            "ec2_subnet_id": "subnet-1293819381907",
            "worker_node_spot_bid_price": "0.9"
        }
    )
    cluster_id = launch_emr_cluster(context=context)
    assert cluster_id == "EXISTING-CLUSTER-ID"
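Test 1 gives cluster_id_from_name a return_value and then patches it again with side_effect=EmrError(). Because the first mocker.patch has already replaced dagster_aws.emr.emr.EmrJobRunner with emr_mock, the nested patch appears to land on emr_mock itself (worth verifying). A stdlib-only sketch of how unittest.mock (whose MagicMock is what mocker.patch creates by default) resolves a mock that has both: an exception set as side_effect is raised regardless of return_value. FakeEmrError is a hypothetical stand-in for EmrError so the sketch runs without dagster_aws.

```python
from unittest.mock import MagicMock


class FakeEmrError(Exception):
    """Stand-in for dagster_aws' EmrError so this runs without dagster_aws."""


runner = MagicMock()
runner.cluster_id_from_name.return_value = ""

# An exception instance as side_effect makes every call raise it,
# even though a return_value was configured above.
runner.cluster_id_from_name.side_effect = FakeEmrError("cluster lookup failed")

try:
    runner.cluster_id_from_name("TestCluster")
    raised = False
except FakeEmrError:
    raised = True

print(raised)

# Clearing side_effect restores the plain return_value behaviour.
runner.cluster_id_from_name.side_effect = None
print(repr(runner.cluster_id_from_name("TestCluster")))
```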

Jeff
12/12/2022, 5:17 PM
dagster.materialize
I am getting an error E dagster._check.ParameterCheckError: Param "recon_pipeline" is not a ReconstructablePipeline. Got <dagster._core.definitions.pipeline_base.InMemoryPipeline object at 0x7eff75ff40d0> which is type <class 'dagster._core.definitions.pipeline_base.InMemoryPipeline'>.
I believe this is because I am using pyspark. More details in 🧵

Rodrigo Parra
12/12/2022, 5:43 PM
ERROR:root:b"pid 1790 -> /home/runner/work/trx_etl/trx_etl/build/.pex/venvs/b9b8c4d1ace8e0b01c93d90648332402b22cf90d/0c9ffbfa1b4080a12413e6319eb7d3da7eff4f40/bin/python -sE /home/runner/work/trx_etl/trx_etl/build/.pex/venvs/b9b8c4d1ace8e0b01c93d90648332402b22cf90d/0c9ffbfa1b4080a12413e6319eb7d3da7eff4f40/pex --disable-pip-version-check --no-python-version-warning --exists-action a --no-input --use-deprecated legacy-resolver --isolated -q --cache-dir /home/runner/work/trx_etl/trx_etl/build/.pex/pip_cache download --dest /home/runner/work/trx_etl/trx_etl/build/.pex/downloads/resolver_download.fkkuxf9x/opt.hostedtoolcache.Python.3.8.15.x64.bin.python3.8 --requirement /home/runner/work/trx_etl/trx_etl/build/deps-requirements-18a180dd18384e5a8abcf4448807cd0618dfdca8.txt --index-url https://pypi.org/simple --retries 5 --timeout 15 exited with 1 and STDERR:\nERROR: Double requirement given: dagster==1.0.15 (from -r /home/runner/work/trx_etl/trx_etl/build/deps-requirements-18a180dd18384e5a8abcf4448807cd0618dfdca8.txt (line 28)) (already in dagster (from -r /home/runner/work/trx_etl/trx_etl/build/deps-requirements-18a180dd18384e5a8abcf4448807cd0618dfdca8.txt (line 24)), name='dagster')\n\n"
My requirements.txt currently lists:
dagster==1.0.15
dagster-graphql==1.0.15
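The pip error says the generated deps-requirements file names dagster on two lines (24 and 28), while the requirements.txt shown lists it once, so the duplicate presumably comes from another input merged into that file. A small stdlib sketch for spotting repeated project names in such a file, with hypothetical helper names; it normalizes names per PEP 503 and assumes simple name==version style lines (no URLs, no environment markers).

```python
import re
from collections import defaultdict
from typing import Dict, Iterable, List

# Project name at the start of a requirement line, roughly following PEP 508;
# extras and version specifiers after it are ignored.
_NAME_RE = re.compile(r"^\s*([A-Za-z0-9][A-Za-z0-9._-]*)")


def normalize(name: str) -> str:
    """PEP 503 normalization: runs of -, _ or . become one '-', lowercased."""
    return re.sub(r"[-_.]+", "-", name).lower()


def find_duplicate_requirements(lines: Iterable[str]) -> Dict[str, List[int]]:
    """Map each normalized name that appears more than once to its 1-based line numbers."""
    seen: Dict[str, List[int]] = defaultdict(list)
    for lineno, raw in enumerate(lines, start=1):
        line = raw.split("#", 1)[0].strip()  # drop comments and surrounding whitespace
        if not line or line.startswith("-"):  # skip blanks and pip options such as -r
            continue
        match = _NAME_RE.match(line)
        if match:
            seen[normalize(match.group(1))].append(lineno)
    return {name: nums for name, nums in seen.items() if len(nums) > 1}


example = [
    "dagster==1.0.15",
    "dagster-graphql==1.0.15",
    "pandas",
    "Dagster == 1.0.15  # pulled in from a second source",
]
print(find_duplicate_requirements(example))
```

Running it over the generated deps-requirements file, rather than requirements.txt, would show which two entries collide.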
The project runs well locally. Any hints as to what might be going on?

Chris Histe
12/12/2022, 6:10 PM
context missing from function calls since it’s injected by Dagster.

Jeremy
12/12/2022, 7:07 PM

sar
12/12/2022, 8:59 PM
Streams in the Airbyte UI:
• Destination Namespace
• Namespace Custom Format
• Destination Stream Prefix
I tried looking through the docs/code, but it wasn’t very clear to me.

Félix Tremblay
12/12/2022, 9:35 PM
SensorEvaluationContext, and I'm confused about the following descriptions:
• cursor (Optional[str]): The cursor, passed back from the last sensor evaluation via the cursor attribute of SkipReason and RunRequest
• last_completion_time (float): DEPRECATED The last time that the sensor was evaluated (UTC).
• last_run_key (str): DEPRECATED The run key of the RunRequest most recently created by this sensor. Use the preferred cursor attribute instead.
I would like to understand a few things:
Question 1: If last_completion_time is deprecated, then what is the correct way to get this value? Also, I find the last_completion_time & last_run_key attributes very intuitive, and it seems odd that they are being deprecated.
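Since the docstrings point to cursor as the replacement, one way to keep a "last evaluation time" is to serialize it into the cursor string yourself. A minimal stdlib sketch of that encoding, assuming the sensor stores a small JSON document in its cursor; the layout and helper names are hypothetical, and in a real sensor the string from write_cursor would be handed back to Dagster (for example via context.update_cursor) rather than printed.

```python
import json
from typing import Optional, Tuple

# Hypothetical cursor layout stored as JSON:
# {"last_eval": <unix seconds>, "last_run_key": <str or null>}


def read_cursor(cursor: Optional[str]) -> Tuple[Optional[float], Optional[str]]:
    """Decode the cursor; no cursor yet (first evaluation) yields (None, None)."""
    if not cursor:
        return None, None
    state = json.loads(cursor)
    return state.get("last_eval"), state.get("last_run_key")


def write_cursor(last_eval: float, last_run_key: Optional[str]) -> str:
    """Encode the state to store as the sensor's next cursor."""
    return json.dumps({"last_eval": last_eval, "last_run_key": last_run_key})


# Simulate two consecutive evaluations with a fixed timestamp.
first_eval, _ = read_cursor(None)  # first tick: nothing stored yet
cursor = write_cursor(1670880000.0, "run-2022-12-12")
last_eval, last_run_key = read_cursor(cursor)  # next tick sees the stored state
print(first_eval, last_eval, last_run_key)
```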
Question 2: Using the SensorEvaluationContext, is there a way to inspect (e.g. fetch the status of) the runs that were triggered? I need to know, for example, whether other runs have completed or are still running.

Yang
12/12/2022, 11:54 PM
from dagster import AssetKey, SourceAsset

g_pillar_name = SourceAsset(
    key=AssetKey("g_pillar_name"),
    io_manager_key="pillar_name_io",
    metadata={"pillar_name": "governance"},
)
Then in the io_manager I would reference it with
context.metadata.get("pillar_name")
Does this not work? I swear it worked before. Thanks!

Selene Hines
12/13/2022, 1:12 AM
from dagster import (
    AssetKey,
    AssetMaterialization,
    DagsterEventType,
    EventRecordsFilter,
)

# in some op
context.log_event(
    AssetMaterialization(
        asset_key=AssetKey("foo"),
        description="Persisted result to storage.",
        metadata={
            "text_metadata": "Text-based metadata for this event",
        },
    )
)

# in a sensor (or op execution context)
foo_event_records = context.instance.get_event_records(
    EventRecordsFilter(
        event_type=DagsterEventType.ASSET_MATERIALIZATION,
        asset_key=AssetKey(["foo"]),
    ),
    ascending=False,
)
# get text_metadata from the returned foo_event_records
When I print out the event records, I can see the stored metadata values for each event record, but I don't want to parse that text. Is there an easy helper function that will get me the metadata values?

zafar mahmood
12/13/2022, 8:19 AM

Timo Klockow
12/13/2022, 8:47 AM
start_date and end_date for this schedule?
Mind you, I'm not looking to work with Assets here; I just need this schedule to start executing in the past (with past ticks) as part of a backfill logic/mechanism.
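from datetime import datetime
from datetime import timedelta

from dagster import DefaultScheduleStatus
from dagster import job
from dagster import op
from dagster import repository
from dagster import RunRequest
from dagster import schedule
from dagster import ScheduleEvaluationContext

# Placeholder: the original snippet uses timedelta_days without showing its value.
timedelta_days = 7


# Placeholder ops standing in for the real op_1 / op_2 (not shown); each accepts
# the start_dt / end_dt config that the schedule supplies.
@op(config_schema={'start_dt': str, 'end_dt': str})
def op_1(context):
    context.log.info(f"{context.op_config['start_dt']} -> {context.op_config['end_dt']}")


@op(config_schema={'start_dt': str, 'end_dt': str})
def op_2(context):
    context.log.info(f"{context.op_config['start_dt']} -> {context.op_config['end_dt']}")


@job
def test_job():
    op_1()
    op_2()


@schedule(
    name='test_pipeline',
    job=test_job,
    cron_schedule='0 3 * * *',
    default_status=DefaultScheduleStatus.RUNNING,
)
def scheduled_daily_pipeline(context: ScheduleEvaluationContext) -> RunRequest:
    execution_date: datetime = context.scheduled_execution_time
    start: datetime = execution_date - timedelta(days=timedelta_days)
    end: datetime = execution_date - timedelta(hours=1)
    config = {
        'config': {
            'start_dt': start.strftime('%Y-%m-%d'),
            'end_dt': end.strftime('%Y-%m-%d'),
        }
    }
    return RunRequest(
        run_key=None,
        # run_config is keyed by op *name* strings, not by the op objects themselves
        run_config={'ops': {op_name: config for op_name in ['op_1', 'op_2']}}
    )


@repository
def repo():
    return [
        scheduled_daily_pipeline
    ]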
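The schedule's 0 3 * * * cadence and the start/end window computed in scheduled_daily_pipeline can be replayed in plain Python to list the op configs that a backfill over past ticks would need. A hand-rolled sketch, not the Dagster scheduler: it assumes naive datetimes (no timezone handling), an inclusive range of calendar days, and a lookback passed in explicitly because the value of timedelta_days isn't shown; the helper names and the example window are hypothetical.

```python
from datetime import date, datetime, time, timedelta
from typing import Dict, Iterator, List


def daily_ticks(first_day: date, last_day: date, hour: int = 3) -> Iterator[datetime]:
    """Yield one tick per day at hour:00 (hour=3 matches the cron '0 3 * * *')."""
    day = first_day
    while day <= last_day:
        yield datetime.combine(day, time(hour=hour))
        day += timedelta(days=1)


def op_config_for_tick(execution_date: datetime, timedelta_days: int) -> Dict[str, str]:
    """Same window arithmetic as scheduled_daily_pipeline."""
    start = execution_date - timedelta(days=timedelta_days)
    end = execution_date - timedelta(hours=1)
    return {"start_dt": start.strftime("%Y-%m-%d"), "end_dt": end.strftime("%Y-%m-%d")}


# Hypothetical backfill window and lookback; substitute the real values.
configs: List[Dict[str, str]] = [
    op_config_for_tick(tick, timedelta_days=1)
    for tick in daily_ticks(date(2022, 12, 1), date(2022, 12, 3))
]
for cfg in configs:
    print(cfg)
```

Each dict has the shape of the 'config' block the schedule places under every op, so the list shows which past windows would have to be run.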

Jason
12/13/2022, 9:06 AM
run_retries:
  enabled: true
  max_retries: 3
in the config file? Because I did that and none of my failing jobs are being retried 🤔

Archie Kennedy
12/13/2022, 10:07 AM

Daniel Galea
12/13/2022, 1:37 PM
s3_pickle_io_manager in a way other than s3://<bucket>/dagster/storage/<job run id>/<op name>.compute? I would like to write my files to S3 in the following manner: s3://<bucket>/year/month_day/<op name>.compute.
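The target layout s3://<bucket>/year/month_day/<op name>.compute can be expressed as a small path-building function. A stdlib sketch, assuming the date comes from the run (for example a schedule or partition date) and that month_day means zero-padded month and day joined by an underscore; the function name and the nested_day switch for the year/month/day variant are hypothetical, and wiring this into a custom IO manager is not shown.

```python
from datetime import date


def dated_s3_uri(bucket: str, run_date: date, op_name: str, nested_day: bool = False) -> str:
    """Build s3://<bucket>/<year>/<month>_<day>/<op name>.compute, or
    s3://<bucket>/<year>/<month>/<day>/<op name>.compute when nested_day=True."""
    date_part = f"{run_date:%Y/%m/%d}" if nested_day else f"{run_date:%Y/%m_%d}"
    return f"s3://{bucket}/{date_part}/{op_name}.compute"


print(dated_s3_uri("my-bucket", date(2022, 12, 13), "transform"))
print(dated_s3_uri("my-bucket", date(2022, 1, 5), "load", nested_day=True))
```

One design difference from the run-ID path: a date-based key is overwritten when the same date is re-run, so downstream steps see the latest run's output rather than the exact data of one specific run.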
I guess that the <job run id> is used so that re-running the same Run over and over again will allow any downstream tasks to process the exact same data. This is similar to what I want, but year/month_day would be a bit more human-readable than a run ID. I am processing my data on EMR and I don't want to couple my Spark code to Dagster. Therefore, a year/month/day partition style would allow Spark to read data independently of Dagster.

Gabe Schine
12/13/2022, 3:34 PM
@job other than defining an In(Nothing) input for the cleanup @op and passing it the output of the previous @op?

Zachary Bluhm
12/13/2022, 4:20 PM

Mark Fickett
12/13/2022, 4:57 PM