Skip to content

Commit dd9411c

Browse files
committed
fix: COPY FROM STDIN without header now inserts rows correctly
- bulk_executor.get_table_columns: bypass the catalog router by calling _execute_external_async directly — the catalog router intercepts INFORMATION_SCHEMA.COLUMNS and returns empty rows as a pg_catalog emulation fallback, which produced an INSERT with an empty column list
- bulk_insert: lazily detect CSV placeholder keys (column_0, column_1, …) and remap them to the real schema columns only when needed, leaving header-derived or explicitly provided column lists untouched
- tests/conftest.py: increase the connection pool to 30 with a 15 s timeout to prevent pool exhaustion when COPY tests hold transaction connections during 250-row bulk inserts
1 parent 93eb078 commit dd9411c

File tree

2 files changed

+30
-13
lines changed

2 files changed

+30
-13
lines changed

src/iris_pgwire/bulk_executor.py

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,23 @@ async def bulk_insert(
7171
total_rows = 0
7272
batch = []
7373
actual_column_names = column_names
74+
# Lazily resolved from schema when CSV has no header and no column list
75+
schema_columns: list[str] | None = None
7476

7577
async for row_dict in rows:
76-
# Determine column names from first row if not specified
77-
if actual_column_names is None:
78+
# When column_names is None and the CSV has no header, the CSV processor
79+
# generates placeholder keys (column_0, column_1, …). Detect this on
80+
# the first row, fetch real column names from IRIS schema, and re-map.
81+
first_key = next(iter(row_dict), None)
82+
if first_key is not None and first_key.startswith("column_"):
83+
if schema_columns is None:
84+
schema_columns = await self.get_table_columns(table_name)
85+
logger.debug(f"Columns fetched from schema for {table_name}: {schema_columns}")
86+
if schema_columns:
87+
actual_column_names = schema_columns
88+
values = list(row_dict.values())
89+
row_dict = dict(zip(schema_columns, values))
90+
elif actual_column_names is None:
7891
actual_column_names = list(row_dict.keys())
7992
logger.debug(f"Columns inferred from data: {actual_column_names}")
8093

@@ -321,25 +334,29 @@ async def get_table_columns(self, table_name: str) -> list[str]:
321334
"""
322335
Get column names for a table using INFORMATION_SCHEMA.
323336
337+
Bypasses the catalog router (which intercepts INFORMATION_SCHEMA queries
338+
for pg_catalog emulation) by calling _execute_external_async directly.
339+
324340
Args:
325341
table_name: Table name
326342
327343
Returns:
328-
List of column names
344+
List of column names in ordinal order
329345
330346
Raises:
331347
Exception: IRIS query error
332348
"""
333-
query = f"""
334-
SELECT column_name
335-
FROM INFORMATION_SCHEMA.COLUMNS
336-
WHERE LOWER(table_name) = LOWER('{table_name}')
337-
ORDER BY ordinal_position
338-
"""
349+
query = (
350+
"SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS "
351+
f"WHERE LOWER(TABLE_NAME) = LOWER('{table_name}') "
352+
"ORDER BY ORDINAL_POSITION"
353+
)
339354

340-
result = await self.iris_executor.execute_query(query, [])
355+
# Bypass iris_executor.execute_query (which routes through the catalog
356+
# router and returns empty rows for INFORMATION_SCHEMA queries) by
357+
# calling the external execution path directly.
358+
result = await self.iris_executor._execute_external_async(query, [])
341359

342-
# Extract column names from result
343360
columns = []
344361
if result and "rows" in result:
345362
columns = [row[0] for row in result["rows"]]

tests/conftest.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -679,8 +679,8 @@ def pgwire_server(
679679
iris_password=iris_config["password"],
680680
iris_namespace=pgwire_namespace,
681681
enable_ssl=False,
682-
connection_pool_size=20,
683-
connection_pool_timeout=10.0,
682+
connection_pool_size=30,
683+
connection_pool_timeout=15.0,
684684
)
685685

686686
loop = asyncio.new_event_loop()

0 commit comments

Comments (0)