Elliott Cuvelier
10/25/2022, 4:56 PM
.gz file.
The S3 key is structured as follows: {{ env }}/{{ project_id }}/{{ daily_timestamp }}/{{ timestamp }}.{{ some_id }}.gz
We would like to read these files either as soon as they arrive, or at least once a day.
We then need to separate the track events from the identify events and update a users table with the latest & most complete identify data for each user.
We then need to join the user data to the track events and push it into a PostgreSQL DB that is linked to our Metabase BI tool.
My questions are:
1. What do I use to get the latest data from S3? From what I read and understood, I could use a sensor to poll S3 every X minutes to get the latest files (I might have to parse the timestamps from the key names here, but nothing too complicated).
2. What happens next? Do I use a software-defined asset to query that latest file and append it to some kind of DB? Or do I use an op for this?
3. When materialising an asset, does it only get the newly added data, or does it get everything and it's then up to me to filter based on the latest data?
I feel like these are stupid questions, but I'm still trying to understand how software-defined assets work exactly and how they come into play.
Thanks!
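A minimal sketch of the polling-sensor approach from question 1, assuming the get_s3_keys helper from dagster-aws; the bucket name, prefix, op, and job below are hypothetical placeholders:

from dagster import RunRequest, job, op, sensor
from dagster_aws.s3.sensor import get_s3_keys


@op(config_schema={"s3_key": str})
def load_file(context):
    # placeholder op: download and parse the .gz object named by s3_key
    context.log.info(f"would process s3://segment-dumps/{context.op_config['s3_key']}")


@job
def process_segment_file_job():
    load_file()


@sensor(job=process_segment_file_job, minimum_interval_seconds=300)
def new_segment_files_sensor(context):
    # the cursor remembers the last key seen, so each tick only picks up new files
    new_keys = get_s3_keys("segment-dumps", prefix="prod/my-project", since_key=context.cursor or None)
    if not new_keys:
        return
    for key in new_keys:
        yield RunRequest(
            run_key=key,
            run_config={"ops": {"load_file": {"config": {"s3_key": key}}}},
        )
    context.update_cursor(new_keys[-1])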
Apoorv Yadav
10/25/2022, 5:09 PM
Can not build subset plan from unknown steps.
I am not sure; the same problem was faced by someone else here, and they said it had been fixed, but I am still facing the same issue.
Can anyone help me with this problem?
Yang
10/25/2022, 7:59 PM
import pandas as pd
from unittest.mock import MagicMock

from dagster import resource


@resource()
def mocked_bigquery_extra_primary_fund():
    res = MagicMock()
    three_funds = pd.DataFrame({
        'id': [0, 2, 4],
        'isin': ["123", "234", None],
        'name': ["best fund", "bestest fund", "new primary fund"],
        'ric': ["32", "a5", None],
        'cusip': ["ss", "bb", None],
        'sedol': ["as", "fj", None],
        'ticker': ["BF", "BTF", None],
        'instrument_perm_id': ["3", "5", "4"],
        'fund_shareclass_id': [6, 33, 9],
        'fund_primary_shareclass_id': [9, 33, 9],
        'lipper_fund_parent_id': ["3422", "999", "444"],
        "lipper_fund_asset_universe": ["equity", "cash", "equity"],
        "lipper_fund_asset_universe_id": ["23", None, None]
    })
    # any call to assign_yb_id on the mock returns this fixed DataFrame
    res.assign_yb_id.return_value = three_funds
    return res
I want to patch another function with a function that will take one argument (not predetermined). How do I do that? Thanks!
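A minimal sketch of mocking a call whose result depends on its argument, using MagicMock's side_effect; assign_yb_id comes from the snippet above, the rest is a placeholder:

import pandas as pd
from unittest.mock import MagicMock


def fake_assign_yb_id(df: pd.DataFrame) -> pd.DataFrame:
    # placeholder for whatever should happen to the argument passed at call time
    out = df.copy()
    out["yb_id"] = range(len(out))
    return out


res = MagicMock()
# unlike return_value, side_effect is called with the same arguments as the real
# method, so the mocked result can depend on the input
res.assign_yb_id.side_effect = fake_assign_yb_id

result = res.assign_yb_id(pd.DataFrame({"id": [0, 2, 4]}))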
Jing Zhang
10/25/2022, 8:34 PM
I'm running the dagit and daemon services using docker-compose, and another server just for the user code. Since I'm using the DefaultRunLauncher, the jobs are executed on the user code server. When a deployment happens on the user code server, all running jobs are interrupted and the job status in the UI gets stuck...
I'm wondering if there is a way to avoid this interruption? Correct me if I'm wrong... I thought about horizontally scaling up the user code server, but I guess that won't work because the process running the job is still going to be terminated.
Any suggestions? Maybe I need to deploy dagster differently? Thanks.
scheduler:
  module: dagster._core.scheduler
  class: DagsterDaemonScheduler
  config: {}

run_coordinator:
  module: dagster._core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25

run_launcher:
  module: dagster._core.launcher
  class: DefaultRunLauncher
  config: {}
Harry Tang
10/25/2022, 8:44 PM
exec /opt/conda/envs/user/bin/dagster: argument list too long
We saw a similar issue on GitHub in which it's claimed to be fixed in a release from last month, so we upgraded our dagster to the latest v1.0.13. Still, we are seeing the same issue, and our jobs won't run once the scale becomes large enough. There are more details in that issue link.
Can someone maybe help to take a look? Thanks!
Matt Fysh
10/26/2022, 2:31 AM
drogozin
10/26/2022, 8:21 AM
from typing import List

def process_file(file_names: List[str]):
    # mask files
    gdal_merge("-o", "output.tif", file_names)  # gdal_merge saves the file behind the scenes and doesn't return anything
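Since gdal_merge only writes the file as a side effect, one common pattern is to have the op return the path it wrote so downstream ops can depend on it; this is an illustrative sketch, not the project's actual code:

import subprocess
from typing import List

from dagster import op


@op
def merge_rasters(context, file_names: List[str]) -> str:
    output_path = "output.tif"
    # illustrative: shell out to the gdal_merge.py utility; the real code may use the Python API instead
    subprocess.run(["gdal_merge.py", "-o", output_path, *file_names], check=True)
    context.log.info(f"merged {len(file_names)} files into {output_path}")
    return output_path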
Yeachan Park
10/26/2022, 12:09 PM
Yeachan Park
10/26/2022, 3:14 PM
I used the Step subset option to select a specific op to re-run for that partition. That resulted in the green op status circles turning grey for all the other successful ops. The only one that's green now is the op that ran via the step subset.
Would appreciate your help. Thanks!
Leo Qin
10/26/2022, 3:25 PM
Yang
10/26/2022, 3:56 PM
Balázs Dukai
10/26/2022, 4:00 PM
I get an error about the internal_asset_deps argument for multi-asset 'multi_laz_files_ahn3' on key 'laz_files_ahn3': each specified asset key must be associated with an input to the asset or produced by this asset. Valid keys: {AssetKey(['pdal_info_ahn3']), AssetKey(['laz_files_ahn3']), AssetKey(['md5_pdok_ahn3'])}
Here is my asset definition:
@multi_asset(
    required_resource_keys={"file_store", "pdal"},
    partitions_def=PartitionDefinitionAHN(ahn_version=3),
    outs={
        "laz_files_ahn3": Out(is_required=False),
        "pdal_info_ahn3": Out(is_required=False)
    },
    internal_asset_deps={
        "laz_files_ahn3": {AssetKey(["ahn", "md5_pdok_ahn3"])},
        "pdal_info_ahn3": set(),
    },
    can_subset=True
)
def multi_laz_files_ahn3(context, md5_pdok_ahn3):
    # some code
I load my assets from that file like:
ahn_assets = load_assets_from_package_module(
    package_module=ahn,
    key_prefix="ahn",
    group_name=SOURCE
)
Then create a job like this:
job_source_ahn = define_asset_job(
    name="source_ahn",
    description="Make sure that the available AHN 3,4 LAZ files are present on disk, "
                "and their metadata is recorded.",
    selection=AssetSelection.keys(["ahn", "pdal_info_ahn3"])
    | AssetSelection.keys(["ahn", "laz_files_ahn3"])
    | AssetSelection.keys(["ahn", "md5_pdok_ahn3"])
)
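A guess based on the Valid keys listed in the error above: inside the decorator, the dependency seems to be expected under its un-prefixed key, since the "ahn" key_prefix is only applied later by load_assets_from_package_module. A stripped-down sketch (resources and partitions omitted):

from dagster import AssetKey, Out, multi_asset


@multi_asset(
    outs={
        "laz_files_ahn3": Out(is_required=False),
        "pdal_info_ahn3": Out(is_required=False),
    },
    internal_asset_deps={
        # un-prefixed key, matching the md5_pdok_ahn3 input
        "laz_files_ahn3": {AssetKey("md5_pdok_ahn3")},
        "pdal_info_ahn3": set(),
    },
    can_subset=True,
)
def multi_laz_files_ahn3(context, md5_pdok_ahn3):
    ...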
Alfonso Cerón
10/26/2022, 4:24 PM
@op
def get_dummy_dict():
    return {"1": [1, 2], "2": [3, 4]}


@op
def add_one_to_old_list(input_list):
    return map(lambda val: val + 1, input_list)


@job
def dummy_job():
    dummy_dict = get_dummy_dict()
    result = {}
    for key, value in dummy_dict:
        new_list = add_one_to_old_list(value)
        result[key] = new_list
    final_op(result)


@op
def final_op(result):
    print(result)
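Inside a @job body, the value returned by get_dummy_dict() is a placeholder output handle rather than a real dict, so the for loop above cannot run at job-construction time. A minimal sketch of Dagster's dynamic outputs, the usual way to fan out over values only known at runtime (op names are placeholders):

from dagster import DynamicOut, DynamicOutput, job, op


@op(out=DynamicOut())
def get_dummy_items():
    for key, value in {"1": [1, 2], "2": [3, 4]}.items():
        yield DynamicOutput(value, mapping_key=key)


@op
def add_one_to_old_list(input_list):
    return [val + 1 for val in input_list]


@op
def final_op(context, results):
    context.log.info(str(results))


@job
def dummy_job():
    items = get_dummy_items()
    # map runs add_one_to_old_list once per dynamic output; collect gathers the results
    final_op(items.map(add_one_to_old_list).collect())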
Dusty Shapiro
10/26/2022, 5:25 PM
Yang
10/26/2022, 5:50 PM
yield Output(
    updated_holdings_df_dd
), Output(
    updated_funds_df_dd,
I got this error for yielding them:
When yielding outputs from a op generator, they should be wrapped in an `Output` object.
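The comma makes that statement yield a single tuple of two Output objects, which is what the error is complaining about. A sketch of yielding each one separately, wrapped in its own Output with an output_name matching a declared Out (op and output names are placeholders):

from dagster import Out, Output, op


@op(out={"updated_holdings": Out(), "updated_funds": Out()})
def update_dataframes(updated_holdings_df_dd, updated_funds_df_dd):
    yield Output(updated_holdings_df_dd, output_name="updated_holdings")
    yield Output(updated_funds_df_dd, output_name="updated_funds")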
Yang
10/26/2022, 6:03 PM
context.partition_key?
David Merritt
10/26/2022, 7:55 PM
I called an @asset wrapped fn within another @job fn and expected that would count as a materialization. I don't see any record of the materialization of that asset when launched from the job, only if I manually materialize it in dagit. Is this expected, or is there some other way I should be approaching this? The workflow is [download external resource] -> [process] -> [update external resource], and I thought seeing the materialization records for the [download external resource] step would be helpful outside of the job.
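One way to get materialization records for those steps is to model them as assets and build the job with define_asset_job, which records an AssetMaterialization for each asset on every run; a minimal sketch with hypothetical asset names standing in for the download/process steps:

from dagster import asset, define_asset_job, repository


@asset
def downloaded_resource():
    # [download external resource]
    return "raw data"


@asset
def processed_resource(downloaded_resource):
    # [process] -> [update external resource]
    return downloaded_resource.upper()


# runs of this job materialize the selected assets, so the materializations
# show up on the asset pages in dagit
update_resource_job = define_asset_job("update_resource_job", selection="*processed_resource")


@repository
def repo():
    return [downloaded_resource, processed_resource, update_resource_job]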
Andrei Molev
10/26/2022, 9:16 PM
A question about the K8sRunLauncher. We use the K8sRunLauncher to run jobs in a Kubernetes cluster. However, Dagit, the Dagster Daemon, and the user repository are deployed outside of the Kubernetes cluster. How can I configure the run_launcher on the Dagster Daemon to use a kubeconfig file (to reach Kubernetes), and on the Run worker job (inside Kubernetes) to load the in-cluster config?
It seems like the Run worker uses the configuration provided in the Pod's container command (dagster api execute_run ...) and ignores the configuration in dagster.yaml inside the Run worker image.
dagster.yaml file of the Dagster Daemon (configured to use a kubeconfig file):
run_launcher:
  module: dagster_k8s.launcher
  class: K8sRunLauncher
  config:
    service_account_name: dagster
    job_image: "dagster-job"
    dagster_home: "/opt/dagster/dagster_home"
    image_pull_policy: "IfNotPresent"
    job_namespace: dagster
    instance_config_map: "dagster-instance"
    env_secrets:
      - "database-secret"
    load_incluster_config: false
    kubeconfig_file: "./kubeconfig"
dagster.yaml file of the Run worker running in Kubernetes (configured to load the in-cluster config):
run_launcher:
  module: dagster_k8s.launcher
  class: K8sRunLauncher
  config:
    service_account_name: dagster
    job_image: "dagster-job"
    dagster_home: "/opt/dagster/dagster_home"
    image_pull_policy: "IfNotPresent"
    job_namespace: dagster
    instance_config_map: "dagster-instance"
    env_secrets:
      - "database-secret"
    load_incluster_config: true
I can't override the load_incluster_config parameter with an environment variable, since it only accepts a boolean value. I tried to use the instance_config_map parameter, providing a ConfigMap with the run_launcher configuration, with no success.
How can I override the load_incluster_config and kubeconfig_file parameters in the Run worker?
Jose Uribe
10/26/2022, 10:13 PM
dagster
dagit==0.14.2
dagster-aws==0.14.2
dagster-postgres==0.14.2
dagster-slack==0.14.12
Dagster Jarred
10/26/2022, 11:39 PM
Frank Lu
10/26/2022, 11:54 PM
This works with the fs_io_manager. Is there a generic way to do this so it'll work for all other managers, such as s3_pickle_io_manager?
@op(required_resource_keys={'snowflake', 'io_manager'})
def query_vessel_readings(context: OpExecutionContext):
    res = context.resources.snowflake.execute_query(
        f"SELECT * FROM {tables[0]} LIMIT 1;", use_pandas_result=True
    )
    base_dir = context.resources.io_manager.base_dir
    op_name = context.op_def.name
    run_id = context.run_id
    storage_path = os.path.join(base_dir, run_id, op_name, "result")
    context.log_event(
        AssetMaterialization(
            asset_key="upstream_vessel_readings",
            description="Persisted result to storage.",
            metadata={
                "text_metadata": "Text-based metadata for this event",
                "path": MetadataValue.path(storage_path),
                "size (bytes)": MetadataValue.int(res.memory_usage().sum().item()),
            },
        )
    )
    return res
For instance, if I'm using the fs_io_manager, I want to be able to see the path as ~/.dagster, and if I'm using the s3_pickle_io_manager, I want to see the bucket and the prefix in the metadata in log_event.
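One generic pattern (a sketch, not how the built-in managers behave out of the box) is to attach the metadata inside the IO manager itself via context.add_output_metadata, so each manager reports its own storage location and the op doesn't need to know which one is configured; the class and config here are hypothetical:

import os
import pickle

from dagster import IOManager, MetadataValue, io_manager


class PathLoggingIOManager(IOManager):
    """Illustrative pickle-on-disk manager that records where each output was stored."""

    def __init__(self, base_dir):
        self.base_dir = base_dir

    def _path_for(self, context):
        return os.path.join(self.base_dir, *context.get_identifier())

    def handle_output(self, context, obj):
        path = self._path_for(context)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(obj, f)
        # an S3-backed variant would report the bucket and prefix here instead
        context.add_output_metadata({"path": MetadataValue.path(path)})

    def load_input(self, context):
        with open(self._path_for(context.upstream_output), "rb") as f:
            return pickle.load(f)


@io_manager(config_schema={"base_dir": str})
def path_logging_io_manager(init_context):
    return PathLoggingIOManager(init_context.resource_config["base_dir"])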
Rafael Gomes
10/27/2022, 1:28 AM
Is there a way to set io_manager_key and metadata after creating an asset from a graph?
Example:
my_asset = AssetsDefinition.from_graph(my_graph)
How do I set io_manager_key and metadata on my_asset?
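One approach (a sketch with hypothetical ops): since a graph-backed asset takes its outputs from the graph, the io_manager_key and metadata can be set on the Out of the op that produces the graph's output:

from dagster import AssetsDefinition, Out, graph, op


@op
def fetch():
    return [1, 2, 3]


@op(out=Out(io_manager_key="warehouse_io_manager", metadata={"owner": "data-team"}))
def summarize(rows):
    return sum(rows)


@graph
def my_graph():
    return summarize(fetch())


# the asset's output inherits the Out settings of summarize
my_asset = AssetsDefinition.from_graph(my_graph)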
Romain
10/27/2022, 8:40 AM
Averell
10/27/2022, 12:41 PM
start_dt, end_dt = context.asset_partitions_time_window_for_output
actual_start_dt = date_trunc('month', start_dt)
handle_time_range(actual_start_dt, end_dt)
With this, I’ll see gaps in Dagit’s partitions screen.
Is there any ready-made solution for this one? If not, what’s your suggestion?
Thanks!
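If the processing really happens a month at a time, one option (a sketch, with handle_time_range stubbed in from the snippet above) is to partition the asset monthly, so each partition's window already spans the whole month and nothing is left looking unmaterialized:

from dagster import MonthlyPartitionsDefinition, asset


def handle_time_range(start_dt, end_dt):
    # stand-in for the real processing in the snippet above
    return f"processed {start_dt} .. {end_dt}"


@asset(partitions_def=MonthlyPartitionsDefinition(start_date="2022-01-01"))
def monthly_report(context):
    # no manual date_trunc('month', ...) needed: the window covers the full month
    start_dt, end_dt = context.asset_partitions_time_window_for_output("result")
    return handle_time_range(start_dt, end_dt)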
Pragna
10/27/2022, 1:26 PM
I was getting RuntimeError: Java gateway process exited before sending its port number - to overcome that, I installed Java on my container.
Now when I try to run it, I am getting another error from the pyspark_step_launcher:
Traceback (most recent call last):
File "/mnt/tmp/spark-98d604fc-9105-4498-86f1-2f673897a704/emr_step_main.py", line 93, in <module>
main(sys.argv[1], sys.argv[2])
File "/mnt/tmp/spark-98d604fc-9105-4498-86f1-2f673897a704/emr_step_main.py", line 28, in main
step_run_ref = pickle.loads(step_run_ref_data)
TypeError: __new__() takes from 2 to 17 positional arguments but 18 were given
I am trying to use the example from the dagster GitHub repo: https://github.com/dagster-io/dagster/blob/master/examples/with_pyspark_emr/with_pyspark_emr/repository.py
Just FYI - my Dagster is running on ECS.
Is there anything well known that I am missing, or does anyone know a workaround for this? Thanks!
stefan hansan
10/27/2022, 2:47 PM
Maksym Domariev
10/27/2022, 2:57 PM
Zach P
10/27/2022, 4:17 PM
Ben Bay
10/27/2022, 5:01 PM
The following packages are at 0.16.14. Will they play nice with Dagster 1.0?
dagster-gcp
dagster-k8s
dagster-pagerduty
dagster-pandas
dagster-postgres
dagster-shell
dagster-slack
Thanks!
Yang
10/27/2022, 7:52 PM