Skip to content
Draft
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions librarian.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,14 @@ libraries:
version: 1.0.0
copyright_year: "2025"
skip_release: true
keep:
- src/operation.rs
rust:
package_name_override: google-cloud-bigquery-v2
package_dependencies:
- name: google-cloud-lro
package: google-cloud-lro
force_used: true
- name: google-cloud-bigquery-write
copyright_year: "2026"
output: src/bigquery-write
Expand Down
1 change: 1 addition & 0 deletions src/generated/cloud/bigquery/v2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ serde_json.workspace = true
serde_with.workspace = true
tracing.workspace = true
wkt.workspace = true
google-cloud-lro.workspace = true

[dev-dependencies]
anyhow.workspace = true
3 changes: 3 additions & 0 deletions src/generated/cloud/bigquery/v2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ pub mod client;
/// Request builders.
pub mod builder;

#[allow(missing_docs)]
pub mod operation;
Comment thread
haphungw marked this conversation as resolved.
Outdated

#[doc(hidden)]
pub(crate) mod tracing;

Expand Down
154 changes: 154 additions & 0 deletions src/generated/cloud/bigquery/v2/src/operation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::builder::job_service::{GetQueryResults, InsertJob};
use crate::client::JobService;
use crate::model::{GetQueryResultsResponse, Job};
use google_cloud_lro::Poller;
use google_cloud_lro::internal::DiscoveryOperation;

/// Exposes a BigQuery [`Job`] through the [`DiscoveryOperation`] interface.
impl DiscoveryOperation for Job {
    /// Returns `true` only when the job carries a status whose state is
    /// exactly `"DONE"`; a job without a status is reported as not done.
    fn done(&self) -> bool {
        match self.status.as_ref() {
            Some(status) => status.state.as_str() == "DONE",
            None => false,
        }
    }

    /// Returns the job ID taken from the job reference, or `None` when the
    /// job has no reference.
    fn name(&self) -> Option<&String> {
        let reference = self.job_reference.as_ref()?;
        Some(&reference.job_id)
    }
}

/// Exposes a [`GetQueryResultsResponse`] through the [`DiscoveryOperation`]
/// interface.
impl DiscoveryOperation for GetQueryResultsResponse {
    /// Returns `true` only when `job_complete` is explicitly `Some(true)`;
    /// both `None` and `Some(false)` are reported as not done.
    fn done(&self) -> bool {
        matches!(self.job_complete, Some(true))
    }

    /// Returns the job ID taken from the job reference, or `None` when the
    /// response has no reference.
    fn name(&self) -> Option<&String> {
        let reference = self.job_reference.as_ref()?;
        Some(&reference.job_id)
    }
}

/// Extension trait that adds a `poller` method to a request builder.
///
/// This file implements it for the `InsertJob` builder.
pub trait InsertJobBuilderExt {
/// Consumes the builder and returns a [`Poller`] whose intermediate and
/// final values are both [`Job`].
///
/// In the `InsertJob` implementation in this file, `client` is cloned and
/// used to issue the follow-up `get_job` requests, `project_id` is set on
/// each of those requests, and `location` is set on them only when it is
/// `Some`.
fn poller(
self,
client: &JobService,
project_id: impl Into<String>,
location: Option<String>,
) -> impl Poller<Job, Job>;
}
Comment thread
haphungw marked this conversation as resolved.

