Utkarsh
08/31/2021, 1:30 PM
Sundara Moorthy
08/31/2021, 3:58 PM
chrispc
08/31/2021, 4:18 PM
TypeError: Object of type Timestamp is not JSON serializable or TypeError: Object of type date is not JSON serializable
I know that json.dump() has the parameter default (default is a function applied to objects that aren't serializable; in this case it's str, so it just converts everything it doesn't recognize to strings), but I can't use it: if I apply json.dump before the data gets into the assets catalog, it comes out ugly.
Jake Smart
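On the Timestamp/date serialization question above: instead of default=str, the default hook can be targeted so the output stays machine-readable. A minimal stdlib sketch (the ISO-string choice here is my suggestion, not from the thread; pandas Timestamp also exposes .isoformat(), so it takes the same path):

    import json
    from datetime import date, datetime

    def json_default(obj):
        # json.dumps calls this only for objects it can't serialize natively.
        # date/datetime (and pandas Timestamp, which also has .isoformat())
        # become ISO-8601 strings; anything else still fails loudly.
        if hasattr(obj, "isoformat"):
            return obj.isoformat()
        raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

    payload = {"run_date": date(2021, 8, 31), "created": datetime(2021, 8, 31, 16, 18)}
    encoded = json.dumps(payload, default=json_default)
    print(encoded)  # {"run_date": "2021-08-31", "created": "2021-08-31T16:18:00"}

Raising for unknown types (rather than silently stringifying everything) keeps genuine serialization bugs visible.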
08/31/2021, 10:50 PM
chrispc
08/31/2021, 11:01 PM
chrispc
08/31/2021, 11:52 PM
Sandeep Aggarwal
09/01/2021, 4:06 AM
Slackbot
09/01/2021, 6:46 AM
Julian Ganguli
09/01/2021, 1:17 PM
Peter Bower
09/02/2021, 12:38 AM
Jay Sharma
09/02/2021, 4:09 AM
Peter Bower
09/02/2021, 4:13 AM
Nilesh Pandey
09/02/2021, 5:23 AM
databricks_pyspark_step_launcher
link: https://docs.dagster.io/_apidocs/libraries/dagster-databricks#dagster_databricks.databricks_pyspark_step_launcher
According to the documentation, this step launcher should satisfy my use case, but I haven't found any example of how to use it. I did find one for the EMR step launcher; I just need an example of the Databricks one.
Nilesh Pandey
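There is no canonical Databricks example in the thread; below is a hedged sketch of the launcher's config shape, modeled on the EMR example and the API docs linked above. Every field name here is an assumption to verify against your installed dagster-databricks version before use.

    # Sketch only -- field names follow the dagster_databricks API docs
    # linked above; verify them against your installed version.
    launcher_config = {
        # Databricks Runs API job spec (cluster spec, libraries, ...);
        # see the run_config schema in the API docs for the full shape.
        "run_config": {"run_name": "dagster-step"},
        "databricks_host": "https://<workspace>.cloud.databricks.com",
        "databricks_token": {"env": "DATABRICKS_TOKEN"},  # StringSource-style
        "local_pipeline_package_path": ".",
    }

    # Wiring it up would then look roughly like the EMR example:
    #   ModeDefinition(resource_defs={
    #       "pyspark_step_launcher":
    #           databricks_pyspark_step_launcher.configured(launcher_config),
    #       "pyspark": pyspark_resource,
    #   })

The ModeDefinition wiring is left as a comment because it requires a live Databricks workspace; only the config dict itself is shown as runnable code.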
09/02/2021, 10:33 AM
sourabh upadhye
09/02/2021, 10:48 AM
queryArgs = f"""{{
    selector: {{
        repositoryName: "{repo_name}"
        repositoryLocationName: "{repo_loc}"
        pipelineName: "{pipeline_name}"
    }}
    runConfigData: {run_config}
    mode: "{mode_def}"
}}"""
pipeline_execute = f"""mutation {{
    launchPipelineExecution(executionParams: {queryArgs})
    {{
        __typename
        ... on PipelineConfigValidationInvalid {{
            errors {{
                message
            }}
        }}
    }}
}}"""
print(pipeline_execute)
resp = json.loads(client.execute(pipeline_execute))
sourabh upadhye
09/02/2021, 10:48 AM
run_config = f"""{{
num: 10,
dict: {{hello:"Whats up"}},
string: "Hello"}}"""
Basically, can we allow double quotes around the keys? Currently, if the keys have double quotes, the query fails.
Our use case: we are programmatically executing pipelines and receive the run_config over the network. On deserializing the dictionary, the keys carry double quotes, which dagster graphql rejects.
We could get nested run configs, so we would prefer the GraphQL server to accept run_config keys with double quotes rather than us stripping them throughout the nested config object.
Matthias Queitsch
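One way to sidestep the quoting problem described above (a sketch, assuming your GraphQL client supports variables): keep the mutation text static and send executionParams as a JSON variable, so double-quoted keys are legal by construction and no string splicing is needed. The selector names below are placeholders, and the ExecutionParams type name is taken from the mutation in the thread; check it against your schema.

    import json

    # Static mutation; all data moves into variables.
    LAUNCH_MUTATION = """
    mutation Launch($executionParams: ExecutionParams!) {
      launchPipelineExecution(executionParams: $executionParams) {
        __typename
        ... on PipelineConfigValidationInvalid { errors { message } }
      }
    }
    """

    run_config = {"num": 10, "dict": {"hello": "Whats up"}, "string": "Hello"}

    variables = {
        "executionParams": {
            "selector": {
                "repositoryName": "my_repo",              # placeholder
                "repositoryLocationName": "my_location",  # placeholder
                "pipelineName": "my_pipeline",            # placeholder
            },
            "runConfigData": run_config,
            "mode": "default",
        }
    }

    # Most GraphQL clients POST a body of this shape:
    payload = json.dumps({"query": LAUNCH_MUTATION, "variables": variables})

Because run_config travels as data rather than as query text, nested configs of any depth survive untouched.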
09/02/2021, 10:59 AM
Nathan Saccon
09/02/2021, 12:52 PM
Alejandro A
09/03/2021, 1:16 AM
@solid
def hello():
    return 1

@solid(
    input_defs=[
        InputDefinition("name", str),
        InputDefinition("age", int),
    ]
)
def hello2(hello_: int, name: str, age: int):
    print(f"Hello {name}, {age + hello_}")

@pipeline
def pipe_2():
    x = hello()
    y = hello2(x, "Marcus", 20)

@repository
def deploy_docker_repository():
    return [pipe_2, my_schedule]
Alejandro A
09/03/2021, 1:18 AMdagster.core.errors.DagsterInvalidDefinitionError: In @pipeline pipe_2, received invalid type <class 'str'> for input "name" (at position 1) in solid invocation "hello2". Must pass the output from previous solid invocations or inputs to the composition function as inputs when invoking solids during composition.
Drew Sonne
09/03/2021, 11:46 AM
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: status = StatusCode.UNAVAILABLE details = "failed to connect to all addresses" debug_error_string = "{"created":"@1630669409.194800714","description":"Failed to pick subchannel","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":3008,"referenced_errors":[{"created":"@1630669409.194797596","description":"failed to connect to all addresses","file":"src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc","file_line":397,"grpc_status":14}]}" >
I'm trying to debug it myself, and using grpcurl as grpcurl -plaintext <host>:4000 list
and getting the following response:
Failed to list services: server does not support the reflection API
To help me debug this, what symbols does the gRPC endpoint expose that I could use to test that it's functioning correctly?
Remi Gabillet
09/03/2021, 2:30 PM
Error loading repository location glue.py:dagster.core.errors.DagsterInvariantViolationError: Attempted to iterate over an InvokedSolidOutputHandle. This object represents the output "result" from the solid "load_partitions". Consider yielding multiple Outputs if you seek to pass different parts of this output to different solids.
Darren Ng
09/03/2021, 2:35 PM
@solid(
    description="Read from google sheet for data pipeline.",
    output_defs=[
        OutputDefinition(name="row_data"),
        OutputDefinition(name="gspread_object"),
    ],
)
def read_from_gsheet() -> List:
    print("Instantiate gspread sheet object and read a row...")
    row = sheet.row_values(2)
    yield Output(row, output_name="row_data")
    yield Output(sheet, output_name="gspread_object")

@solid(description="Write to cells in Gsheet")
def write_to_gsheet(sheet, cell: str, value: int):
    sheet.update(cell, value)

@solid(description="Read from google sheet and query data from NLP")
def query_NLP_data(context, row):
    received_row = row
    print("Sending request to query NLP graph and return metrics")
    some_bol_metrics = 1000
    context.log.info(f"Returned NLP metrics: {some_bol_metrics}")
    return some_bol_metrics

@pipeline
def gather_pipeline():
    row, sheet = read_from_gsheet()
    some_bol_metrics = query_NLP_data(row)
    write_to_gsheet(sheet, 'H2', some_bol_metrics)
In this example I'm reading a row from a gspread sheet object, sending it to another solid which should query our company DB and return some metrics (here 1000), then passing that into another solid which writes it into cell 'H2' of the same sheet object. But I'm getting:
/home/dazza/anaconda3/envs/versedai/lib/python3.8/site-packages/dagster/core/definitions/composition.py:65: UserWarning: While in @pipeline context 'gather_pipeline', received an uninvoked solid 'write_to_gsheet'.
warnings.warn(warning_message.strip())
/home/dazza/anaconda3/envs/versedai/lib/python3.8/site-packages/dagster/core/workspace/context.py:504: UserWarning: Error loading repository location gather_pipeline.py:dagster.core.errors.DagsterInvalidDefinitionError: In @pipeline gather_pipeline, received invalid type <class 'str'> for input "cell" (at position 1) in solid invocation "write_to_gsheet". Must pass the output from previous solid invocations or inputs to the composition function as inputs when invoking solids during composition.
Any help or suggestions will be appreciated.
doom leika
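The second error quoted above is the same composition rule as the pipe_2 error earlier in the thread: 'H2' is a literal passed during composition. A sketch of moving it to run config (legacy API), leaving sheet and value wired from the upstream outputs:

    # Pipeline body becomes: write_to_gsheet(sheet, value=some_bol_metrics)
    # and the literal cell is supplied per run:
    run_config = {
        "solids": {
            "write_to_gsheet": {
                "inputs": {
                    "cell": {"value": "H2"},
                }
            }
        }
    }

Alternatively, declaring the input as InputDefinition("cell", str, default_value="H2") on the solid avoids per-run config entirely.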
09/03/2021, 2:46 PM
gRPC repository instances that serve the same code
doom leika
09/03/2021, 2:47 PM
Brien Gleason
09/03/2021, 3:32 PM
I am having issues calling a solid in the pipeline, despite defining an InputDefinition (of type Nothing) for each upstream dependency. I get this error:
dagster.core.errors.DagsterInvalidDefinitionError: In @pipeline sample_example_pipeline, could not resolve input based on position at index 0 for invocation sample_example_table. Use keyword args instead, available inputs are: ['input_start1', 'input_start2']
Has anyone defined a solid that has multiple Nothing inputs and could provide an example?
Vlad Dumitrascu
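The error message itself points at the fix: Nothing inputs carry no value, so they can't be matched positionally and must be wired by keyword during composition. A sketch under the legacy solid/pipeline API (the upstream solid names are hypothetical, and current Dagster releases have replaced these APIs, so treat this as illustrative rather than runnable as-is):

    from dagster import InputDefinition, Nothing, pipeline, solid

    @solid
    def input_start1(_):
        pass

    @solid
    def input_start2(_):
        pass

    @solid(
        input_defs=[
            InputDefinition("input_start1", Nothing),
            InputDefinition("input_start2", Nothing),
        ]
    )
    def sample_example_table(_):
        # Nothing inputs express ordering only; they don't appear as
        # parameters of the compute function.
        pass

    @pipeline
    def sample_example_pipeline():
        # Keyword args are required when the inputs are typed Nothing:
        sample_example_table(
            input_start1=input_start1(),
            input_start2=input_start2(),
        )

Note that sample_example_table's compute function takes no parameters for the Nothing inputs; they exist purely to sequence it after the two upstream solids.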
09/03/2021, 10:24 PM
dagster and dagit modules and pip installed dagster and dagit again. Last time, the dagit.exe and other executables were put in the bin folder of my --target= directories (where I installed the modules). This time, there's no dagit.exe executable. Where the hell do I find this thing? It's not in the documentation. I figured it out before, but I forgot where it was. The documentation only says to run 'dagit' but it's clearly not on my system path, so it can't find it. Where is it (by default)? This should be written better in the Docs, because it's a deal-breaker...
doom leika
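On locating the dagit executable above: pip installs console scripts into the interpreter's scripts directory (Scripts\ on Windows, bin/ elsewhere), and with pip install --target=<dir> they land under that target instead, which is never on PATH automatically. A stdlib sketch to print where a normal (non --target) install would put them:

    import sysconfig

    # Directory where pip places console-script executables (dagit.exe on
    # Windows) for this interpreter, for installs without --target.
    scripts_dir = sysconfig.get_path("scripts")
    print(scripts_dir)

Adding that directory (or the bin folder under your --target directory) to PATH, or invoking the executable by its full path, should make the dagit command resolvable.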
09/03/2021, 11:59 PM
haritsahm
09/06/2021, 3:43 AM
George Pearse
09/06/2021, 12:18 PM