Mike Atlas
10/21/2022, 8:44 PM
get_dagster_logger() log in a sensor evaluation, where would I find the log entries?
E.g.:
from dagster import get_dagster_logger, sensor

@sensor(job=my_job)
def my_job_sensor(context):
    log = get_dagster_logger()
    log.info("sensor initiated...")
CJ
10/21/2022, 9:52 PM
from dagster import graph

# Create a graph
@graph
def pipeline():
    output = my_ops()
    my_ops_result.with_hooks({slack_message_on_success, slack_connection})()

# Convert to job
run = pipeline.to_job(
    resource_defs={
        'aws_session': aws_session,
        'db_connection': db_connection,
        'slack_connection': slack_connection,
    },
    hooks={slack_message_on_failure},
    name='pipeline_{0}'.format(env),
    config=config,
)
Result: DagsterInvariantViolationError
Tyrone Wong
10/21/2022, 10:04 PM
Ben Andersen-Waine
10/21/2022, 10:52 PM
@io_manager(required_resource_keys={"pyspark"},
            config_schema={"path_prefix": str, "stage_prefix": str, "build_number": str})
def parquet_io_manager():
    return ParquetIOManager()
Creating the resource:
@repository
def repository():
    return [
        with_resources(
            all_assets,
            {"bronze_parquet_io_manager": parquet_io_manager.configured({"path_prefix": "s3://817187305846-dagster-test", "stage_prefix": "bronze"})}
        ),
Note: I'm omitting some other resource config for brevity, but I'm also intentionally omitting the "build_number" config because I want it to be populated at runtime.
Then I supply the following at launch via Dagit:
resources:
  bronze_parquet_io_manager:
    config:
      build-number: "build-600"
Is supplying config to IOManagers like this supported?
Slackbot
10/21/2022, 11:09 PM
Rean
10/22/2022, 8:13 AM
logs folder for every runtime?
Arthur
10/22/2022, 11:47 AM
Arthur
10/22/2022, 11:47 AM
Arthur
10/22/2022, 12:21 PM
Arthur
10/22/2022, 12:21 PM
Zac Chien
10/23/2022, 3:26 PM
Kyle Gobel
10/24/2022, 4:06 AM
op based on the output of a different op
from dagster import op, graph
from dagster_k8s import k8s_job_op

@op
def get_greeting():
    return "hello"

@graph
def my_graph():
    greeting = get_greeting()
    greeter = k8s_job_op.configured(
        {
            "image": "busybox",
            "command": ["/bin/sh", "-c"],
            "args": ["echo " + greeting],  # obv this doesn't work, but not sure what to do
            "load_incluster_config": False,
            "namespace": "default",
        },
        name="say_greeting",
    )
    greeter()
Yang
10/24/2022, 5:44 AM
TypeError: int() argument must be a string, a bytes-like object or a number, not 'NAType'
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
yield
File "/usr/local/lib/python3.9/site-packages/dagster/_utils/__init__.py", line 430, in iterate_with_context
next_output = next(iterator)
File "/usr/local/lib/python3.9/site-packages/dagster/_core/execution/plan/compute_generator.py", line 74, in _coerce_solid_compute_fn_to_iterator
result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
File "/expo-analytics/secmaster/lipper/assets/fund_updates.py", line 31, in new_and_existing_fund_instruments
existing_funds = bq_lipper.check_if_fund_in_yb_instrument_universe(
File "/expo-analytics/secmaster/lipper/resources/bigquery_lipper.py", line 392, in check_if_fund_in_yb_instrument_universe
all_matches_perm_id_yb_id = pd.merge(
File "/usr/local/lib/python3.9/site-packages/pandas/core/reshape/merge.py", line 121, in merge
return op.get_result()
File "/usr/local/lib/python3.9/site-packages/pandas/core/reshape/merge.py", line 715, in get_result
join_index, left_indexer, right_indexer = self._get_join_info()
File "/usr/local/lib/python3.9/site-packages/pandas/core/reshape/merge.py", line 966, in _get_join_info
(left_indexer, right_indexer) = self._get_join_indexers()
File "/usr/local/lib/python3.9/site-packages/pandas/core/reshape/merge.py", line 940, in _get_join_indexers
return get_join_indexers(
File "/usr/local/lib/python3.9/site-packages/pandas/core/reshape/merge.py", line 1481, in get_join_indexers
zipped = zip(*mapped)
File "/usr/local/lib/python3.9/site-packages/pandas/core/reshape/merge.py", line 1478, in <genexpr>
_factorize_keys(left_keys[n], right_keys[n], sort=sort, how=how)
File "/usr/local/lib/python3.9/site-packages/pandas/core/reshape/merge.py", line 2165, in _factorize_keys
rk = ensure_int64(np.asarray(rk))
File "pandas/_libs/algos_common_helper.pxi", line 93, in pandas._libs.algos.ensure_int64
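The last frames show pandas calling ensure_int64 on a join key and failing on NAType, which suggests (an assumption, since the data isn't shown) that one of the merge key columns contains pd.NA, for example a nullable Int64 column. A minimal sketch of one workaround: drop rows whose key is missing and cast both key columns to plain int64 before merging. The column names perm_id, fund and yb_id and the sample frames are hypothetical, and dropping rows changes the result if a left or outer join is actually needed.

```python
import pandas as pd


def merge_on_nullable_int_key(left: pd.DataFrame, right: pd.DataFrame, key: str) -> pd.DataFrame:
    """Inner-merge two frames on an integer key column that may contain pd.NA.

    Rows with a missing key are treated as unmatchable and dropped; both key
    columns are then cast to plain int64 so pandas factorizes them with the
    same dtype.
    """
    left = left.dropna(subset=[key]).astype({key: "int64"})
    right = right.dropna(subset=[key]).astype({key: "int64"})
    return pd.merge(left, right, on=key, how="inner")


# Hypothetical data: nullable Int64 key columns, one of which has a missing value.
funds = pd.DataFrame({"perm_id": pd.array([1, 2, 3], dtype="Int64"), "fund": ["a", "b", "c"]})
matches = pd.DataFrame({"perm_id": pd.array([2, None, 3], dtype="Int64"), "yb_id": [20, 99, 30]})

merged = merge_on_nullable_int_key(funds, matches, "perm_id")
```

Here merged keeps the rows for perm_id 2 and 3, and the row with the missing key is gone.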
Oliver
10/24/2022, 6:27 AM
csv.gz files from the net to a bucket, b) run a CREATE EXTERNAL TABLE SQL statement to register the table.
Then dbt takes over: it consumes the csv.gz files, partitions/buckets them, and transforms them to Parquet.
With my current setup, I have all the raw tables defined as dbt sources and do a simple select * from table with some extra config to handle bucketing/partitioning/Parquet conversion.
Unfortunately, when I try to load the project in Dagster now, dbt looks at the sources, attempts to retrieve their schema, and then fails during repo init. Is there a pattern I could use that would help here?
(failure pictured)
Albert
10/24/2022, 2:18 PM
build_schedule_from_partitioned_job, it is a monthly schedule because it’s a monthly partition.
When I try to do something like:
my_month_partitioned_job = define_asset_job(name="job_name", selection="my_monthly_partitioned_asset", partitions_def=monthly_partition_def)
my_schedule = ScheduleDefinition(job=my_month_partitioned_job, cron_schedule="21 16 * * *")
I get an error like:
dagster._check.CheckError: Invariant failed. Description: Tried to access partition_key for a non-partitioned run
If I leave out the job partition definition:
my_month_partitioned_job = define_asset_job(name="job_name", selection="my_monthly_partitioned_asset")
my_schedule = ScheduleDefinition(job=my_month_partitioned_job, cron_schedule="21 16 * * *")
I get the same error as above:
dagster._check.CheckError: Invariant failed. Description: Tried to access partition_key for a non-partitioned run
Part of what's going on might be that my monthly partitioned asset is accessing the context to do what it needs to do:
@asset(partitions_def=monthly_partition_def)
def my_monthly_partitioned_asset(context):
    partition_key = context.asset_partition_key_for_output()
    # ... use partition key to understand what month partition it is ...
    return stuff
The stack trace looks like this:
File "lib/python3.9/site-packages/dagster/_core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
yield
File "lib/python3.9/site-packages/dagster/_utils/__init__.py", line 430, in iterate_with_context
next_output = next(iterator)
File "lib/python3.9/site-packages/dagster/_core/execution/plan/compute_generator.py", line 73, in _coerce_solid_compute_fn_to_iterator
result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
File "assets.py", line 39, in my_monthly_partitioned_asset
context.log.info(context.asset_partition_key_for_output())
File "lib/python3.9/site-packages/dagster/_core/execution/context/compute.py", line 370, in asset_partition_key_for_output
return self._step_execution_context.asset_partition_key_for_output(output_name)
File "lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 811, in asset_partition_key_for_output
start, end = self.asset_partition_key_range_for_output(output_name)
File "lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 806, in asset_partition_key_range_for_output
return PartitionKeyRange(self.partition_key, self.partition_key)
File "lib/python3.9/site-packages/dagster/_core/execution/context/system.py", line 334, in partition_key
check.invariant(
File "lib/python3.9/site-packages/dagster/_check/__init__.py", line 1470, in invariant
raise CheckError(f"Invariant failed. Description: {desc}")
Christopher Avila
10/24/2022, 3:32 PM
Harry James
10/24/2022, 4:40 PM
Dusty Shapiro
10/24/2022, 4:42 PM
localhost as the host, I’m using airbyte-airbyte-server-svc.
It definitely is able to connect, but doesn’t seem to be pulling assets.
Gustavo Carvalho
10/24/2022, 7:07 PM
key and "name" values? I want to use the key to refer to the asset programmatically (for example, when yielding AssetMaterializations) and the "name" to be a nice, meaningful name displayed in the UI.
Marc Keeling
10/24/2022, 7:47 PM
Oliver
10/25/2022, 1:29 AM
AssetsDefinition.from_graph?
Jesper Bagge
10/25/2022, 6:55 AM
Mykola Palamarchuk
10/25/2022, 7:35 AM
dynamic_int_sequence().map(twice).collect() that returns a list like [2,4,6,...]
• I use mem_io_manager and in_process_executor, running on my local machine (enough memory, good CPU and fast SSD).
• For 1000 elements, the pipeline takes ~100 sec to complete, generating 10k events.
• dynamic_int_sequence, which just returns 1000 consecutive DynamicOutput integers, takes 12 sec.
This demonstrates a huge overhead for a relatively small number of simple operations.
So my question is: is it reasonable to use DynamicOutput in such cases?
Rainer Pichler
10/25/2022, 7:45 AM
Roel Hogervorst
10/25/2022, 9:29 AM
Vlad Efanov
10/25/2022, 9:49 AM
Gustavo Carvalho
10/25/2022, 11:52 AM
IOManager.load_input of B and C sequentially or in parallel?
Also, a more specific use case: if I have assets A and B, which are both subsets of rows in a database table, how can I set up an IOManager.load_input to load both using the same query, then split the results and route them to each asset?
Deo
10/25/2022, 12:26 PM
aggregate the partitioned asset
2. Have an asset with partitions downstream of an asset with partitions, but receive the data on a rolling basis: the mapping wouldn't be 1 to 1 but a rolling n to 1.
I believe this could be accomplished with a PartitionDefinition, but I'm having trouble finding out how.
Deepa Vasant
10/25/2022, 2:25 PM
Ismael Rodrigues
10/25/2022, 3:02 PM