Skip to content
Draft
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions librarian.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,14 @@ libraries:
- name: google-cloud-bigquery-v2
version: 1.0.0
copyright_year: "2025"
keep:
- src/operation.rs
skip_release: true
rust:
package_dependencies:
- name: google-cloud-lro
package: google-cloud-lro
force_used: true
package_name_override: google-cloud-bigquery-v2
- name: google-cloud-bigquery-write
copyright_year: "2026"
Expand Down
1 change: 1 addition & 0 deletions src/generated/cloud/bigquery/v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ serde_json.workspace = true
serde_with.workspace = true
tracing.workspace = true
wkt.workspace = true
google-cloud-lro.workspace = true

[dev-dependencies]
anyhow.workspace = true
3 changes: 3 additions & 0 deletions src/generated/cloud/bigquery/v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ pub mod client;
/// Request builders.
pub mod builder;

#[allow(missing_docs)]
pub mod operation;
Comment thread
haphungw marked this conversation as resolved.
Outdated

#[doc(hidden)]
pub(crate) mod tracing;

Expand Down
213 changes: 213 additions & 0 deletions src/generated/cloud/bigquery/v2/src/operation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::builder::job_service::{GetQueryResults, InsertJob};
use crate::client::JobService;
use crate::model::{GetQueryResultsResponse, Job};
use google_cloud_lro::Poller;
use google_cloud_lro::internal::DiscoveryOperation;

impl DiscoveryOperation for Job {
fn done(&self) -> bool {
self.status
.as_ref()
.map(|s| s.state.as_str())
.unwrap_or_default()
== "DONE"
}

fn name(&self) -> Option<&String> {
self.job_reference.as_ref().map(|r| &r.job_id)
}
}

impl DiscoveryOperation for GetQueryResultsResponse {
fn done(&self) -> bool {
self.job_complete == Some(true)
}

fn name(&self) -> Option<&String> {
self.job_reference.as_ref().map(|r| &r.job_id)
}
}

/// Extension trait for [`InsertJob`] to support Long-Running Operation (LRO) polling.
///
/// # Example
/// ```no_run
/// use google_cloud_bigquery_v2::client::JobService;
/// use google_cloud_bigquery_v2::model::{Job, JobConfiguration, JobConfigurationQuery};
/// use google_cloud_bigquery_v2::operation::InsertJobBuilderExt;
/// use google_cloud_lro::Poller;
///
/// async fn example(client: JobService, project_id: &str) -> Result<(), google_cloud_gax::error::Error> {
/// let mut poller = client
/// .insert_job()
/// .set_project_id(project_id)
/// .set_job(
/// Job::new().set_configuration(
/// JobConfiguration::new().set_query(
/// JobConfigurationQuery::new().set_query("SELECT 1")
/// )
/// )
/// )
/// .poller(&client, project_id, None);
///
/// // Wait for the job to complete
/// let job = poller.until_done().await?;
///
/// // Check for BigQuery specific errors inside the payload
/// if let Some(status) = job.status {
/// if let Some(err) = status.error_result {
/// println!("Job failed with: {}", err.message);
/// }
/// }
/// Ok(())
/// }
/// ```
pub trait InsertJobBuilderExt {
/// Returns a poller to monitor the status of the inserted job.
fn poller(
self,
client: &JobService,
project_id: impl Into<String>,
location: Option<String>,
) -> impl Poller<Job, Job>;
}
Comment thread
haphungw marked this conversation as resolved.