impl InsertJobBuilderExt for InsertJob {
    /// Builds a poller that sends this insert-job request and then follows
    /// the job with `get_job` requests.
    ///
    /// * `client` is cloned and used to issue the follow-up `get_job` calls.
    /// * `project_id` is set on every follow-up request.
    /// * `location` is set on the follow-up request only when it is `Some`.
    ///
    /// The poller is created with the `Aip194Strict` polling error policy
    /// and a default `ExponentialBackoff` polling backoff policy.
    //
    // NOTE(review): A reviewer pointed out that accepting `client`,
    // `project_id` and `location` here again looks like duplication, since
    // the `get_job()` call accepts `set_project_id` and `set_location`, and
    // the `JobService` could perhaps be taken from inside `InsertJob`. See
    // https://github.com/googleapis/google-cloud-rust/blob/main/src/generated/cloud/bigquery/v2/src/builder.rs#L937
    fn poller(
        self,
        client: &JobService,
        project_id: impl Into<String>,
        location: Option<String>,
    ) -> impl Poller<Job, Job> {
        let client_clone = client.clone();
        let project_id = project_id.into();

        // Sends the original insert-job request; the builder is moved into
        // the closure, so it can run at most once.
        let start = move || {
            let req = self;
            async move { req.send().await }
        };

        // Fetches the current state of the job identified by `name`.
        // Presumably the poller passes the value reported by
        // `DiscoveryOperation::name` (the job ID) -- TODO confirm.
        let query = move |name: String| {
            // Clone per call so the closure can be invoked repeatedly.
            let client = client_clone.clone();
            let project_id = project_id.clone();
            let location = location.clone();
            async move {
                // NOTE(review): This uses jobs.get to poll for all job
                // types, but jobs.getQueryResults is much better for query
                // jobs. We may need to check whether this is a query job and
                // use a different method (or call the jobs.getQueryResults
                // poller).
                let mut b = client.get_job().set_project_id(project_id).set_job_id(name);
                if let Some(loc) = location {
                    b = b.set_location(loc);
                }
                // Each individual poll request is sent with a never-retry
                // policy.
                let mut options = google_cloud_gax::options::RequestOptions::default();
                options.set_retry_policy(google_cloud_gax::retry_policy::NeverRetry);
                b.with_options(options).send().await
            }
        };

        let polling_error_policy =
            std::sync::Arc::new(google_cloud_gax::polling_error_policy::Aip194Strict);
        let polling_backoff_policy = std::sync::Arc::new(
            google_cloud_gax::exponential_backoff::ExponentialBackoff::default(),
        );

        google_cloud_lro::internal::new_discovery_poller(
            polling_error_policy,
            polling_backoff_policy,
            start,
            query,
        )
    }
}

/// Extension trait that adds a `poller` method to a request builder.
///
/// This file implements it for the `GetQueryResults` builder.
pub trait GetQueryResultsBuilderExt {
    /// Consumes the builder and returns a [`Poller`] whose intermediate and
    /// final values are both [`GetQueryResultsResponse`].
    ///
    /// In the `GetQueryResults` implementation in this file, `client` is
    /// cloned and used to issue the follow-up `get_query_results` requests,
    /// `project_id` is set on each of those requests, and `location` is set
    /// on them only when it is `Some`.
    //
    // NOTE(review): A reviewer pointed out that accepting `client`,
    // `project_id` and `location` here again looks like duplication, since
    // the `get_query_results()` call accepts `set_project_id` and
    // `set_location`.
    fn poller(
        self,
        client: &JobService,
        project_id: impl Into<String>,
        location: Option<String>,
    ) -> impl Poller<GetQueryResultsResponse, GetQueryResultsResponse>;
}
Comment thread
haphungw marked this conversation as resolved.
Outdated

impl GetQueryResultsBuilderExt for GetQueryResults {
fn poller(
self,
client: &JobService,
project_id: impl Into<String>,
location: Option<String>,
) -> impl Poller<GetQueryResultsResponse, GetQueryResultsResponse> {
let client_clone = client.clone();
let project_id = project_id.into();

let start = move || {
let req = self;
async move { req.send().await }
};

let query = move |name: String| {
let client = client_clone.clone();
let project_id = project_id.clone();
let location = location.clone();
async move {
let mut b = client
.get_query_results()
.set_project_id(project_id)
.set_job_id(name);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should pass maxResults=0 when using getQueryResults to wait for a query to finish. For large rows, it can add many seconds to the request time, even when no rows are actually returned.

if let Some(loc) = location {
b = b.set_location(loc);
}
let mut options = google_cloud_gax::options::RequestOptions::default();
options.set_retry_policy(google_cloud_gax::retry_policy::NeverRetry);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to have a way to pass a retry policy, because we can actually need to retry in some scenarios (see https://docs.cloud.google.com/bigquery/docs/error-messages) and even crazier, in some cases we only detect that the job has to be retried by pooling it. In some cases we need to re send the query (call jobs.query or jobs.insert again) with a different ID (see jobBackendError at https://docs.cloud.google.com/bigquery/docs/error-messages). We don't need to solve this right away, just pointing out that we need to take into account that we do need to customize retry policies here.

b.with_options(options).send().await
}
};

let polling_error_policy =
std::sync::Arc::new(google_cloud_gax::polling_error_policy::Aip194Strict);
let polling_backoff_policy = std::sync::Arc::new(
google_cloud_gax::exponential_backoff::ExponentialBackoff::default(),
);

google_cloud_lro::internal::new_discovery_poller(
polling_error_policy,
polling_backoff_policy,
start,
query,
)
}
}
Loading