Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions packages/backend/src/apps/databricks/common/schema.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { z } from 'zod'

// Column names are wrapped in backticks in SQL statements, which escapes most
// special characters. We use a whitelist of alphanumerics, spaces, and common
// special characters — explicitly excluding backticks and backslashes, which
// would allow breaking out of the quoted identifier.
// Tested creating a column with: `a-zA-Z0-9 _-!@#$%^&*()+=[]{};:'",.<>/?|~]+`
export const columnNameSchema = z
.string()
.min(1, { message: 'Column name is required' })
.max(255, { message: 'Column name must be less than 255 characters' })
.regex(/^[a-zA-Z0-9 _\-!@#$%^&*()+=[\]{};:'",.<>/?|~]+$/, {
message: 'Column name contains invalid characters',
})

export const tableNameSchema = z
.string()
.min(1, { message: 'Table name is required' })
.max(255, { message: 'Table name must be less than 255 characters' })
.regex(/^[a-zA-Z0-9_]+$/, {
message: 'Table name contains invalid characters',
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { IDynamicAction, IGlobalVariable, IJSONObject } from '@plumber/types'

import { z } from 'zod'

import { BadUserInputError } from '@/errors/graphql-errors'
import logger from '@/helpers/logger'

import { createSession } from '../auth/create-client'
import { columnNameSchema, tableNameSchema } from '../common/schema'

const createTableSchema = z.object({
tableName: tableNameSchema,
columnName: columnNameSchema,
})

const dynamicData: IDynamicAction = {
name: 'Create Column',
key: 'databricks-createTableColumn',
type: 'action',
async run($: IGlobalVariable): Promise<IJSONObject> {
const parametersParseResult = createTableSchema.safeParse($.step.parameters)
if (parametersParseResult.success === false) {
throw new BadUserInputError(parametersParseResult.error.issues[0].message)
}

try {
const { tableName, columnName } = parametersParseResult.data

const { session, endSession } = await createSession($)
// Note: DDL statements like ALTER TABLE don't support parameterization in Databricks.
// Input validation via regex (only alphanumeric + underscore) provides SQL injection protection.
// We default to STRING type for the new column. Support for other types will be added later.
const statement = `ALTER TABLE \`${tableName}\` ADD COLUMN \`${columnName}\` STRING;`
const operation = await session.executeStatement(statement)
await operation.fetchAll()
await endSession()
return {
newValue: columnName,
}
} catch (e) {
logger.error({
event: 'databricks-dynamic-data-create-table-column',
error: e,
})
throw new Error('Failed to create column')
Comment thread
pregnantboy marked this conversation as resolved.
}
},
}

export default dynamicData
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { IDynamicAction, IGlobalVariable, IJSONObject } from '@plumber/types'

import validator from 'email-validator'
import { z } from 'zod'

import { BadUserInputError } from '@/errors/graphql-errors'
import logger from '@/helpers/logger'

import { createSession } from '../auth/create-client'
import { tableNameSchema } from '../common/schema'

const createTableSchema = z.object({
tableName: tableNameSchema,
userEmail: z.string().refine((val) => validator.validate(val), {
message: 'Invalid email address',
}),
})

const dynamicData: IDynamicAction = {
name: 'Create Table',
key: 'databricks-createTable',
type: 'action',
async run($: IGlobalVariable): Promise<IJSONObject> {
const parametersParseResult = createTableSchema.safeParse({
...$.step.parameters,
userEmail: $.user.email,
})
if (parametersParseResult.success === false) {
throw new BadUserInputError(parametersParseResult.error.issues[0].message)
}
try {
const { tableName } = parametersParseResult.data

const { session, endSession } = await createSession($)
// Note: DDL statements like CREATE TABLE don't support parameterization in Databricks.
// Input validation via regex (only alphanumeric + underscore) provides SQL injection protection.

const createTableOperation = await session.executeStatement(
`CREATE TABLE \`${tableName}\`;`,
)
await createTableOperation.fetchAll()
// We grant ALL PRIVILEGES to the current user to the table,
// which includes reading, writing, altering table, and managing permis.
const grantPermissionsOperation = await session.executeStatement(
`GRANT ALL PRIVILEGES ON \`${tableName}\` TO \`${$.user.email}\`;`,
)
await grantPermissionsOperation.fetchAll()
await endSession()
return {
newValue: tableName,
}
} catch (e) {
logger.error({
event: 'databricks-dynamic-data-create-table',
error: e,
})
throw new Error('Failed to create table')
}
},
}

export default dynamicData
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
export default []
import createColumn from './create-column'
import createTable from './create-table'
import listTableColumns from './list-table-columns'
import listTableNames from './list-table-names'

export default [listTableNames, listTableColumns, createTable, createColumn]
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import {
DynamicDataOutput,
IDynamicData,
IGlobalVariable,
} from '@plumber/types'

import { databricksConfig } from '@/config/app-env-vars/databricks'
import logger from '@/helpers/logger'

import { createSession } from '../auth/create-client'
import { constructSchemaName } from '../common/construct-schema-name'
import { DatabrickColumnRes } from '../common/types'

const dynamicData: IDynamicData = {
name: 'List Table Columns',
key: 'databricks-list-table-columns',

async run($: IGlobalVariable): Promise<DynamicDataOutput> {
try {
const tableName = $.step.parameters.tableName as string
if (!tableName) {
return {
data: [],
error: {
message: 'Table name is required',
},
}
}
const { session, endSession } = await createSession($)
const operation = await session.getColumns({
tableName: $.step.parameters.tableName as string,
catalogName: databricksConfig.catalog,
schemaName: constructSchemaName($),
})
const columns = (await operation.fetchAll({
maxRows: 1000,
})) as DatabrickColumnRes[]
await endSession()
return {
data: columns.map((column) => ({
name: column.COLUMN_NAME,
value: column.COLUMN_NAME,
})),
}
} catch (e) {
logger.error({
event: 'databricks-list-table-columns',
error: e,
})
return {
data: [],
error: {
message: 'Failed to list table columns',
},
}
}
},
}

export default dynamicData
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import {
DynamicDataOutput,
IDynamicData,
IGlobalVariable,
} from '@plumber/types'

import { databricksConfig } from '@/config/app-env-vars/databricks'
import logger from '@/helpers/logger'

import { createSession } from '../auth/create-client'
import { constructSchemaName } from '../common/construct-schema-name'
import { DatabrickTableRes } from '../common/types'

const dynamicData: IDynamicData = {
name: 'List Table Names',
key: 'databricks-list-table-names',

async run($: IGlobalVariable): Promise<DynamicDataOutput> {
try {
const { session, endSession } = await createSession($)
const operation = await session.getTables({
catalogName: databricksConfig.catalog,
schemaName: constructSchemaName($),
tableTypes: ['TABLE'],
})
const tables = (await operation.fetchAll({
maxRows: 1000,
})) as DatabrickTableRes[]
await endSession()
return {
data: tables.map((row) => ({
name: row.TABLE_NAME,
value: row.TABLE_NAME,
})),
}
} catch (e) {
logger.error({
event: 'databricks-list-table-names',
error: e,
})
return {
data: [],
error: {
message: 'Failed to list table names',
},
}
}
},
}

export default dynamicData
2 changes: 2 additions & 0 deletions packages/backend/src/graphql/mutation-resolvers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import deleteStep from './mutations/delete-step'
import deleteUploadedFile from './mutations/delete-uploaded-file'
import duplicateBranch from './mutations/duplicate-branch'
import duplicateFlow from './mutations/duplicate-flow'
import dynamicAction from './mutations/dynamic-action'
import executeStep from './mutations/execute-step'
import generateAuthUrl from './mutations/generate-auth-url'
import generatePresignedPost from './mutations/generate-presigned-post'
Expand Down Expand Up @@ -86,6 +87,7 @@ export default {
verifyOtp,
retryExecutionStep,
retryPartialStep,
dynamicAction,
logout,
loginWithSgid,
loginWithSelectedSgid,
Expand Down
64 changes: 64 additions & 0 deletions packages/backend/src/graphql/mutations/dynamic-action.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { IDynamicAction } from '@plumber/types'

import apps from '@/apps'
import { NotFoundError } from '@/errors/graphql-errors/not-found'
import globalVariable from '@/helpers/global-variable'

import type { MutationResolvers } from '../__generated__/types.generated'

const dynamicAction: MutationResolvers['dynamicAction'] = async (
_parent,
params,
context,
) => {
const { stepId, key: dynamicActionKey, parameters } = params.input

const step = await context.currentUser
.withAccessibleSteps({ requiredRole: 'editor' })
.withGraphFetched({
connection: true,
flow: {
user: true,
},
})
.findById(stepId)

if (!step || !step.appKey) {
throw new NotFoundError('Step not found')
}
Comment thread
pregnantboy marked this conversation as resolved.

const app = apps[step.appKey]
const connection = step.connection

// if app requires connection, only proceed if connection has been set up
if (app.auth && !connection) {
return null
}

const $ = await globalVariable({
connection,
app,
flow: step.flow,
step,
user: context.currentUser,
})

const command = app.dynamicData.find(
(data) => data.key === dynamicActionKey,
) as IDynamicAction | undefined

if (!command || command.type !== 'action') {
throw new Error(`Dynamic action ${dynamicActionKey} not found`)
}

for (const parameterKey in parameters) {
const parameterValue = parameters[parameterKey]
$.step.parameters[parameterKey] = parameterValue
}

const fetchedData = await command.run($)

return fetchedData
}

export default dynamicAction
11 changes: 10 additions & 1 deletion packages/backend/src/graphql/queries/get-dynamic-data.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { IDynamicData } from '@plumber/types'

import apps from '@/apps'
import { BadUserInputError } from '@/errors/graphql-errors'
import { APP_CONNECTION_FIELDS } from '@/helpers/get-shared-connection-details'
import globalVariable from '@/helpers/global-variable'

Expand Down Expand Up @@ -41,7 +44,13 @@ const getDynamicData: QueryResolvers['getDynamicData'] = async (
user: step.flow.user,
})

const command = app.dynamicData.find((data) => data.key === dynamicDataKey)
const command = app.dynamicData.find(
(data) => data.key === dynamicDataKey,
) as IDynamicData | undefined

if (!command) {
throw new BadUserInputError(`Dynamic data ${dynamicDataKey} not found`)
}

for (const parameterKey in parameters) {
const parameterValue = parameters[parameterKey]
Expand Down
7 changes: 7 additions & 0 deletions packages/backend/src/graphql/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type Mutation {
bulkRetryIterations(
input: BulkRetryIterationsInput
): BulkRetryIterationsResult!
dynamicAction(input: DynamicActionInput!): JSONObject!
logout: LogoutResult
loginWithSgid(input: OidcLoginInput!): LoginWithSgidResult!
loginWithSso(input: OidcLoginInput!): Boolean!
Expand Down Expand Up @@ -529,6 +530,12 @@ type BulkRetryIterationsResult {
allSuccessfullyRetried: Boolean!
}

input DynamicActionInput {
stepId: String!
key: String!
parameters: JSONObject!
}

input CreateConnectionInput {
key: String!
formattedData: JSONObject!
Expand Down
Loading
Loading