Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions sonic-ycabled/tests/test_ycable.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,29 @@ def test_ycable_helper_async_client_run_loop_with_exception(self, sfputil):

class TestYcableActiveActiveHelper(object):

def test_get_soc_ip_for_grpc_prefers_ipv4(self):
soc_ip, soc_ip_field = get_soc_ip_for_grpc({
'soc_ipv4': '192.168.0.1/32',
'soc_ipv6': 'fc02:1000::1/128'
})

assert(soc_ip == '192.168.0.1')
assert(soc_ip_field == 'soc_ipv4')

def test_get_soc_ip_for_grpc_falls_back_to_ipv6(self):
soc_ip, soc_ip_field = get_soc_ip_for_grpc({
'soc_ipv6': 'fc02:1000::1/128'
})

assert(soc_ip == 'fc02:1000::1')
assert(soc_ip_field == 'soc_ipv6')

def test_format_grpc_target_for_ipv6(self):
assert(format_grpc_target('fc02:1000::1', GRPC_PORT) == '[fc02:1000::1]:50075')

def test_format_grpc_target_for_ipv4(self):
assert(format_grpc_target('192.168.0.1', GRPC_PORT) == '192.168.0.1:50075')

@patch("ycable.ycable.platform_sfputil")
def test_check_presence_for_active_active_cable_type(self, sfputil):

Expand Down
67 changes: 44 additions & 23 deletions sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,29 @@ def wrapper(*args, **kwargs):

return wrapper

def get_soc_ip_for_grpc(mux_table_dict):
for field in ["soc_ipv4", "soc_ipv6"]:
soc_ip_full = mux_table_dict.get(field, None)
if soc_ip_full is None:
continue

soc_ip = soc_ip_full.split('/')[0]
try:
ipaddress.ip_address(UNICODE_TYPE(soc_ip))
except ValueError:
helper_logger.log_warning("Invalid {} address {} for gRPC channel setup".format(field, soc_ip_full))
return None, field

return soc_ip, field

return None, None

def format_grpc_target(soc_ip, grpc_port):
ip_address = ipaddress.ip_address(UNICODE_TYPE(soc_ip))
if ip_address.version == 6:
return "[{}]:{}".format(soc_ip, grpc_port)
return "{}:{}".format(soc_ip, grpc_port)

def retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client):

global grpc_port_stubs
Expand All @@ -360,13 +383,10 @@ def retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client):
else:
# Convert list of tuples to a dictionary
mux_table_dict = dict(fvs)
if "state" in mux_table_dict and "soc_ipv4" in mux_table_dict:

soc_ipv4_full = mux_table_dict.get("soc_ipv4", None)
if soc_ipv4_full is not None:
soc_ipv4 = soc_ipv4_full.split('/')[0]
soc_ip, soc_ip_field = get_soc_ip_for_grpc(mux_table_dict)
if "state" in mux_table_dict and soc_ip is not None:

channel, stub = setup_grpc_channel_for_port(port, soc_ipv4, asic_index, grpc_client, False)
channel, stub = setup_grpc_channel_for_port(port, soc_ip, asic_index, grpc_client, False)
if channel is None or stub is None:
helper_logger.log_notice(
"stub is None, while reattempt setting up channels did not work {}".format(port))
Expand All @@ -375,6 +395,9 @@ def retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client):
grpc_port_channels[port] = channel
grpc_port_stubs[port] = stub
return True
else:
helper_logger.log_warning("No SoC IP address found for gRPC channel setup on port {}".format(port))
return False

def apply_grpc_secrets_configuration(SECRETS_PATH, grpc_config):

Expand Down Expand Up @@ -500,6 +523,7 @@ def wait_for_state_change(channel_connectivity, port):


def create_channel(type_chan, level, kvp, soc_ip, port, asic_index, is_async):
grpc_target = format_grpc_target(soc_ip, GRPC_PORT)

