geoHeil
03/01/2022, 2:57 PMop and asset? Right now I get a:
DagsterInvalidConfigError: Error in config for job
Error 1: Missing required config entry "dummy_asset_partitioned" at path root:ops. Sample config for missing entry: {'dummy_asset_partitioned': {'config': {'date': '...'}}}
as Dagster is feeding the configuration per-op and somehow is only filling the partition date into a single one of the two steps.
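For reference, the per-op shape the sample config in that error is asking for would look roughly like this as run config (a sketch only; "2022-03-01" is a placeholder partition date, not a value from the thread):

run_config = {
    "ops": {
        # Shape taken from the sample in the error message above.
        "dummy_asset_partitioned": {"config": {"date": "2022-03-01"}},
    }
}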
Max
03/01/2022, 3:08 PM
Anoop Sharma
03/01/2022, 4:02 PM
Louis Auneau
03/01/2022, 4:33 PMGOOGLE_APPLICATION_CREDENTIALS for sensor logic (that pulls data from GCP Pub/Sub) on a Helm deployment. We are stuck on this because it seems that we need to mount a volume based on a secret onto the daemon, which is not possible given the current values.yaml file.
Do you know if this is possible, or is there any other way to do this?
Thank you in advance and have a nice day 🙂!
geoHeil
03/01/2022, 5:26 PM
class PandasCsvIOManagerWithOutputAssetPartitions(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return pd.read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        obj.to_csv(file_path, index=False)
        yield MetadataEntry.int(obj.shape[0], label="number of rows")
        yield MetadataEntry.float(0.1234, "some_column mean")

    def get_output_asset_key(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        # return AssetKey(file_path)
        return file_path

    def get_output_asset_partitions(self, context):
        return set(context.config["partitions"])


@asset(partitions_def=DailyPartitionsDefinition(start_date="2020-02-01"))
def dummy_asset_partitioned(context) -> DataFrame:
    """Creates a mini dummy asset which is partitioned"""
    partition_key = context.output_asset_partition_key
    get_dagster_logger().info(f"Partitioned asset from: {partition_key}")
    df = pd.DataFrame({'foo': [1, 3, 3], 'bar': ['a', 'b', 'c']})
    df['partition_key'] = partition_key
    rand_metric_dummy_value = random.randrange(0, 101, 2)
    yield Output(df, metadata={
        "path": EventMetadata.path('/path/to/file'),
        "value_counts": 10,
        "random_dummy_metric": rand_metric_dummy_value,
    })
How can I use/set the asset_key only once? This is currently very unclear to me.
dagster.core.errors.DagsterInvariantViolationError: Both the OutputDefinition and the IOManager of output "result" on solid "dummy_asset_partitioned" associate it with an asset. Either remove the asset_key parameter on the OutputDefinition or use an IOManager that does not specify an AssetKey in its get_output_asset_key() function.
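A minimal sketch of the second option that error message offers: keep the storage logic but drop get_output_asset_key / get_output_asset_partitions from the IO manager, so only the @asset decorator associates the output with an asset. It mirrors the snippet above and is only an assumption about how to resolve the conflict, not a confirmed fix:

import os

import pandas as pd
from dagster import IOManager, MetadataEntry


class PandasCsvIOManager(IOManager):
    def load_input(self, context):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        return pd.read_csv(file_path)

    def handle_output(self, context, obj):
        file_path = os.path.join("my_base_dir", context.step_key, context.name)
        obj.to_csv(file_path, index=False)
        yield MetadataEntry.int(obj.shape[0], label="number of rows")
        # No get_output_asset_key / get_output_asset_partitions here, so the
        # asset key comes solely from the @asset definition.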
Chris Nogradi
03/01/2022, 6:27 PM
rowan gaffney
03/01/2022, 11:27 PM
Ben Gatewood
03/02/2022, 5:47 AM
geoHeil
03/02/2022, 8:54 AM
yield Output(df, metadata={
    "path": EventMetadata.path('/path/to/file'),
    "value_counts": 10,
    "random_dummy_metric": rand_metric_dummy_value,
})
If I understand this correctly it would:
- store the emitted metadata
- materialize the data using an IO manager
What happens if the materialization fails (for whatever reason)? Would I still get the metadata to show up in dagit?
geoHeil
03/02/2022, 9:48 AM
"The op-decorated function accepts DataFrames as parameters and returns DataFrames when it completes. An IOManager handles writing and reading the DataFrames to and from persistent storage."
Assuming the DF is petabytes in size I do not necessarily want to materialize all this IO. Spark itself will create a DAG of the submitted operations - and perhaps calculate additional predicate pushdowns or projections for optimization (AQE). How can I use dagster and ops to define multiple (reusable building blocks) but still not materialize the IO between these steps?
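One possible way to keep reusable ops without persisting each intermediate (a sketch, not a confirmed recommendation): return the lazy Spark DataFrame from each op and attach Dagster's in-memory mem_io_manager, so nothing is written between steps and Spark only executes its optimized plan when the final op writes. This only works while all steps run in the same process; the op names and output path below are made up for illustration:

from dagster import job, op, mem_io_manager
from pyspark.sql import DataFrame, SparkSession


@op
def load_events() -> DataFrame:
    # Lazy: builds a Spark plan, nothing is computed or written yet.
    spark = SparkSession.builder.getOrCreate()
    return spark.range(1_000_000).withColumnRenamed("id", "event_id")


@op
def filter_events(events: DataFrame) -> DataFrame:
    # Still lazy: only extends the logical plan Spark will optimize (AQE etc.).
    return events.where("event_id % 2 = 0")


@op
def write_events(events: DataFrame) -> None:
    # The whole plan executes only here, at the single materialization point.
    events.write.mode("overwrite").parquet("/tmp/events")  # placeholder path


@job(resource_defs={"io_manager": mem_io_manager})
def spark_building_blocks():
    write_events(filter_events(load_events()))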
Mark Fickett
03/02/2022, 2:31 PMvalues_resource with an Optional List of `Enum`s?
Here's what I tried:
from typing import List
from typing import Optional
from dagster import make_values_resource
from dagster import Enum
from common_definitions import PipelineStage, Pipe  # regular Python enums

# Define a resource for a config that can be passed to multiple ops in a job.
# https://docs.dagster.io/concepts/configuration/config-schema#passing-configuration-to-multiple-ops-in-a-job
SHARED_CONFIG = make_values_resource(
    from_stage=Enum.from_python_enum(PipelineStage),  # seems to work
    pipes=Optional[List[Enum.from_python_enum(Pipe)]],  # error
)
The error I get is:
/...packages/dagster/core/workspace/context.py:541: UserWarning: Error loading repository location my_pipeline.py:TypeError: Parameters to generic types must be types. Got <dagster.config.config_type.Enum object at 0x7f54f51247c0>.
I haven't done much debugging yet, but thought I'd ask for a definitive answer while I experiment. This is my first time trying to declare a values_resource.
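A possible workaround sketch, assuming the goal is simply an optional list field: Enum.from_python_enum returns a Dagster config type rather than a Python type, which is why the typing generics reject it, whereas Dagster's own Field / Noneable / Array wrappers accept it. Whether this is the intended combination for make_values_resource is an assumption:

from dagster import Array, Enum, Field, Noneable, make_values_resource
from common_definitions import PipelineStage, Pipe  # regular Python enums, as above

SHARED_CONFIG = make_values_resource(
    from_stage=Enum.from_python_enum(PipelineStage),
    # Dagster config wrappers instead of Optional[List[...]]:
    pipes=Field(Noneable(Array(Enum.from_python_enum(Pipe))), is_required=False),
)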
Mark Fickett
03/02/2022, 2:57 PM
@op(
    out={
        "do_metadata": Out(Nothing, is_required=False),
        "no_metadata": Out(Nothing, is_required=False),
    },
)
def _decide_whether_to_do_metadata(context):
    config = context.resources.shared_config
    if config.should_do_metadata:
        yield Output(None, "do_metadata")
    else:
        yield Output(None, "no_metadata")


@op(
    ins={"start": In(Nothing)},
)
def _metadata(context):
    pass  # only do this work if the config said so


@op(
    ins={"start": In(List[Nothing])},
)
def _continue_either_way(context) -> None:
    pass  # always do this work, but wait for metadata to be done


@graph
def _graph():
    do_metadata, no_metadata = _decide_whether_to_do_metadata()
    done_metadata = _metadata(do_metadata)
    _continue_either_way(start=[no_metadata, done_metadata])
I could just put an early-exit in the _metadata @op, but it would be nice to see at a glance in Dagit whether the @op was skipped or not.
Chen Tsinovoy
03/02/2022, 4:41 PM
Anoop Sharma
03/02/2022, 8:24 PM
Daniel Katz
03/02/2022, 9:10 PM
Daniel Katz
03/02/2022, 9:10 PMdagster.check.CheckError: Failure condition: Can not have pending and unresolved step inputs
Mark Fickett
03/02/2022, 9:48 PM
python_logs:
  managed_python_loggers:
    - root
in my $DAGSTER_HOME/dagster.yaml. (I confirmed that a logger in my main process does get picked up within Dagit, and the logs from the subprocess do still show up in my terminal where I started dagit.)
I'm hoping to do an incremental migration where I leave some chunks using concurrent.futures for a bit, but if the answer is that I just need to get everything into Dagster, that's probably OK too.
Stephen Bailey
03/03/2022, 2:04 AM
schedules:
  - cron: 0 1 * * *
    extractor: tap-gitlab
    loader: target-snowflake
  - cron: 0 2 * * *
    extractor: tap-dbt-cloud
    loader: target-snowflake
  - cron: 0 3 * * *
    extractor: tap-salesforce
    loader: target-snowflake
  ...
Could someone give me some pointers on how I can do this?
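A rough sketch of one way to fan a YAML file like that out into Dagster jobs and schedules. Everything here is an assumption for illustration: the schedules.yaml path, the meltano run command, and all job/op names:

import subprocess

import yaml
from dagster import ScheduleDefinition, job, op, repository


def build_job(extractor: str, loader: str):
    suffix = f"{extractor}_{loader}".replace("-", "_")

    @op(name=f"run_{suffix}")
    def run_extract_load():
        # Placeholder for however the extract/load is actually invoked.
        subprocess.run(["meltano", "run", extractor, loader], check=True)

    @job(name=f"{suffix}_job")
    def extract_load_job():
        run_extract_load()

    return extract_load_job


with open("schedules.yaml") as f:
    entries = yaml.safe_load(f)["schedules"]

jobs, schedules = [], []
for entry in entries:
    extract_load_job = build_job(entry["extractor"], entry["loader"])
    jobs.append(extract_load_job)
    schedules.append(
        ScheduleDefinition(job=extract_load_job, cron_schedule=entry["cron"])
    )


@repository
def elt_repository():
    return jobs + schedules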
Ben Gatewood
03/03/2022, 5:04 AMcontext.resources.s3.upload_file_obj(f_obj, "testing", filename)
Ben Gatewood
03/03/2022, 5:05 AM'S3' object has no attribute 'upload_file_obj'
Ben Gatewood
03/03/2022, 5:05 AM@op(required_resource_keys={'s3'})
Ben Gatewood
03/03/2022, 5:06 AMresource_defs={'s3': s3_resource}
Ben Gatewood
03/03/2022, 5:06 AMupload_file_obj() instead of list keys or whichever)
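If it helps, the boto3 S3 client method is spelled upload_fileobj (no underscore between "file" and "obj"), which would explain the AttributeError. A minimal sketch with the same resource wiring; the bucket, key, and payload are placeholders:

import io

from dagster import job, op
from dagster_aws.s3 import s3_resource


@op(required_resource_keys={"s3"})
def upload_report(context):
    f_obj = io.BytesIO(b"example payload")  # placeholder file object
    # boto3 signature: upload_fileobj(Fileobj, Bucket, Key)
    context.resources.s3.upload_fileobj(f_obj, "testing", "reports/example.txt")


@job(resource_defs={"s3": s3_resource})
def upload_job():
    upload_report()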
Sathish
03/03/2022, 5:08 AM
Francois-DE
03/03/2022, 5:38 AM
Sanat Mouli
03/03/2022, 7:36 AM
@schedule(
    cron_schedule="45 6 * * *",
    job=hello_cereal_job,
    execution_timezone="US/Central",
)
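For context, a decorated function normally follows that decorator and returns the run config for each tick; the function name and empty config below are placeholders:

from dagster import schedule


@schedule(
    cron_schedule="45 6 * * *",
    job=hello_cereal_job,  # the job from the snippet above
    execution_timezone="US/Central",
)
def daily_hello_cereal_schedule(context):
    # Return the run config for the scheduled run; {} is just a placeholder.
    return {}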
Martin Laurent
03/03/2022, 3:26 PM.map(subgraph) but I get this error:
Unexpected dynamic output dependency in regular fan in, should have been caught at definition time.
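For comparison, the dynamic-output shape that works at the op level is .map(...) followed by .collect() before anything fans back in; whether the same shape applies when mapping a whole subgraph is exactly the open question here, so treat this only as a sketch with made-up op names:

from dagster import DynamicOut, DynamicOutput, job, op


@op(out=DynamicOut())
def emit_chunks():
    for i in range(3):
        yield DynamicOutput(i, mapping_key=str(i))


@op
def process_chunk(chunk: int) -> int:
    return chunk * 2


@op
def merge(results) -> int:
    return sum(results)


@job
def dynamic_job():
    # collect() turns the mapped results back into a single fan-in input.
    merge(emit_chunks().map(process_chunk).collect())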
Mark Fickett
03/03/2022, 3:30 PMDagsterInstance but it looks like it's for internal use. What I'm doing now is providing a wrapper like this:
def run_pipeline(...):
    with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml") as temp_config_file:
        shared_config = {
            "a_config_option": x,
            "another_config_option": y,
        }
        yaml.dump(
            {"resources": {"shared_config": {"config": shared_config}}},
            temp_config_file,
        )
        temp_config_file.flush()
        subprocess.check_call([
            "dagster",
            "job",
            "execute",
            "-m", "my.dagster_repo.module",
            "-j", "my_job_name",
            "-c", temp_config_file.name,
        ])
It would be more convenient to just call this directly in Python. Ideally including dagster.yaml settings too would be great, so the wrapper script can set some of those options.
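One Python-level alternative (a sketch; my_job_name is assumed to be the JobDefinition importable from that module): JobDefinition.execute_in_process takes the same run config as a dict and can be pointed at the persistent DagsterInstance so the run is recorded against the dagster.yaml-configured storage. Note it runs everything in the current process rather than through the CLI's launcher:

from dagster import DagsterInstance

from my.dagster_repo.module import my_job_name  # assumed to be the JobDefinition


def run_pipeline(x, y):
    run_config = {
        "resources": {
            "shared_config": {
                "config": {"a_config_option": x, "another_config_option": y}
            }
        }
    }
    # DagsterInstance.get() picks up $DAGSTER_HOME/dagster.yaml settings.
    instance = DagsterInstance.get()
    return my_job_name.execute_in_process(run_config=run_config, instance=instance)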
Rohan Kshirsagar
03/03/2022, 4:51 PM
geoHeil
03/03/2022, 4:51 PMAIRBYTE_CONNECTION_ID = "your_airbyte_connection_id"
and use any connector configured there (in the UI or via the API) to follow along with the example end to end
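A sketch of where that connection id typically plugs in with the dagster-airbyte ops; the host/port values and job name are placeholders:

from dagster import job
from dagster_airbyte import airbyte_resource, airbyte_sync_op

AIRBYTE_CONNECTION_ID = "your_airbyte_connection_id"  # replace with a real connection id

airbyte_instance = airbyte_resource.configured(
    {"host": "localhost", "port": "8000"}  # placeholder Airbyte deployment
)

sync_connection = airbyte_sync_op.configured(
    {"connection_id": AIRBYTE_CONNECTION_ID}, name="sync_connection"
)


@job(resource_defs={"airbyte": airbyte_instance})
def airbyte_sync_job():
    sync_connection()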