This repository contains examples of User Defined Functions for Confluent Cloud Flink in Java.
The examples also illustrate a realistic deployment workflow for a Flink SQL statement using a user defined function. Details are in the User Defined Functions Deployment Lifecycle page.
- Simple scalar function: ConcatWithSeparator, usage. Also shows multiple overloaded `eval()` implementations.
- Logging from a UDF: LogOutOfRange, usage.
- Non-deterministic function: RandomString, usage.
- Extract JSON fields: JsonAddressToRow, usage. Extracts specific fields from a string field containing JSON.
- Normalize JSON nested elements: NormalizeJsonArray, usage. Extracts nested elements from a string containing JSON, emitting one row per element.
- Parse a structured JSON payload: ParseStructuredJson, usage. Parses a complex, structured JSON payload without a schema, performs programmatic validation, and emits a single ROW containing nested SQL elements such as ARRAYs and ROWs.

ℹ️ Process Table Functions (PTFs) are not yet publicly available in Confluent Cloud Flink.

- Entity state machine: EntityStateMachine, usage.
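The first example above mentions multiple overloaded `eval()` implementations. As a minimal sketch of that pattern (the class name, overload signatures, and bodies here are illustrative assumptions, not the repository's actual code; the real UDF extends Flink's `ScalarFunction`, which is omitted so the snippet stands alone):

```java
// Sketch of a scalar UDF with overloaded eval() methods, loosely modeled on
// ConcatWithSeparator. The real class extends
// org.apache.flink.table.functions.ScalarFunction; that superclass is omitted
// here so the snippet compiles without Flink on the classpath.
public class ConcatWithSeparatorSketch {

    // Variadic overload, e.g. concat_with_separator('-', 'a', 'b') -> 'a-b'
    public String eval(String separator, String... values) {
        return String.join(separator, values);
    }

    // Hypothetical overload for integer inputs; the planner would select it
    // when the SQL arguments are INTs, e.g. concat_with_separator(':', 1, 2)
    public String eval(String separator, Integer first, Integer second) {
        return first + separator + second;
    }

    public static void main(String[] args) {
        ConcatWithSeparatorSketch f = new ConcatWithSeparatorSketch();
        System.out.println(f.eval("-", "a", "b")); // a-b
        System.out.println(f.eval(":", 1, 2));     // 1:2
    }
}
```

Flink resolves which overload to call from the SQL argument types, so one registered function name can cover several input signatures.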
This repository provides a POM file with all required dependencies and configurations to build these examples.
For an explanation about handling dependencies in your UDF projects, see Java dependencies in UDFs.
All user-defined functions in this repo are built with Maven and packaged in a single JAR file.
Build the artifact:

```shell
mvn package
```

To use the UDFs, you need to upload the artifact (the JAR file) first.
Go to Environments > select your environment > Artifacts.
Upload the artifact, selecting Java as the UDF type, and take care to choose the same cloud provider and region where your compute pool and cluster were created.
⚠️ An uploaded artifact is immutable. If you modify the UDFs, delete the old artifact first, or rename the new JAR file before uploading the new version. You also have to drop and re-register the UDFs, pointing them to the new artifact ID.
These examples use sample data from the marketplace catalog in the examples environment, available in any Confluent Cloud Flink organization.
Before using a UDF in SQL, you need to register it using a CREATE FUNCTION statement:

```sql
CREATE FUNCTION `<function-name>`
AS '<implementation-class-FQN>'
USING JAR 'confluent-artifact://<artifact-id>'
```

For example:
```sql
CREATE FUNCTION `concat_with_separator`
AS 'io.confluent.flink.examples.udf.scalar.ConcatWithSeparator'
USING JAR 'confluent-artifact://cfa-abcd123'
```

You can verify the function registration:
```sql
DESCRIBE FUNCTION EXTENDED `<function-name>`
```
⚠️ When you upload a new artifact, unregister the UDF using `DROP FUNCTION <function-name>` and re-register it.
To test the UDF examples, follow the additional instructions provided in this repository: