DagsterDocs

Source code for dagster_gcp.dataproc.solids

from dagster import Bool, Field, solid
from dagster.seven import json

from .configs import define_dataproc_submit_job_config


def _submit_and_wait(context, dataproc_client, job_config):
    """Submit ``job_config`` through ``dataproc_client`` and wait on the resulting job.

    Args:
        context: Solid execution context; only ``context.log`` is used here.
        dataproc_client: Object exposing ``submit_job(job_config)`` and
            ``wait_for_job(job_id)`` -- either the ``dataproc`` resource itself
            or the object yielded by its ``cluster_context_manager()``.
        job_config: Job configuration, passed unchanged to ``submit_job``.
    """
    result = dataproc_client.submit_job(job_config)
    # The submission response is read as result["reference"]["jobId"].
    job_id = result["reference"]["jobId"]
    context.log.info("Submitted job ID {}".format(job_id))
    dataproc_client.wait_for_job(job_id)


@solid(
    required_resource_keys={"dataproc"},
    config_schema={
        "job_config": define_dataproc_submit_job_config(),
        "job_scoped_cluster": Field(
            Bool,
            description="whether to create a cluster or use an existing cluster",
            is_required=False,
            default_value=True,
        ),
    },
)
def dataproc_solid(context):
    """Submit the configured Dataproc job and wait for it.

    Reads two entries from the solid config:

    * ``job_config`` -- the job to submit; it is logged as JSON before submission.
    * ``job_scoped_cluster`` -- when true (the default), the job is submitted
      inside ``context.resources.dataproc.cluster_context_manager()``; when
      false, it is submitted directly through ``context.resources.dataproc``.

    Args:
        context: Solid execution context providing ``solid_config``, ``log``
            and the ``dataproc`` resource.
    """
    job_config = context.solid_config["job_config"]

    # json.dumps already returns a string, so no extra str() wrapper is needed.
    context.log.info("submitting job with config: %s" % json.dumps(job_config))

    if context.solid_config["job_scoped_cluster"]:
        # Cluster context manager, creates and then deletes cluster
        with context.resources.dataproc.cluster_context_manager() as cluster:
            # Submit the job specified by this solid to the cluster defined by
            # the associated resource
            _submit_and_wait(context, cluster, job_config)
    else:
        # Submit to an existing cluster: the job goes through the dataproc
        # resource itself rather than a cluster created for this job.
        _submit_and_wait(context, context.resources.dataproc, job_config)