impl InsertJobBuilderExt for InsertJob {
fn poller(
self,
client: &JobService,
project_id: impl Into<String>,
location: Option<String>,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned on another comments, I think accepting those parameters here again sounds like duplication, since the get_job() call accepts a set_project_id and set_location, and maybe we can grab the JobService from inside InsertJob https://github.com/googleapis/google-cloud-rust/blob/main/src/generated/cloud/bigquery/v2/src/builder.rs#L937

) -> impl Poller<Job, Job> {
let client_clone = client.clone();
let project_id = project_id.into();

let start = move || {
let req = self;
async move { req.send().await }
};

let query = move |name: String| {
let client = client_clone.clone();
let project_id = project_id.clone();
let location = location.clone();
async move {
let mut b = client.get_job().set_project_id(project_id).set_job_id(name);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is using jobs.get to poll for all job types, but jobs.getQueryResults is much better for query jobs. So maybe we need to check if this is a query job and use a different method (or call the jobs.getQueryResults poller)

if let Some(loc) = location {
b = b.set_location(loc);
}
let mut options = google_cloud_gax::options::RequestOptions::default();
options.set_retry_policy(google_cloud_gax::retry_policy::NeverRetry);
b.with_options(options).send().await
}
};

let polling_error_policy =
std::sync::Arc::new(google_cloud_gax::polling_error_policy::Aip194Strict);
let polling_backoff_policy = std::sync::Arc::new(
google_cloud_gax::exponential_backoff::ExponentialBackoff::default(),
);

google_cloud_lro::internal::new_discovery_poller(
polling_error_policy,
polling_backoff_policy,
start,
query,
)
}
}

/// Extension trait for [`GetQueryResults`] to support Long-Running Operation (LRO) polling.
///
/// # Example
/// ```no_run
/// use google_cloud_bigquery_v2::client::JobService;
/// use google_cloud_bigquery_v2::operation::GetQueryResultsBuilderExt;
/// use google_cloud_lro::Poller;
///
/// async fn example(client: JobService, project_id: &str, job_id: &str) -> Result<(), google_cloud_gax::error::Error> {
/// let mut poller = client
/// .get_query_results()
/// .set_project_id(project_id)
/// .set_job_id(job_id)
/// .poller(&client, project_id, None);
///
/// let response = poller.until_done().await?;
///
/// if response.job_complete.unwrap_or(false) {
/// println!("Query finished! Received {} rows", response.total_rows.unwrap_or_default());
/// }
/// Ok(())
/// }
/// ```
pub trait GetQueryResultsBuilderExt {
/// Returns a poller to monitor the status of the query results.
fn poller(
self,
client: &JobService,
project_id: impl Into<String>,
location: Option<String>,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned on another comments, I think accepting those parameters here again sounds like duplication, since the get_query_results() call accepts a set_project_id and set_location

) -> impl Poller<GetQueryResultsResponse, GetQueryResultsResponse>;
}

