Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,10 @@ where
return false;
};

match state.fetch_destination(&gateway_address.destination).await {
match state
.fetch_destination(&gateway_address.destination, None)
.await
{
Some(Address::Workload(wl)) => return predicate(wl.as_ref()),
Some(Address::Service(svc)) => {
for ep in svc.endpoints.iter() {
Expand Down
5 changes: 3 additions & 2 deletions src/proxy/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ impl Inbound {

// TODO Allow HBONE address to be a hostname. We have to respect rules about
// hostname scoping. Can we use the client's namespace here to do that?
let hbone_target = state.find_address(hbone_dst)?;
let hbone_target = state.find_address(hbone_dst, None)?;

// HBONE target can point to some service or workload. In either case, get the waypoint
let Some(target_waypoint) = (match hbone_target {
Expand All @@ -550,7 +550,8 @@ impl Inbound {
};

// Resolve the reference from our HBONE target
let Some(target_waypoint) = state.find_destination(&target_waypoint.destination) else {
let Some(target_waypoint) = state.find_destination(&target_waypoint.destination, None)
else {
return Some(false);
};

Expand Down
11 changes: 7 additions & 4 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,13 @@ impl OutboundConnection {
// If this is to-service traffic check for a service waypoint
// Capture result of whether this is svc addressed
let service = if let Some(Address::Service(target_service)) = state
.fetch_address(&NetworkAddress {
network: self.pi.cfg.network.clone(),
address: target.ip(),
})
.fetch_address(
&NetworkAddress {
network: self.pi.cfg.network.clone(),
address: target.ip(),
},
Some(&source_workload.namespace),
)
.await
{
// if we have a waypoint for this svc, use it; otherwise route traffic normally
Expand Down
57 changes: 38 additions & 19 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,20 +241,26 @@ impl ProxyState {
}

/// Find either a workload or service by the destination.
pub fn find_destination(&self, dest: &Destination) -> Option<Address> {
/// If `ns` is provided, prefer a service in that namespace when multiple share the same VIP.
pub fn find_destination(&self, dest: &Destination, ns: Option<&Strng>) -> Option<Address> {
match dest {
Destination::Address(addr) => self.find_address(addr),
Destination::Address(addr) => self.find_address(addr, ns),
Destination::Hostname(hostname) => self.find_hostname(hostname),
}
}

/// Find either a workload or a service by address.
pub fn find_address(&self, network_addr: &NetworkAddress) -> Option<Address> {
/// If `ns` is provided, prefer a service in that namespace when multiple share the same VIP.
pub fn find_address(
&self,
network_addr: &NetworkAddress,
ns: Option<&Strng>,
) -> Option<Address> {
// 1. handle workload ip, if workload not found fallback to service.
match self.workloads.find_address(network_addr) {
None => {
// 2. handle service
if let Some(svc) = self.services.get_by_vip(network_addr) {
if let Some(svc) = self.services.get_best_by_vip(network_addr, ns) {
return Some(Address::Service(svc));
}
None
Expand Down Expand Up @@ -300,10 +306,10 @@ impl ProxyState {
addr: SocketAddr,
resolution_mode: ServiceResolutionMode,
) -> Option<UpstreamDestination> {
if let Some(svc) = self
.services
.get_by_vip(&network_addr(network.clone(), addr.ip()))
{
if let Some(svc) = self.services.get_best_by_vip(
&network_addr(network.clone(), addr.ip()),
Some(&source_workload.namespace),
) {
if let Some(lb) = &svc.load_balancer
&& lb.mode == LoadBalancerMode::Passthrough
{
Expand Down Expand Up @@ -818,8 +824,11 @@ impl DemandProxyState {
addr: SocketAddr,
resolution_mode: ServiceResolutionMode,
) -> Result<Option<Upstream>, Error> {
self.fetch_address(&network_addr(network.clone(), addr.ip()))
.await;
self.fetch_address(
&network_addr(network.clone(), addr.ip()),
Some(&source_workload.namespace),
)
.await;
let upstream = {
self.read()
.find_upstream(network, source_workload, addr, resolution_mode)
Expand Down Expand Up @@ -1015,27 +1024,37 @@ impl DemandProxyState {

/// Looks for either a workload or service by the destination. If not found locally,
/// attempts to fetch on-demand.
pub async fn fetch_destination(&self, dest: &Destination) -> Option<Address> {
/// If `ns` is provided, prefer a service in that namespace when multiple share the same VIP.
pub async fn fetch_destination(
&self,
dest: &Destination,
ns: Option<&Strng>,
) -> Option<Address> {
match dest {
Destination::Address(addr) => self.fetch_address(addr).await,
Destination::Address(addr) => self.fetch_address(addr, ns).await,
Destination::Hostname(hostname) => self.fetch_hostname(hostname).await,
}
}

/// Looks for the given address to find either a workload or service by IP. If not found
/// locally, attempts to fetch on-demand.
pub async fn fetch_address(&self, network_addr: &NetworkAddress) -> Option<Address> {
/// If `ns` is provided, prefer a service in that namespace when multiple share the same VIP.
pub async fn fetch_address(
&self,
network_addr: &NetworkAddress,
ns: Option<&Strng>,
) -> Option<Address> {
// Wait for it on-demand, *if* needed
debug!(%network_addr.address, "fetch address");
if let Some(address) = self.read().find_address(network_addr) {
if let Some(address) = self.read().find_address(network_addr, ns) {
return Some(address);
}
if !self.supports_on_demand() {
return None;
}
// if both cache not found, start on demand fetch
self.fetch_on_demand(network_addr.to_string().into()).await;
self.read().find_address(network_addr)
self.read().find_address(network_addr, ns)
}

/// Looks for the given hostname to find either a workload or service by IP. If not found
Expand Down Expand Up @@ -1297,7 +1316,7 @@ mod tests {
});
test_helpers::assert_eventually(
Duration::from_secs(5),
|| mock_proxy_state.fetch_destination(&dst),
|| mock_proxy_state.fetch_destination(&dst, None),
Some(Address::Workload(Arc::new(
test_helpers::test_default_workload(),
))),
Expand All @@ -1311,7 +1330,7 @@ mod tests {
});
test_helpers::assert_eventually(
Duration::from_secs(5),
|| mock_proxy_state.fetch_destination(&dst),
|| mock_proxy_state.fetch_destination(&dst, None),
Some(Address::Service(Arc::new(
test_helpers::mock_default_service(),
))),
Expand All @@ -1325,7 +1344,7 @@ mod tests {
});
test_helpers::assert_eventually(
Duration::from_secs(5),
|| mock_proxy_state.fetch_destination(&dst),
|| mock_proxy_state.fetch_destination(&dst, None),
None,
)
.await;
Expand All @@ -1337,7 +1356,7 @@ mod tests {
});
test_helpers::assert_eventually(
Duration::from_secs(5),
|| mock_proxy_state.fetch_destination(&dst),
|| mock_proxy_state.fetch_destination(&dst, None),
None,
)
.await;
Expand Down
165 changes: 159 additions & 6 deletions src/state/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,8 @@ pub struct ServiceStore {
pub(super) staged_services: HashMap<NamespacedHostname, HashMap<Strng, Endpoint>>,

/// Allows for lookup of services by network address, the service's xds secondary key.
pub(super) by_vip: HashMap<NetworkAddress, Arc<Service>>,
/// Multiple services in different namespaces may share the same VIP.
pub(super) by_vip: HashMap<NetworkAddress, Vec<Arc<Service>>>,

/// Allows for lookup of services by hostname, and then by namespace. XDS uses a combination
/// of hostname and namespace as the primary key. In most cases, there will be a single
Expand All @@ -355,9 +356,22 @@ pub struct ServiceStore {
}

impl ServiceStore {
/// Returns the [Service] matching the given VIP.
pub fn get_by_vip(&self, vip: &NetworkAddress) -> Option<Arc<Service>> {
self.by_vip.get(vip).cloned()
/// Returns the list of [Service]s matching the given VIP. Multiple services in
/// different namespaces may share the same VIP.
pub fn get_by_vip(&self, vip: &NetworkAddress) -> Option<Vec<Arc<Service>>> {
self.by_vip.get(vip).map(|v| v.to_vec())
}

/// Returns the "best" [Service] matching the given VIP.
/// If a namespace is provided, a Service from that namespace is preferred.
/// Next, a Service marked `canonical` is preferred.
pub fn get_best_by_vip(
&self,
vip: &NetworkAddress,
ns: Option<&Strng>,
) -> Option<Arc<Service>> {
let services = self.get_by_vip(vip)?;
Some(ServiceMatch::find_best_match(services.iter(), ns, None)?.clone())
}

/// Returns the list of [Service]s matching the given hostname. Istio `ServiceEntry`
Expand Down Expand Up @@ -509,7 +523,21 @@ impl ServiceStore {

// Map the vips to the service.
for vip in &service.vips {
self.by_vip.insert(vip.clone(), service.clone());
match self.by_vip.get_mut(vip) {
None => {
self.by_vip.insert(vip.clone(), vec![service.clone()]);
}
Some(services) => {
if let Some((cur, _)) = services
.iter()
.find_position(|s| s.namespace == service.namespace)
{
services[cur] = service.clone();
} else {
services.push(service.clone());
}
}
}
}

// Map the hostname to the service.
Expand Down Expand Up @@ -560,7 +588,12 @@ impl ServiceStore {

// Remove the entries for the previous service VIPs.
prev.vips.iter().for_each(|addr| {
self.by_vip.remove(addr);
if let Some(vip_services) = self.by_vip.get_mut(addr) {
vip_services.retain(|s| s.namespace != prev.namespace);
if vip_services.is_empty() {
self.by_vip.remove(addr);
}
}
});

// Remove the staged service.
Expand Down Expand Up @@ -653,3 +686,123 @@ impl<'a> ServiceMatch<'a> {
.into()
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::net::Ipv4Addr;

fn nw(ip: IpAddr) -> NetworkAddress {
NetworkAddress {
address: ip,
network: crate::strng::EMPTY,
}
}

fn make_service(name: &str, ns: &str, vips: Vec<IpAddr>) -> Service {
Service {
name: name.into(),
namespace: ns.into(),
hostname: format!("{name}.{ns}.svc.cluster.local").into(),
vips: vips.into_iter().map(nw).collect(),
ports: HashMap::new(),
endpoints: EndpointSet::from_list([]),
subject_alt_names: vec![],
waypoint: None,
load_balancer: None,
ip_families: None,
canonical: false,
}
}

#[test]
fn shared_vip_different_namespaces() {
let mut store = ServiceStore::default();
// shared: both services claim this VIP
let shared = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));
// exclusive to each service
let only_a = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2));
let only_b = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 3));

let ns_a: Strng = "ns-a".into();
let ns_b: Strng = "ns-b".into();

let svc_a = make_service("svc", "ns-a", vec![shared, only_a]);
let svc_b = make_service("svc", "ns-b", vec![shared, only_b]);

store.insert(svc_a);
store.insert(svc_b);

// 3 unique VIP addresses, 2 services.
assert_eq!(store.num_vips(), 3);
assert_eq!(store.num_services(), 2);

// Shared VIP has both services.
let all = store.get_by_vip(&nw(shared)).unwrap();
assert_eq!(all.len(), 2);

// Exclusive VIPs have one service each.
assert_eq!(store.get_by_vip(&nw(only_a)).unwrap().len(), 1);
assert_eq!(store.get_by_vip(&nw(only_b)).unwrap().len(), 1);

// Namespace preference on shared VIP returns the correct service.
assert_eq!(
store
.get_best_by_vip(&nw(shared), Some(&ns_a))
.unwrap()
.namespace,
ns_a,
);
assert_eq!(
store
.get_best_by_vip(&nw(shared), Some(&ns_b))
.unwrap()
.namespace,
ns_b,
);

// Exclusive VIPs always return their owner regardless of namespace hint.
assert_eq!(
store
.get_best_by_vip(&nw(only_a), Some(&ns_b))
.unwrap()
.namespace,
ns_a,
);
assert_eq!(
store
.get_best_by_vip(&nw(only_b), Some(&ns_a))
.unwrap()
.namespace,
ns_b,
);

// None returns some service.
assert!(store.get_best_by_vip(&nw(shared), None).is_some());

// Remove svc_a: shared VIP keeps svc_b, only_a is gone, only_b remains.
store.remove(&NamespacedHostname {
namespace: "ns-a".into(),
hostname: "svc.ns-a.svc.cluster.local".into(),
});
assert_eq!(store.num_vips(), 2); // shared + only_b
assert_eq!(store.num_services(), 1);

let all = store.get_by_vip(&nw(shared)).unwrap();
assert_eq!(all.len(), 1);
assert_eq!(all[0].namespace, ns_b);

assert!(store.get_by_vip(&nw(only_a)).is_none());
assert!(store.get_by_vip(&nw(only_b)).is_some());

// Remove svc_b: everything is gone.
store.remove(&NamespacedHostname {
namespace: "ns-b".into(),
hostname: "svc.ns-b.svc.cluster.local".into(),
});
assert_eq!(store.num_vips(), 0);
assert_eq!(store.num_services(), 0);
assert!(store.get_by_vip(&nw(shared)).is_none());
assert!(store.get_by_vip(&nw(only_b)).is_none());
}
}
Loading