Python – Airflow DAG – How do I check the BQ first (remove if necessary) and then run the dataflow job?



I’m using Cloud Composer to orchestrate ETL for files that arrive in GCS and get loaded into BigQuery. I have a Cloud Function that triggers the DAG when a file arrives, and the function passes the file name/location to the DAG. In my DAG, I have 2 tasks:

1) Use the DataflowPythonOperator to run a Dataflow job that reads text data from GCS, transforms it, and writes it into BQ, and 2) move the file to a failure/success bucket depending on whether the job failed or succeeded.
Each file has a file ID, which is a column in the BigQuery table. Sometimes a file gets edited once or twice (it doesn’t happen very often), and I’d like to be able to delete the existing records for that file first.

I looked at the other Airflow operators, but I want to perform 2 tasks in my DAG before running the Dataflow job:

  1. Get the file ID based on the filename (right now I have a BigQuery table mapping filename -> file ID, but I could also just bring in a JSON as the mapping, if that would be easier).
  2. If the file ID already exists in the BigQuery table (the table that the Dataflow job writes the transformed data to), delete those records and then run the Dataflow job so I have the most current information. I know one option is to just add a timestamp and only use the most recent records, but since there may be up to 1 million records per file, and it’s not like I’m re-loading 100 files per day (maybe 1–2 tops), that seems like it could get messy and confusing.

After the Dataflow job, and preferably before moving the files to the success/failure folder, I’d like to append to some kind of “records” table stating that this file was inserted at this time. That would be how I see all the inserts that have happened.
I’m trying to figure out different ways to do this. I’m new to Cloud Composer, so even after 10+ hours of research it’s still not clear to me how this would work; otherwise I would post the code I’ve attempted.

Thanks, I really appreciate everyone’s help, and I apologize if this isn’t as clear as you would like. The Airflow documentation is very robust, but given that Cloud Composer and BigQuery are relatively new, it’s hard to learn thoroughly how to accomplish some GCP-specific tasks.

Solution

It sounds a bit complicated. Thankfully, there are operators for almost every GCP service. One other thing to work out is when the DAG execution gets triggered. Have you figured that out? You want a Google Cloud Function to fire every time a new file lands in the GCS bucket.

  1. Trigger your DAG

To trigger your DAG, you’ll want to use a Google Cloud Function that relies on the Object Finalize or Metadata Update triggers.
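Since the Cloud Function passes the file name/location to the DAG, the DAG side can read it from `dag_run.conf`. Here is a minimal sketch (Airflow 1.x style, to match the contrib operators mentioned below); the DAG id and the `file_name` key are assumptions, so adjust them to whatever your function actually sends.

```python
# Minimal sketch of reading the file name the Cloud Function passes in via
# dag_run.conf. The "file_name" key is an assumption.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {"start_date": datetime(2018, 1, 1)}

dag = DAG("gcs_to_bq_etl", default_args=default_args, schedule_interval=None)


def read_conf(**context):
    file_name = context["dag_run"].conf.get("file_name")
    if not file_name:
        raise ValueError("No file_name passed in dag_run.conf")
    return file_name  # the returned value is pushed to XCom automatically


get_file_name = PythonOperator(
    task_id="get_file_name",
    python_callable=read_conf,
    provide_context=True,  # Airflow 1.x; context is passed by default in 2.x
    dag=dag,
)
```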

  1. Load data into BigQuery

If your file is already in GCS and is in JSON or CSV format, then running a Dataflow job is a bit excessive. You can use the GoogleCloudStorageToBigQueryOperator to load the file into BQ.
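As a hedged sketch of what that might look like, assuming placeholder bucket, schema, and table names (my-incoming-bucket, schemas/transformed_table.json, my_project.my_dataset.transformed_table) and the get_file_name task from the previous sketch:

```python
# Load the GCS file straight into BigQuery. All bucket/table/schema names
# below are placeholders for your own.
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

load_to_bq = GoogleCloudStorageToBigQueryOperator(
    task_id="load_to_bq",
    bucket="my-incoming-bucket",                      # placeholder bucket
    source_objects=["incoming/{{ ti.xcom_pull(task_ids='get_file_name') }}"],
    destination_project_dataset_table="my_project.my_dataset.transformed_table",
    schema_object="schemas/transformed_table.json",   # JSON schema file in GCS
    source_format="CSV",
    skip_leading_rows=1,
    write_disposition="WRITE_APPEND",
    dag=dag,
)
```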

  3. Track the file ID

The best way to compute the file ID is probably with a Bash or Python operator in Airflow. Can it be derived directly from the file name?

If so, then you can have a PythonOperator upstream of a GoogleCloudStorageObjectSensor that checks whether the file is already in the success directory.
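Here is one way that could look, as a sketch only: a PythonOperator that parses the file ID out of the filename (assuming a hypothetical `<file_id>_<date>.csv` naming convention) and pushes it to XCom, plus an optional GoogleCloudStorageObjectSensor that waits for the object to exist.

```python
# Sketch: derive the file ID from the filename and push it to XCom.
# The "<file_id>_<date>.csv" naming convention is an assumption.
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStorageObjectSensor
from airflow.operators.python_operator import PythonOperator


def extract_file_id(**context):
    file_name = context["ti"].xcom_pull(task_ids="get_file_name")
    # e.g. "12345_2019-01-01.csv" -> "12345"
    return file_name.split("_")[0]


get_file_id = PythonOperator(
    task_id="get_file_id",
    python_callable=extract_file_id,
    provide_context=True,
    dag=dag,
)

# Optional: block until the file actually exists in the bucket.
wait_for_file = GoogleCloudStorageObjectSensor(
    task_id="wait_for_file",
    bucket="my-incoming-bucket",   # placeholder
    object="incoming/{{ ti.xcom_pull(task_ids='get_file_name') }}",
    dag=dag,
)
```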

If it is, then you can use the BigQueryOperator to run a delete query against BQ.
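A possible sketch of that delete task, assuming a file_id column in the placeholder table from the earlier sketch and pulling the ID from the get_file_id task’s XCom:

```python
# Delete any existing rows for this file ID before the (re)load.
# Table and column names are placeholders; `sql` is templated, so the file ID
# is pulled from XCom at run time.
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

delete_existing = BigQueryOperator(
    task_id="delete_existing",
    sql="""
        DELETE FROM `my_project.my_dataset.transformed_table`
        WHERE file_id = '{{ ti.xcom_pull(task_ids="get_file_id") }}'
    """,
    use_legacy_sql=False,  # DML requires standard SQL
    dag=dag,
)
```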

After that, you run the GoogleCloudStorageToBigQueryOperator from step 2.
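Putting the pieces together, one possible ordering of the sketched tasks would be:

```python
# One possible wiring of the tasks sketched above: read the conf, wait for the
# file, work out the file ID, delete any previous rows, then load into BQ.
get_file_name >> wait_for_file >> get_file_id >> delete_existing >> load_to_bq
```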

  4. Move files around

If you are moving files from one GCS location to another, the GoogleCloudStorageToGoogleCloudStorageOperator should do the trick. If your BQ load operator fails, move the file to the failed-files location, and if it succeeds, move it to the successful-jobs location.
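As a sketch, you could express that success/failure routing with trigger rules; the bucket names below are placeholders, and move_object=True makes the copy behave like a move.

```python
# Route the file after the load: one move task fires only if everything
# upstream succeeded, the other only if something failed.
from airflow.contrib.operators.gcs_to_gcs import GoogleCloudStorageToGoogleCloudStorageOperator
from airflow.utils.trigger_rule import TriggerRule

move_to_success = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id="move_to_success",
    source_bucket="my-incoming-bucket",     # placeholder
    source_object="incoming/{{ ti.xcom_pull(task_ids='get_file_name') }}",
    destination_bucket="my-success-bucket", # placeholder
    move_object=True,                       # copy, then delete the source
    trigger_rule=TriggerRule.ALL_SUCCESS,
    dag=dag,
)

move_to_failure = GoogleCloudStorageToGoogleCloudStorageOperator(
    task_id="move_to_failure",
    source_bucket="my-incoming-bucket",
    source_object="incoming/{{ ti.xcom_pull(task_ids='get_file_name') }}",
    destination_bucket="my-failure-bucket", # placeholder
    move_object=True,
    trigger_rule=TriggerRule.ONE_FAILED,
    dag=dag,
)

load_to_bq >> [move_to_success, move_to_failure]
```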

  5. Record task logs

Maybe all you need in order to track the inserts is to log the task information to GCS. Check out how to log task information to GCS.
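If you want an explicit audit trail on top of the task logs (which Composer already ships to a GCS bucket), one hedged option is a small PythonOperator that writes a record to GCS with the google-cloud-storage client; the audit bucket and object layout below are assumptions.

```python
# Write a small audit record (file ID, file name, timestamp) to GCS after the
# load. Bucket name and object layout are assumptions.
from datetime import datetime

from airflow.operators.python_operator import PythonOperator
from google.cloud import storage


def write_audit_record(**context):
    file_id = context["ti"].xcom_pull(task_ids="get_file_id")
    file_name = context["ti"].xcom_pull(task_ids="get_file_name")
    line = "{},{},{}\n".format(file_id, file_name, datetime.utcnow().isoformat())
    client = storage.Client()
    bucket = client.bucket("my-audit-bucket")                 # placeholder
    blob = bucket.blob("inserts/{}.csv".format(context["run_id"]))
    blob.upload_from_string(line)


record_insert = PythonOperator(
    task_id="record_insert",
    python_callable=write_audit_record,
    provide_context=True,
    dag=dag,
)
```

In the chain sketched earlier, this task could sit between load_to_bq and the two move tasks.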

Does this help?