impl GetQueryResultsBuilderExt for GetQueryResults {
fn poller(
self,
client: &JobService,
project_id: impl Into<String>,
location: Option<String>,
) -> impl Poller<GetQueryResultsResponse, GetQueryResultsResponse> {
let client_clone = client.clone();
let project_id = project_id.into();

let start = move || {
let req = self;
async move { req.send().await }
};

let query = move |name: String| {
let client = client_clone.clone();
let project_id = project_id.clone();
let location = location.clone();
async move {
let mut b = client
.get_query_results()
.set_project_id(project_id)
.set_job_id(name);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should pass maxResults=0 when using getQueryResults to wait for a query to finish. For large rows, it can add many seconds to the request time, even when no rows are actually returned.

if let Some(loc) = location {
b = b.set_location(loc);
}
let mut options = google_cloud_gax::options::RequestOptions::default();
options.set_retry_policy(google_cloud_gax::retry_policy::NeverRetry);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to have a way to pass a retry policy, because we can actually need to retry in some scenarios (see https://docs.cloud.google.com/bigquery/docs/error-messages) and even crazier, in some cases we only detect that the job has to be retried by pooling it. In some cases we need to re send the query (call jobs.query or jobs.insert again) with a different ID (see jobBackendError at https://docs.cloud.google.com/bigquery/docs/error-messages). We don't need to solve this right away, just pointing out that we need to take into account that we do need to customize retry policies here.

b.with_options(options).send().await
}
};

let polling_error_policy =
std::sync::Arc::new(google_cloud_gax::polling_error_policy::Aip194Strict);
let polling_backoff_policy = std::sync::Arc::new(
google_cloud_gax::exponential_backoff::ExponentialBackoff::default(),
);

google_cloud_lro::internal::new_discovery_poller(
polling_error_policy,
polling_backoff_policy,
start,
query,
)
}
}
1 change: 1 addition & 0 deletions tests/bigquery/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ anyhow.workspace = true
futures.workspace = true
google-cloud-bigquery-v2 = { workspace = true, features = ["default"] }
google-cloud-gax = { workspace = true, features = ["unstable-stream"] }
google-cloud-lro = { workspace = true }
google-cloud-test-utils = { workspace = true }
rand.workspace = true
tokio.workspace = true
69 changes: 65 additions & 4 deletions tests/bigquery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use google_cloud_bigquery_v2::client::{DatasetService, JobService};
use google_cloud_bigquery_v2::model::{
Dataset, DatasetReference, Job, JobConfiguration, JobConfigurationQuery, JobReference,
};
use google_cloud_bigquery_v2::operation::{GetQueryResultsBuilderExt, InsertJobBuilderExt};
use google_cloud_gax::{error::rpc::Code, paginator::ItemPaginator};
use google_cloud_lro::Poller;
use google_cloud_test_utils::runtime_config::project_id;
use rand::{RngExt, distr::Alphanumeric};

Expand Down Expand Up @@ -184,7 +186,7 @@ pub async fn job_service() -> Result<()> {
println!("CREATING JOB WITH ID: {job_id}");

let query = "SELECT 1 as one";
let job = client
let poller = client
.insert_job()
.set_project_id(&project_id)
.set_job(
Expand All @@ -196,12 +198,24 @@ pub async fn job_service() -> Result<()> {
.set_query(JobConfigurationQuery::new().set_query(query)),
),
)
.send()
.await?;
println!("CREATE JOB = {job:?}");
.poller(&client, &project_id, None);

let job = poller.until_done().await?;
println!("CREATE JOB (POLLED) = {job:?}");

assert!(job.job_reference.is_some(), "{job:?}");

// Also test polling for query results
let results_poller = client
.get_query_results()
.set_project_id(&project_id)
.set_job_id(&job_id)
.poller(&client, &project_id, None);

let results = results_poller.until_done().await?;
println!("QUERY RESULTS (POLLED) = {results:?}");
assert_eq!(results.job_complete, Some(true));

let list = client
.list_jobs()
.set_project_id(&project_id)
Expand All @@ -216,6 +230,53 @@ pub async fn job_service() -> Result<()> {
.any(|v| v.as_ref().unwrap().id.contains(&job_id))
);

// EDGE CASE 1: Deliberately failing job (e.g. syntax error or missing table)
let failing_query = "SELECT * FROM dataset_that_does_not_exist.table_that_does_not_exist";
let failing_job_id = random_job_id();
let failing_poller = client
.insert_job()
.set_project_id(&project_id)
.set_job(
Job::new()
.set_job_reference(JobReference::new().set_job_id(&failing_job_id))
.set_configuration(
JobConfiguration::new()
.set_labels([(INSTANCE_LABEL, "true")])
.set_query(JobConfigurationQuery::new().set_query(failing_query)),
),
)
.poller(&client, &project_id, None);

// The poller itself should succeed (because the HTTP polling worked and the job reached DONE state)
let failed_job = failing_poller.until_done().await?;
println!("FAILING JOB (POLLED) = {failed_job:?}");

// But the job payload must contain an error_result!
let status = failed_job.status.expect("Job should have a status");
assert_eq!(status.state, "DONE");
assert!(
status.error_result.is_some(),
"Job should have an error_result payload"
);

// EDGE CASE 2: Polling an invalid/non-existent job
// According to the Aip194Strict policy, a 404 is NOT transient.
// The poller should immediately return the 404 error instead of looping forever.
let invalid_job_id = "job_that_definitely_does_not_exist_123456789";
let invalid_poller = client
.get_query_results()
.set_project_id(&project_id)
.set_job_id(invalid_job_id)
.poller(&client, &project_id, None);

let result = invalid_poller.until_done().await;
match result {
Ok(_) => panic!("Expected polling a non-existent job to fail"),
Err(e) => {
println!("INVALID JOB ERR = {e:?}");
}
}

Ok(())
}

Expand Down
Loading