Sanidhya Singh
02/14/2023, 4:12 PM

Alexey Kulakov
02/14/2023, 4:25 PM
1. I have a partitioned asset defined as:
parts = MonthlyPartitionsDefinition(start_date='2022-01-01')

@asset(
    required_resource_keys={"spark"},
    io_manager_key="io_spark_iceberg_parts",
    partitions_def=parts,
)
def parts_proc(context, parts_source):
    parts_proc = some_func(parts_source)
    return parts_proc
2. I have an asset job defined as:
assets_for_job = ['parts_proc']
asset_job = define_asset_job(
    name='asset_job',
    selection=assets_for_job,
)
3. I have some code to start the job from an external Jupyter notebook:
from dagster_graphql import DagsterGraphQLClient

client = DagsterGraphQLClient("dagster", port_number=3070)
client.submit_job_execution(
    job_name="asset_job",
    repository_name="my_repo",
)
How can I set the list of asset partitions that I need to materialize in “submit_job_execution” at step 3?
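A minimal sketch of one way to do this, assuming the dagster-graphql version in use exposes a tags argument on submit_job_execution: attach each partition key via the dagster/partition tag and submit one run per partition. The partition keys below are placeholders.

# Sketch only: one run per partition key, tagged so Dagster records the partition.
from dagster_graphql import DagsterGraphQLClient

client = DagsterGraphQLClient("dagster", port_number=3070)

for partition_key in ["2022-01-01", "2022-02-01"]:  # placeholder keys
    client.submit_job_execution(
        job_name="asset_job",
        repository_name="my_repo",
        tags={"dagster/partition": partition_key},
    )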
Ripple Khera
02/14/2023, 4:26 PM
Ignoring a duplicate run that was started from somewhere other than the run monitor daemon
gets logged in dagit at 03:23
• step pod runs for 11 hrs and terminates at 05:42
• next step pod is not spun up
I've been told Dagster support is pretty responsive, and this is becoming quite a headache for us, so any pointers would be much appreciated.
Xiaotian Yu
02/14/2023, 4:33 PM
context.metadata.
From the debug info in pic 3 you can see: the asset key is present, but there is no metadata (which should exist, I think?).
For GitHub issues, I only searched and found https://github.com/dagster-io/dagster/pull/6900, so I think my usage is valid, is that true? Thanks!

Xiaotian Yu
02/14/2023, 4:38 PM

clay
02/14/2023, 5:00 PM
activity_analytics, core, and recommender.
In my case, let's say I have package1 and package2. As with the example, all_assets ends up being [*package1_assets, *package2_assets] and is passed to Definitions in the same way that the assets are in the linked example. If I go into dagit and launch a run to materialize all of the package1 and package2 assets, which are in different groups, all works fine.
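A minimal sketch of the layout described above, assuming the two packages expose their assets via load_assets_from_package_module the way the linked example does; package1 and package2 are the placeholder names from the question.

# Sketch only: assemble assets from two packages into one Definitions object.
from dagster import Definitions, load_assets_from_package_module

import package1  # placeholder package
import package2  # placeholder package

package1_assets = load_assets_from_package_module(package1, group_name="package1")
package2_assets = load_assets_from_package_module(package2, group_name="package2")

all_assets = [*package1_assets, *package2_assets]

defs = Definitions(assets=all_assets)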
However, if I modify the code in package2 and then update the git repository and restart dagster/dagit, the system no longer knows that the package1 assets were materialized, in spite of the fact that package1 did not change at all. I can understand that package2 would reset; however, I'd like to figure out how to set this up so that only the assets associated with the package that changed would need to be rematerialized for dagster to know about them.
Most of my work/assets are being pushed to Snowflake. Is this simply a matter of dagster not knowing how to find them on Snowflake? Or will all of the assets associated with all of the packages always reset when I push a code change to a single package?

Ben Wilson
02/14/2023, 7:27 PM
import dagster
from dags import cf, observations, ts, pr
from models import database

defs = dagster.Definitions(
    jobs=[
        observations.import_observations.to_job(resource_defs={"database": database}),
        cf.copy.to_job(),
        ts.ts.to_job(resource_defs={"database": database}),
        pr.ss.to_job(resource_defs={"database": database}),
    ],
    resources={"database": database},
)
Spencer Nelson
02/14/2023, 9:31 PM

Daniel Chalef
02/14/2023, 10:36 PM
execute_k8s_job. The op is called via map of a `DynamicOutput` list. execute_k8s_job uses the name from the OpExecutionContext as the k8s job name. However, it doesn’t appear that the op’s name is actually unique for each invocation of the op in the map operation, which results in dagster attempting to create k8s jobs with duplicate names.
I’ve considered using a factory that generates ops with unique names, but am unsure how I’d use this with map.
How do I ensure that each mapped invocation of an op has a unique name?
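A minimal sketch of one way to derive a per-invocation identifier inside a mapped op, using the DynamicOutput mapping key; whether execute_k8s_job itself accepts a name override depends on the dagster-k8s version, so here the unique name is only logged, and the fan-out op is a placeholder.

# Sketch only: each mapped invocation sees a distinct mapping_key, which can be
# combined with the op name to build a unique Kubernetes job name.
from dagster import DynamicOut, DynamicOutput, job, op


@op(out=DynamicOut())
def fan_out():
    for key in ["a", "b", "c"]:  # placeholder fan-out
        yield DynamicOutput(key, mapping_key=key)


@op
def run_k8s_job(context, item):
    # Unique per mapped invocation, e.g. "run_k8s_job-a".
    unique_name = f"{context.op_def.name}-{context.get_mapping_key()}"
    context.log.info(f"would launch a k8s job named {unique_name}")


@job
def fan_out_job():
    fan_out().map(run_k8s_job)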
Yuan Cheng
02/14/2023, 11:31 PM

Leigh Stoller
02/15/2023, 1:47 AM

John Cenzano-Fong
02/15/2023, 8:06 AM
Airflow has depends_on_past so you can prevent a job schedule from executing if the last run of the same job failed. Is there an analog to this for Dagster? I'm seeing execution_fn and should_execute, which leverage ScheduleEvaluationContext, but that context seems to only have information about the current run. Any best practice suggestions for this situation?
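A minimal sketch of one way to approximate depends_on_past, assuming a job named my_job (a placeholder): the schedule function asks the instance for the most recent run of the same job and skips the tick if that run failed.

# Sketch only: skip a scheduled tick when the previous run of the job failed.
from dagster import (
    DagsterRunStatus,
    RunRequest,
    RunsFilter,
    ScheduleEvaluationContext,
    SkipReason,
    schedule,
)


@schedule(cron_schedule="0 * * * *", job_name="my_job")  # placeholder cron/job
def depends_on_past_schedule(context: ScheduleEvaluationContext):
    # Most recent run of this job, newest first.
    last_runs = context.instance.get_runs(filters=RunsFilter(job_name="my_job"), limit=1)
    if last_runs and last_runs[0].status == DagsterRunStatus.FAILURE:
        return SkipReason("Last run of my_job failed; skipping this tick.")
    return RunRequest(run_key=None)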
Sundara Moorthy
02/15/2023, 11:23 AM
FATAL: "/bitnami/postgresql/data" is not a valid data directory
DETAIL: File "/bitnami/postgresql/data/PG_VERSION" does not contain valid data.
HINT: You might need to initdb.
chmod: changing permissions of '/var/run/postgresql': Operation not permitted
Ohad Basan
02/15/2023, 12:18 PM

Sandro Vieira de Paula
02/15/2023, 2:13 PM

clay
02/15/2023, 3:41 PM
MyDataFrame = create_dagster_pandas_dataframe_type(...)
And an asset that produces that type, for storage in Snowflake. This all works as expected. The asset signature is like:
@asset
def my_asset(context: OpExecutionContext) -> MyDataFrame:
    ...
    return df
Validation passes, etc.
Downstream, I have an asset that needs to understand the types specified for columns in MyDataFrame. I tried to do this for the downstream asset's signature:
@asset
def my_downstream_asset(context: OpExecutionContext, my_asset: MyDataFrame) -> None:
    ...
    # some stuff happens
    ...
    return None
However, I cannot materialize my_downstream_asset because of the error:
dagster._check.CheckError: SnowflakeIOManager does not have a handler for type 'typing.Any'. Has handlers for types '<class 'pandas.core.frame.DataFrame'>'. Please add <class 'pandas.core.frame.DataFrame'> type hints to your assets and ops.
I had this same error yesterday when I did not specify a type at all for my_asset in the signature of my_downstream_asset.
How can I have my_downstream_asset load my_asset and apply the column types specified (and validated) in MyDataFrame?

clay
02/15/2023, 3:42 PM
Is the SnowflakeIOManager ignoring the MyDataFrame type hint?

clay
02/15/2023, 3:43 PM
Do I need to specify this in @asset() somehow?
I tried adding the following to @asset and had the same error:
ins={"my_asset": AssetIn("my_asset", dagster_type=MyDataFrame)}
The only way I'm able to load my_asset in my_downstream_asset is with the signature:
def my_downstream_asset(context: OpExecutionContext, my_asset: pd.DataFrame) -> None:
    ....
... or is the metadata about the MyDataFrame type stored in the dagster postgres db, so that the types are automatically applied? 🤔
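A minimal sketch of one workaround consistent with the error above, assuming the Snowflake IO manager picks its handler from the Python type annotation (so it needs pd.DataFrame rather than the dagster_pandas DagsterType): keep the pandas annotation for loading and re-apply the expected column dtypes inside the downstream asset. MY_COLUMN_TYPES is a placeholder for whatever columns MyDataFrame actually validates.

# Sketch only: pd.DataFrame satisfies the SnowflakeIOManager handler lookup, and the
# dtypes enforced upstream by MyDataFrame are re-applied explicitly after loading.
import pandas as pd
from dagster import OpExecutionContext, asset

# Placeholder: mirror whatever columns/dtypes MyDataFrame enforces.
MY_COLUMN_TYPES = {"id": "int64", "amount": "float64"}


@asset
def my_downstream_asset(context: OpExecutionContext, my_asset: pd.DataFrame) -> None:
    typed = my_asset.astype(MY_COLUMN_TYPES)
    context.log.info(f"loaded {len(typed)} rows with dtypes {typed.dtypes.to_dict()}")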
Robert Wade
02/15/2023, 3:44 PM

Robert Lawson
02/15/2023, 4:04 PM

Greg Burd
02/15/2023, 4:41 PM
branch_developments.yml process. I'm running Cloud/Hybrid.
• What is my DAGSTER_CLOUD_URL?
• The DAGSTER_CLOUD_API_TOKEN is a "user" token from the GUI?
• My ORGANIZATION_ID is klar. I figured that out (ha!)

Alastair James
02/15/2023, 5:15 PM
dagster.core.execution.api.create_execution_plan function? Reason being some assets, i.e. data from APIs, can't be materialised outside production.

Navneet Sajwan
02/15/2023, 5:38 PM

Chris Evans
02/15/2023, 6:37 PM
Sorry, page can't be displayed.
Please report this error to the Dagster team via GitHub or Slack. Refresh the page to try again.
Loading CSS chunk 710 failed.
(http://localhost:3001/static/css/710.59a51a08.chunk.css)
Error: Loading CSS chunk 710 failed.
(http://localhost:3001/static/css/710.59a51a08.chunk.css)
at o.onerror.o.onload (http://localhost:3001/static/js/main.9a8fb596.js:2:1818747)
Jun Ying
02/15/2023, 7:32 PM

Alex Kan
02/15/2023, 8:43 PM

Zach
02/15/2023, 10:45 PM
context.asset_partition_key_range_for_output / context.asset_partition_time_window_for_output to enable a partitioned asset to operate on a range of partitions? I'm currently using context.partition_time_window and context.asset_partitions_time_window and am a bit unsure of how to modify my code to use this feature.
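A minimal sketch of an asset that reads the full partition range it is being executed for, assuming a daily-partitioned asset and a run that covers several partitions; the asset name and start date are placeholders.

# Sketch only: the for_output methods return the key range / time window covering
# every partition in this run rather than a single partition key.
from dagster import DailyPartitionsDefinition, OpExecutionContext, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"))
def range_aware_asset(context: OpExecutionContext) -> None:
    key_range = context.asset_partition_key_range_for_output()
    time_window = context.asset_partitions_time_window_for_output()
    # e.g. key_range.start="2023-01-01", key_range.end="2023-01-07" for a 7-day range.
    context.log.info(
        f"processing partitions {key_range.start}..{key_range.end} "
        f"({time_window.start} to {time_window.end})"
    )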
Will Tyree
02/15/2023, 11:16 PM
TypeError: SqlEventLogStorage._get_asset_entry_values() missing 1 required positional argument: 'has_asset_key_index_cols'
Rahul Dave
02/16/2023, 3:22 AM

Rahul Dave
02/16/2023, 3:32 AM