if type_chan == "secure":
credential = get_grpc_credentials(level, kvp)
Expand All @@ -511,20 +535,20 @@ def create_channel(type_chan, level, kvp, soc_ip, port, asic_index, is_async):
if is_async:
ASYNC_GRPC_CLIENT_OPTIONS = []
ASYNC_GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name)))
channel = grpc.aio.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=ASYNC_GRPC_CLIENT_OPTIONS)
channel = grpc.aio.secure_channel(grpc_target, credential, options=ASYNC_GRPC_CLIENT_OPTIONS)
stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel)
else:
GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name)))
channel = grpc.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS)
channel = grpc.secure_channel(grpc_target, credential, options=GRPC_CLIENT_OPTIONS)
stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel)


else:
if is_async:
channel = grpc.aio.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT))
channel = grpc.aio.insecure_channel(grpc_target)
stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel)
else:
channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS)
channel = grpc.insecure_channel(grpc_target, options=GRPC_CLIENT_OPTIONS)
stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel)


Expand Down Expand Up @@ -669,12 +693,10 @@ def check_identifier_presence_and_setup_channel(logical_port_name, port_tbl, hw_
else:
# Convert list of tuples to a dictionary
mux_table_dict = dict(fvs)
if "state" in mux_table_dict and "soc_ipv4" in mux_table_dict:
soc_ip, soc_ip_field = get_soc_ip_for_grpc(mux_table_dict)
if "state" in mux_table_dict and soc_ip is not None:

val = mux_table_dict.get("state", None)
soc_ipv4_full = mux_table_dict.get("soc_ipv4", None)
if soc_ipv4_full is not None:
soc_ipv4 = soc_ipv4_full.split('/')[0]
cable_type = mux_table_dict.get("cable_type", None)

if val in CONFIG_MUX_STATES and cable_type == "active-active":
Expand All @@ -693,16 +715,16 @@ def check_identifier_presence_and_setup_channel(logical_port_name, port_tbl, hw_
if prev_channel is not None and prev_stub is not None:
return

channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ipv4, asic_index, grpc_client, False)
channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ip, asic_index, grpc_client, False)
post_port_mux_info_to_db(logical_port_name, mux_tbl, asic_index, hw_mux_cable_tbl, 'pseudo-cable')
if channel is not None:
grpc_port_channels[logical_port_name] = channel
helper_logger.log_notice(
"channel is not None, Cable-Insert or daemon init, daemon able to set up channel for gRPC SOC IP {}, port {}".format(soc_ipv4, logical_port_name))
"channel is not None, Cable-Insert or daemon init, daemon able to set up channel for gRPC SOC IP {}, port {}".format(soc_ip, logical_port_name))
if stub is not None:
grpc_port_stubs[logical_port_name] = stub
helper_logger.log_notice(
"stub is not None, Cable-Insert or daemon init, daemon able to set up channel for gRPC SOC IP {}, port {}".format(soc_ipv4, logical_port_name))
"stub is not None, Cable-Insert or daemon init, daemon able to set up channel for gRPC SOC IP {}, port {}".format(soc_ip, logical_port_name))

else:
helper_logger.log_warning(
Expand All @@ -716,6 +738,8 @@ def check_identifier_presence_and_setup_channel(logical_port_name, port_tbl, hw_
else:
helper_logger.log_warning(
"DAC cable logical to physical port mapping returned more than one physical ports while Channel setup Port {}".format(logical_port_name))
else:
helper_logger.log_warning("No SoC IP address found for gRPC channel setup on port {}".format(logical_port_name))


def setup_grpc_channels(stop_event, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, port_tbl, loopback_tbl, port_table_keys, grpc_client):
Expand Down Expand Up @@ -4117,13 +4141,10 @@ async def task_worker(self):
else:
# Convert list of tuples to a dictionary
mux_table_dict = dict(fvs)
if "state" in mux_table_dict and "soc_ipv4" in mux_table_dict:

soc_ipv4_full = mux_table_dict.get("soc_ipv4", None)
if soc_ipv4_full is not None:
soc_ipv4 = soc_ipv4_full.split('/')[0]
soc_ip, soc_ip_field = get_soc_ip_for_grpc(mux_table_dict)
if "state" in mux_table_dict and soc_ip is not None:

channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ipv4, asic_index, self.table_helper.get_grpc_config_tbl(), True)
channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ip, asic_index, self.table_helper.get_grpc_config_tbl(), True)

client = GracefulRestartClient(logical_port_name, channel, read_side)
tasks.append(asyncio.create_task(client.send_request_and_get_response()))
Expand Down
Loading