sr

package module
v1.5.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 25, 2025 License: BSD-3-Clause Imports: 18 Imported by: 41

Documentation

Overview

Package sr provides a schema registry client and a helper type to encode values and decode data according to the schema registry wire format.

As mentioned on the Serde type, this package does not provide schema auto-discovery and type auto-decoding. To aid in strong typing and validated encoding/decoding, you must register IDs and values to how to encode or decode them.

The client does not automatically cache schemas, instead, the Serde type is used for the actual caching of IDs to how to encode/decode the IDs. The Client type itself simply speaks http to your schema registry and returns the results.

To read more about the schema registry, see the following:

https://docs.confluent.io/platform/current/schema-registry/develop/api.html

Index

Constants

View Source
const (
	// LogLevelNone corresponds to no logging.
	LogLevelNone int8 = iota
	// LogLevelError opts into logging errors.
	LogLevelError
	// LogLevelWarn opts into logging warnings and errors.
	LogLevelWarn
	// LogLevelInfo opts into informational logging, warnings, and errors.
	LogLevelInfo
	// LogLevelDebug opts into all logs.
	LogLevelDebug
)

The log levels that can be used to control the verbosity of the SR client. The levels are ordered to mirror the kgo leveling.

View Source
const GlobalSubject = ""

GlobalSubject is a constant to make API usage of requesting global subjects clearer.

Variables

View Source
var (
	// ErrUnknown represents an unknown Schema Registry error code.
	// This is returned when the Schema Registry returns an error code that is not
	// recognized by this client version.
	ErrUnknown = &Error{-1, "UNKNOWN_SCHEMA_REGISTRY_ERROR", "Unknown Schema Registry error"}

	// ErrSubjectNotFound indicates that the specified subject does not exist.
	ErrSubjectNotFound = &Error{40401, "SUBJECT_NOT_FOUND", "Subject does not exist"}

	// ErrVersionNotFound indicates that the specified version does not exist for the subject.
	ErrVersionNotFound = &Error{40402, "VERSION_NOT_FOUND", "Version does not exist for subject"}

	// ErrSchemaNotFound indicates that the specified schema does not exist.
	ErrSchemaNotFound = &Error{40403, "SCHEMA_NOT_FOUND", "Schema does not exist"}

	// ErrSubjectSoftDeleted indicates that the subject was soft deleted.
	// Set permanent=true to delete permanently.
	ErrSubjectSoftDeleted = &Error{40404, "SUBJECT_SOFT_DELETED", "Subject was soft deleted"}

	// ErrSubjectNotSoftDeleted indicates that the subject was not deleted first
	// before being permanently deleted.
	ErrSubjectNotSoftDeleted = &Error{40405, "SUBJECT_NOT_SOFT_DELETED", "Subject was not deleted first before being permanently deleted"}

	// ErrSchemaVersionSoftDeleted indicates that the specific version of the subject
	// was soft deleted. Set permanent=true to delete permanently.
	ErrSchemaVersionSoftDeleted = &Error{40406, "SCHEMA_VERSION_SOFT_DELETED", "Schema version was soft deleted"}

	// ErrSchemaVersionNotSoftDeleted indicates that the specific version of the subject
	// was not deleted first before being permanently deleted.
	ErrSchemaVersionNotSoftDeleted = &Error{40407, "SCHEMA_VERSION_NOT_SOFT_DELETED", "Schema version was not deleted first before being permanently deleted"}

	// ErrSubjectLevelCompatibilityNotConfigured indicates that the subject does not have
	// subject-level compatibility configured.
	ErrSubjectLevelCompatibilityNotConfigured = &Error{40408, "SUBJECT_LEVEL_COMPATIBILITY_NOT_CONFIGURED", "Subject does not have subject-level compatibility configured"}

	// ErrSubjectLevelModeNotConfigured indicates that the subject does not have
	// subject-level mode configured.
	ErrSubjectLevelModeNotConfigured = &Error{40409, "SUBJECT_LEVEL_MODE_NOT_CONFIGURED", "Subject does not have subject-level mode configured"}

	// ErrIncompatibleSchema indicates that the schema being registered is incompatible
	// with an earlier schema for the same subject, according to the configured compatibility level.
	ErrIncompatibleSchema = &Error{409, "INCOMPATIBLE_SCHEMA", "Schema is incompatible with an earlier schema"}

	// ErrInvalidSchema indicates that the provided schema is invalid.
	ErrInvalidSchema = &Error{42201, "INVALID_SCHEMA", "Schema is invalid"}

	// ErrInvalidVersion indicates that the provided version is invalid.
	ErrInvalidVersion = &Error{42202, "INVALID_VERSION", "Version is invalid"}

	// ErrInvalidCompatibilityLevel indicates that the provided compatibility level is invalid.
	ErrInvalidCompatibilityLevel = &Error{42203, "INVALID_COMPATIBILITY_LEVEL", "Compatibility level is invalid"}

	// ErrInvalidMode indicates that the provided mode is invalid.
	ErrInvalidMode = &Error{42204, "INVALID_MODE", "Mode is invalid"}

	// ErrOperationNotPermitted indicates that the requested operation is not permitted.
	ErrOperationNotPermitted = &Error{42205, "OPERATION_NOT_PERMITTED", "Operation is not permitted"}

	// ErrReferenceExists indicates that the schema reference already exists.
	ErrReferenceExists = &Error{42206, "REFERENCE_EXISTS", "Schema reference already exists"}

	// ErrIDDoesNotMatch indicates that the provided ID does not match the expected ID.
	ErrIDDoesNotMatch = &Error{42207, "ID_DOES_NOT_MATCH", "Provided ID does not match expected ID"}

	// ErrInvalidSubject indicates that the provided subject name is invalid.
	ErrInvalidSubject = &Error{42208, "INVALID_SUBJECT", "Subject name is invalid"}

	// ErrSchemaTooLarge indicates that the schema is too large.
	ErrSchemaTooLarge = &Error{42209, "SCHEMA_TOO_LARGE", "Schema is too large"}

	// ErrInvalidRuleset indicates that the provided ruleset is invalid.
	ErrInvalidRuleset = &Error{42210, "INVALID_RULESET", "Ruleset is invalid"}

	// ErrContextNotEmpty indicates that the context is not empty when it should be.
	ErrContextNotEmpty = &Error{42211, "CONTEXT_NOT_EMPTY", "Context is not empty when it should be"}

	// ErrStoreError indicates an error in the backend datastore.
	ErrStoreError = &Error{50001, "STORE_ERROR", "Error in backend datastore"}

	// ErrOperationTimeout indicates that the operation timed out.
	ErrOperationTimeout = &Error{50002, "OPERATION_TIMEOUT", "Operation timed out"}

	// ErrRequestForwardingFailed indicates an error while forwarding the request to the primary.
	ErrRequestForwardingFailed = &Error{50003, "REQUEST_FORWARDING_FAILED", "Error forwarding request to primary"}

	// ErrUnknownLeader indicates that the leader is unknown.
	ErrUnknownLeader = &Error{50004, "UNKNOWN_LEADER", "Leader is unknown"}

	// ErrJSONParseError indicates a JSON parsing error (error code 50005 is used
	// by the RestService to indicate a JSON Parse Error).
	ErrJSONParseError = &Error{50005, "JSON_PARSE_ERROR", "JSON parsing error"}
)

Schema Registry errors as defined in the official Confluent Schema Registry. These allow clients to programmatically check for specific error conditions when interacting with the Schema Registry API.

The errors are organized by HTTP status code categories:

  • 404xx: Not Found errors
  • 409: Conflict errors
  • 422xx: Unprocessable Entity errors
  • 500xx: Internal Server errors

Reference: https://github.com/confluentinc/schema-registry/blob/master/core/src/main/java/io/confluent/kafka/schemaregistry/rest/exceptions/Errors.java

View Source
var (
	// Normalize is a Param that configures whether or not to normalize
	// schema's in certain create- or get-schema operations.
	Normalize = Param{/* contains filtered or unexported fields */}

	// Verbose is a Param that configures whether or not to return verbose
	// error messages when checking compatibility.
	Verbose = Param{/* contains filtered or unexported fields */}

	// FetchMaxID is a Param that configures whether or not to fetch the
	// max schema ID in certain get-schema operations.
	FetchMaxID = Param{/* contains filtered or unexported fields */}

	// DefaultToGlobal is a Param that changes get-compatibility or
	// get-mode to return the global compatibility or mode if the requested
	// subject does not exist.
	DefaultToGlobal = Param{/* contains filtered or unexported fields */}

	// Force is a Param that updating the mode if you are setting the mode
	// to IMPORT and schemas currently exist.
	Force = Param{/* contains filtered or unexported fields */}

	// LatestOnly is a Param that configures whether or not to return only
	// the latest schema in certain get-schema operations.
	LatestOnly = Param{/* contains filtered or unexported fields */}

	// ShowDeleted is a Param that configures whether or not to return
	// deleted schemas or subjects in certain get operations.
	ShowDeleted = Param{/* contains filtered or unexported fields */}

	// DeletedOnly is a Param that configures whether to return only
	// deleted schemas or subjects in certain get operations.
	DeletedOnly = Param{/* contains filtered or unexported fields */}
)
View Source
var (
	// ErrNotRegistered is returned from Serde when attempting to encode a
	// value or decode an ID that has not been registered, or when using
	// Decode with a missing new value function.
	ErrNotRegistered = errors.New("registration is missing for encode/decode")

	// ErrBadHeader is returned from Decode when the input slice is shorter
	// than five bytes, or if the first byte is not the magic 0 byte.
	ErrBadHeader = errors.New("5 byte header for value is missing or does not have 0 magic byte")
)

Functions

func AppendEncode

func AppendEncode(b []byte, v any, h SerdeHeader, id int, index []int, enc func([]byte, any) ([]byte, error)) ([]byte, error)

AppendEncode encodes a value and prepends the header, appends it to b and returns b. If the encoding function fails, this returns an error.

func Encode

func Encode(v any, h SerdeHeader, id int, index []int, enc func(any) ([]byte, error)) ([]byte, error)

Encode encodes a value and prepends the header. If the encoding function fails, this returns an error.

func IsInvalidRequestError added in v1.5.0

func IsInvalidRequestError(code int) bool

IsInvalidRequestError returns true if the error code indicates an invalid request.

func IsNotFoundError added in v1.5.0

func IsNotFoundError(code int) bool

IsNotFoundError returns true if the error code indicates a "not found" condition.

func IsServerError added in v1.5.0

func IsServerError(code int) bool

IsServerError returns true if the error code indicates a server-side error.

func WithParams

func WithParams(ctx context.Context, p ...Param) context.Context

WithParams adds query parameters to the given context. This is a merge operation: any non-zero parameter is kept. The variadic nature of this allows for a nicer api:

sr.WithParams(ctx, sr.Format("default"), sr.FetchMaxID)

Types

type CheckCompatibilityResult

type CheckCompatibilityResult struct {
	Is       bool     `json:"is_compatible"` // Is is true if the schema is compatible.
	Messages []string `json:"messages"`      // Messages contains reasons a schema is not compatible.
}

CheckCompatibilityResult is the response from the check compatibility endpoint.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client talks to a schema registry and contains helper functions to serialize and deserialize objects according to schemas.

func NewClient

func NewClient(opts ...ClientOpt) (*Client, error)

NewClient returns a new schema registry client.

func (*Client) AllSchemas

func (cl *Client) AllSchemas(ctx context.Context) ([]SubjectSchema, error)

AllSchemas returns all schemas for all subjects.

This supports params SubjectPrefix, ShowDeleted, and LatestOnly.

func (*Client) CheckCompatibility

func (cl *Client) CheckCompatibility(ctx context.Context, subject string, version int, s Schema) (CheckCompatibilityResult, error)

CheckCompatibility checks if a schema is compatible with the given version that exists. You can use -1 to check compatibility with the latest version, and -2 to check compatibility against all versions.

This supports params Normalize and Verbose.

func (*Client) Compatibility

func (cl *Client) Compatibility(ctx context.Context, subjects ...string) []CompatibilityResult

Compatibility returns the subject compatibility and global compatibility of each requested subject. The global compatibility can be requested by using either an empty subject or by specifying no subjects.

This supports params DefaultToGlobal.

This can return 200 or 500 per result.

func (*Client) CreateSchema

func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)

CreateSchema attempts to create a schema in the given subject.

If you want to register or create a schema with a specific ID and/or version, you can use [RegisterSchema] or [CreateSchemaWithIDAndVersion].

This supports param Normalize.

func (*Client) CreateSchemaWithIDAndVersion added in v1.3.0

func (cl *Client) CreateSchemaWithIDAndVersion(ctx context.Context, subject string, s Schema, id, version int) (SubjectSchema, error)

CreateSchemaWithIDAndVersion attempts to create a schema with a fixed ID and version in the given subject. If the id is set to -1 or 0, this method is equivalent to [CreateSchema]. If the version is set to -1 or 0, the registry will interpret it as "latest".

This function first registers the schema and then looks up all details. If you want to register a schema, only receive the registered ID back, and avoid further HTTP requests to the registry, you can use [RegisterSchema] instead.

This supports param Normalize.

func (*Client) DeleteSchema

func (cl *Client) DeleteSchema(ctx context.Context, subject string, version int, how DeleteHow) error

DeleteSchema deletes the schema at the given version. You must soft delete a schema before it can be hard deleted. You can use -1 to delete the latest version.

func (*Client) DeleteSubject

func (cl *Client) DeleteSubject(ctx context.Context, subject string, how DeleteHow) ([]int, error)

DeleteSubject deletes the subject. You must soft delete a subject before it can be hard deleted. This returns all versions that were deleted.

func (*Client) Do added in v1.5.0

func (cl *Client) Do(ctx context.Context, method, path string, v, into any) error

Do sends an HTTP request to the schema registry using the given method and path, optionally encoding the provided request body `v` as JSON, and decoding the response into `into` if non-nil.

This method is a general-purpose extension point for users who need to interact with custom or non-standard schema registry endpoints while reusing the client's configured authentication, user-agent, retry logic, and error handling.

func (*Client) LookupSchema

func (cl *Client) LookupSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error)

LookupSchema checks to see if a schema is already registered and if so, returns its ID and version in the SubjectSchema.

This supports params Normalize and Deleted.

func (*Client) Mode

func (cl *Client) Mode(ctx context.Context, subjects ...string) []ModeResult

Mode returns the subject and global mode of each requested subject. The global mode can be requested by using either an empty subject or by specifying no subjects.

This supports params DefaultToGlobal.

func (*Client) OptValue added in v1.4.0

func (cl *Client) OptValue(opt any) any

OptValue returns the value for the given configuration option. If the given option does not exist, this returns nil. This function takes either a raw ClientOpt, or an Opt function name.

If a configuration option has multiple inputs, this function returns only the first input. Variadic option inputs are returned as a single slice. Options that are internally stored as a pointer are returned as their string input; you can see if the option is internally nil by looking at the second value returned from OptValues.

	var (
 		cl, _ := NewClient(
 			URLs("foo", "bar"),
			UserAgent("baz"),
 		)
 		urls = cl.OptValue("URLs")     // urls is []string{"foo", "bar"}; string lookup for the option works
 		ua   = cl.OptValue(UserAgent)  // ua is "baz"
 		unk  = cl.OptValue("Unknown"), // unk is nil
	)

func (*Client) OptValues added in v1.4.0

func (cl *Client) OptValues(opt any) []any

OptValues returns all values for options. This method is useful for options that have multiple inputs (notably, BasicAuth). This is also useful for options that are internally stored as a pointer -- this function will return the string value of the option but also whether the option is non-nil. Boolean options are returned as a single-element slice with the bool value. Variadic inputs are returned as a signle slice. If the input option does not exist, this returns nil.

     var (
	 		cl, _ := NewClient(
	 			URLs("foo", "bar"),
				UserAgent("baz"),
	 		)
	 		urls = cl.OptValues("URLs")     // urls is []any{[]string{"foo", "bar"}}
	 		ua   = cl.OptValues(UserAgent)  // ua is []any{"baz"}
	 		ba   = cl.OptValues(BasicAuth)  // ba is []any{"user", "pass"}
	 		unk  = cl.OptValues("Unknown"), // unk is nil
     )

func (*Client) Opts added in v1.2.0

func (cl *Client) Opts() []ClientOpt

Opts returns the options that were used to create this client. This can be as a base to generate a new client, where you can add override options to the end of the original input list.

func (*Client) RegisterSchema added in v1.5.0

func (cl *Client) RegisterSchema(ctx context.Context, subject string, s Schema, id, version int) (int, error)

RegisterSchema attempts to create a schema with a fixed ID and version in the given subject, and returns the globally unique identifier assigned by the registry. If the id is set to -1 or 0, the registry will try to allocate a new identifier or reuse an existing one. If the version is set to -1 or 0, the registry will interpret it as "latest".

If the schema is already registered, the original ID will be returned and a new version will not be created.

If you want to register a schema and receive more details back, you can use [CreateSchema] or [CreateSchemaWithIDAndVersion] at the expense of more HTTP requests to the registry.

This supports param Normalize.

func (*Client) ResetCompatibility

func (cl *Client) ResetCompatibility(ctx context.Context, subjects ...string) []CompatibilityResult

ResetCompatibility deletes any subject-level compatibility and reverts to the global default. The global compatibility can be reset by either using an empty subject or by specifying no subjects.

This can return 200 or 500.

func (*Client) ResetMode

func (cl *Client) ResetMode(ctx context.Context, subjects ...string) []ModeResult

ResetMode deletes any subject modes and reverts to the global default.

func (*Client) SchemaByID

func (cl *Client) SchemaByID(ctx context.Context, id int) (Schema, error)

SchemaByID returns the schema for a given schema ID.

This supports params Subject, Format, and FetchMaxID.

func (*Client) SchemaByVersion

func (cl *Client) SchemaByVersion(ctx context.Context, subject string, version int) (SubjectSchema, error)

SchemaByVersion returns the schema for a given subject and version. You can use -1 as the version to return the latest schema.

This supports param ShowDeleted.

func (*Client) SchemaReferences

func (cl *Client) SchemaReferences(ctx context.Context, subject string, version int) ([]SubjectSchema, error)

SchemaReferences returns all schemas that references the input subject-version. You can use -1 to check the latest version.

This supports param ShowDeleted.

func (*Client) SchemaTextByID

func (cl *Client) SchemaTextByID(ctx context.Context, id int) (string, error)

SchemaTextByID returns the actual text of a schema.

For example, if the schema for an ID is

"{\"type\":\"boolean\"}"

this will return

{"type":"boolean"}

This supports params Subject, Format.

func (*Client) SchemaTextByVersion

func (cl *Client) SchemaTextByVersion(ctx context.Context, subject string, version int) (string, error)

SchemaTextByVersion returns the actual text of a schema, by subject and version. You can use -1 as the version to return the latest schema.

For example, if the schema for an ID is

"{\"type\":\"boolean\"}"

this will return

{"type":"boolean"}

This supports param ShowDeleted.

func (*Client) SchemaUsagesByID

func (cl *Client) SchemaUsagesByID(ctx context.Context, id int) ([]SubjectSchema, error)

SchemaUsagesByID returns all usages of a given schema ID. A single schema's can be reused in many subject-versions; this function can be used to map a schema to all subject-versions that use it.

This supports param ShowDeleted.

func (*Client) SchemaVersionsByID

func (cl *Client) SchemaVersionsByID(ctx context.Context, id int) ([]SubjectVersion, error)

SchemaVersionsByID returns all subject versions associated with a schema ID.

This supports params Subject and ShowDeleted.

func (*Client) Schemas

func (cl *Client) Schemas(ctx context.Context, subject string) ([]SubjectSchema, error)

Schemas returns all schemas for the given subject.

This supports param ShowDeleted.

func (*Client) SetCompatibility

func (cl *Client) SetCompatibility(ctx context.Context, compat SetCompatibility, subjects ...string) []CompatibilityResult

SetCompatibility sets the compatibility for each requested subject. The global compatibility can be set by either using an empty subject or by specifying no subjects. If specifying no subjects, this returns one element.

func (*Client) SetMode

func (cl *Client) SetMode(ctx context.Context, mode Mode, subjects ...string) []ModeResult

SetMode sets the mode for each requested subject. The global mode can be set by either using an empty subject or by specifying no subjects. If specifying no subjects, this returns one element.

This supports params Force.

func (*Client) SubjectVersions

func (cl *Client) SubjectVersions(ctx context.Context, subject string) ([]int, error)

SubjectVersions returns all versions for a given subject.

This supports params ShowDeleted and DeletedOnly.

func (*Client) Subjects

func (cl *Client) Subjects(ctx context.Context) ([]string, error)

Subjects returns subjects available in the registry.

This supports params SubjectPrefix, ShowDeleted, and DeletedOnly.

func (*Client) SubjectsByID

func (cl *Client) SubjectsByID(ctx context.Context, id int) ([]string, error)

SubjectsByID returns the subjects associated with a schema ID.

This supports params Subject and ShowDeleted.

func (*Client) SupportedTypes

func (cl *Client) SupportedTypes(ctx context.Context) ([]SchemaType, error)

SupportedTypes returns the schema types that are supported in the schema registry.

type ClientOpt

type ClientOpt interface {
	// contains filtered or unexported methods
}

ClientOpt is an option to configure a client.

func BasicAuth

func BasicAuth(user, pass string) ClientOpt

BasicAuth sets basic authorization to use for every request.

func BearerToken added in v1.1.0

func BearerToken(token string) ClientOpt

BearerToken sets an Authorization header to use for every request. The format will be: "Authorization: Bearer $token".

func DefaultParams

func DefaultParams(ps ...Param) ClientOpt

DefaultParams sets default parameters to apply to every request.

func DialTLSConfig

func DialTLSConfig(c *tls.Config) ClientOpt

DialTLSConfig sets a tls.Config to use in the http client, either the default client or a client previously set with the HTTPClient option. When setting this option, HTTP/2 support may not be enabled by default.

func HTTPClient

func HTTPClient(httpcl *http.Client) ClientOpt

HTTPClient sets the http client that the schema registry client uses, overriding the default client that speaks plaintext with a timeout of 5s.

func LogFn added in v1.4.0

func LogFn(logFn func(int8, string, ...any)) ClientOpt

LogFn sets the logger function to use.

func LogLevel added in v1.4.0

func LogLevel(lvl int8) ClientOpt

LogLevel sets a static log level to use, overriding the default "info" level.

There are five levels:

  • None (0)
  • Error (1)
  • Warn (2)
  • Info (3)
  • Debug (4)

This package defines int8 constants for convenience. The levels (and log function) mirror kgo's Logger and LogLevel definitions, making it easy to use any existing kgo logging functionality you may already be using in this package as well.

func LogLevelFn added in v1.4.0

func LogLevelFn(fn func() int8) ClientOpt

LogLevelFn sets a function to return the log level dynamically. See LogLevel for more information.

func PreReq added in v1.3.0

func PreReq(preReq func(req *http.Request) error) ClientOpt

PreReq sets a hook func to call before every request is sent.

func URLs

func URLs(urls ...string) ClientOpt

URLs sets the URLs that the client speaks to, overriding the default http://localhost:8081. This option automatically prefixes any URL that is missing an http:// or https:// prefix with http://.

func UserAgent

func UserAgent(ua string) ClientOpt

UserAgent sets the User-Agent to use in requests, overriding the default "franz-go".

type CompatibilityLevel

type CompatibilityLevel int

CompatibilityLevel as an enum representing config compatibility levels.

const (
	CompatNone CompatibilityLevel = 1 + iota
	CompatBackward
	CompatBackwardTransitive
	CompatForward
	CompatForwardTransitive
	CompatFull
	CompatFullTransitive
)

func (CompatibilityLevel) MarshalText

func (l CompatibilityLevel) MarshalText() ([]byte, error)

func (CompatibilityLevel) String

func (l CompatibilityLevel) String() string

func (*CompatibilityLevel) UnmarshalText

func (l *CompatibilityLevel) UnmarshalText(text []byte) error

type CompatibilityResult

type CompatibilityResult struct {
	Subject string `json:"-"` // The subject this compatibility result is for, or empty for the global compatibility..

	Level            CompatibilityLevel `json:"compatibilityLevel"` // The subject (or global) compatibility level.
	Alias            string             `json:"alias"`              // The subject alias, if any.
	Normalize        bool               `json:"normalize"`          // Whether or not schemas are normalized by default.
	Group            string             `json:"compatibilityGroup"` // The compatibility group, if any. Only schemas in the same group are checked for compatibility.
	DefaultMetadata  *SchemaMetadata    `json:"defaultMetadata"`    // Default metadata used for schema registration.
	OverrideMetadata *SchemaMetadata    `json:"overrideMetadata"`   // Override metadata used for schema registration.
	DefaultRuleSet   *SchemaRuleSet     `json:"defaultRuleSet"`     // Default rule set used for schema registration.
	OverrideRuleSet  *SchemaRuleSet     `json:"overrideRuleSet"`    // Override rule set used for schema registration.

	Err error `json:"-"` // The error received for getting this compatibility.
}

CompatibilityResult is the compatibility level for a subject.

type ConfluentHeader

type ConfluentHeader struct{}

ConfluentHeader is a SerdeHeader that produces the Confluent wire format. It starts with 0, then big endian uint32 of the ID, then index (only protobuf), then the encoded message.

https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format

func (*ConfluentHeader) AppendEncode

func (*ConfluentHeader) AppendEncode(b []byte, id int, index []int) ([]byte, error)

AppendEncode appends an encoded header to b according to the Confluent wire format and returns it. Error is always nil.

func (*ConfluentHeader) DecodeID

func (*ConfluentHeader) DecodeID(b []byte) (int, []byte, error)

DecodeID strips and decodes the schema ID from b. It returns the ID alongside the unread bytes. If the header does not contain the magic byte or b contains less than 5 bytes it returns ErrBadHeader.

func (*ConfluentHeader) DecodeIndex

func (*ConfluentHeader) DecodeIndex(b []byte, maxLength int) ([]int, []byte, error)

DecodeIndex strips and decodes indices from b. It returns the index slice alongside the unread bytes. It expects b to be the output of DecodeID (schema ID should already be stripped away). If maxLength is greater than 0 and the encoded data contains more indices than maxLength the function returns ErrNotRegistered.

func (*ConfluentHeader) UpdateID added in v1.3.0

func (*ConfluentHeader) UpdateID(b []byte, id uint32) error

UpdateID replaces the schema ID in b. If the header does not contain the magic byte or b contains less than 5 bytes it returns ErrBadHeader.

type DeleteHow

type DeleteHow bool

DeleteHow is a typed bool indicating how subjects or schemas should be deleted.

const (
	// SoftDelete performs a soft deletion.
	SoftDelete DeleteHow = false
	// HardDelete performs a hard deletion. Values must be soft deleted
	// before they can be hard deleted.
	HardDelete DeleteHow = true
)

type EncodingOpt

type EncodingOpt interface {
	// contains filtered or unexported methods
}

EncodingOpt is an option to configure the behavior of Serde.Encode and Serde.Decode.

func AppendEncodeFn

func AppendEncodeFn(fn func([]byte, any) ([]byte, error)) EncodingOpt

AppendEncodeFn allows Serde to encode a value to an existing slice. This can be more efficient than EncodeFn; this function is used if it exists.

func DecodeFn

func DecodeFn(fn func([]byte, any) error) EncodingOpt

DecodeFn allows Serde to decode into a value.

func EncodeFn

func EncodeFn(fn func(any) ([]byte, error)) EncodingOpt

EncodeFn allows Serde to encode a value.

func GenerateFn

func GenerateFn(fn func() any) EncodingOpt

GenerateFn returns a new(Value) that can be decoded into. This function can be used to control the instantiation of a new type for DecodeNew.

func Index

func Index(index ...int) EncodingOpt

Index attaches a message index to a value. A single schema ID can be registered multiple times with different indices.

This option supports schemas that encode many different values from the same schema (namely, protobuf). The index into the schema to encode a particular message is specified with `index`.

NOTE: this option must be used for protobuf schemas.

For more information, see where `message-indexes` are described in:

https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#wire-format

type Error added in v1.5.0

type Error struct {
	// Code is the numeric error code returned by the Schema Registry.
	Code int
	// Name is the string representation of the error.
	Name string
	// Description is a human-readable description of what this error means.
	Description string
}

Error represents a Schema Registry error with its code, name, and description.

func ErrorForCode added in v1.5.0

func ErrorForCode(code int) *Error

ErrorForCode returns the Error corresponding to the given error code. If the code is unknown, this returns ErrUnknown.

func (*Error) Error added in v1.5.0

func (e *Error) Error() string

Error implements the error interface, returning the human-readable description.

func (*Error) LogValue added in v1.5.0

func (e *Error) LogValue() slog.Value

LogValue implements slog.LogValuer to provide structured logging output.

type Mode

type Mode int

Mode as an enum representing the "mode" of the registry or a subject.

const (
	ModeImport Mode = iota
	ModeReadOnly
	ModeReadWrite
)

func (Mode) MarshalText

func (m Mode) MarshalText() ([]byte, error)

func (Mode) String

func (m Mode) String() string

func (*Mode) UnmarshalText

func (m *Mode) UnmarshalText(text []byte) error

type ModeResult

type ModeResult struct {
	Subject string // The subject this mode result is for, or empty for the global mode.
	Mode    Mode   // The subject (or global) mode.
	Err     error  // The error received for getting this mode.
}

ModeResult is the mode for a subject.

type Param

type Param struct {
	// contains filtered or unexported fields
}

Param is a parameter that can be passed to various APIs. Each API documents the parameters they accept.

func Format

func Format(f string) Param

Format returns a Param that configures how schema's are returned in certain get-schema operations.

For Avro schemas, the Format param supports "default" or "resolved". For Protobuf schemas, the Format param supports "default", "ignore_extensions", or "serialized".

func RawParams added in v1.5.0

func RawParams(v url.Values) Param

RawParams returns a Param with raw query parameters.

func Subject

func Subject(s string) Param

Subject returns a Param limiting which subject is returned in certain list-schema or list-subject operations.

func SubjectPrefix

func SubjectPrefix(pfx string) Param

SubjectPrefix returns a Param that filters subjects by prefix when listing schemas.

type ResponseError

type ResponseError struct {
	// Method is the requested http method.
	Method string `json:"-"`
	// URL is the full path that was requested that resulted in this error.
	URL string `json:"-"`
	// StatusCode is the status code that was returned for this error.
	StatusCode int `json:"-"`
	// Raw contains the raw response body.
	Raw []byte `json:"-"`

	ErrorCode int    `json:"error_code"`
	Message   string `json:"message"`
}

ResponseError is the type returned from the schema registry for errors.

func (*ResponseError) Error

func (e *ResponseError) Error() string

func (*ResponseError) SchemaError added in v1.5.0

func (e *ResponseError) SchemaError() *Error

SchemaError returns the typed Error corresponding to this response error's error code. If the error code is unknown, this returns nil.

type Schema

type Schema struct {
	// Schema is the actual unescaped text of a schema.
	Schema string `json:"schema"`

	// Type is the type of a schema. The default type is avro.
	Type SchemaType `json:"schemaType,omitempty"`

	// References declares other schemas this schema references. See the
	// docs on SchemaReference for more details.
	References []SchemaReference `json:"references,omitempty"`

	// SchemaMetadata is arbitrary information about the schema.
	SchemaMetadata *SchemaMetadata `json:"metadata,omitempty"`

	// SchemaRuleSet is a set of rules that govern the schema.
	SchemaRuleSet *SchemaRuleSet `json:"ruleSet,omitempty"`
}

Schema is the object form of a schema for the HTTP API.

type SchemaMetadata

type SchemaMetadata struct {
	Tags       map[string][]string `json:"tags,omitempty"`
	Properties map[string]string   `json:"properties,omitempty"`
	Sensitive  []string            `json:"sensitive,omitempty"`
}

SchemaMetadata is arbitrary information about the schema or its constituent parts, such as whether a field contains sensitive information or who created a data contract.

type SchemaReference

type SchemaReference struct {
	Name    string `json:"name"`
	Subject string `json:"subject"`
	Version int    `json:"version"`
}

SchemaReference is a way for a one schema to reference another. The details for how referencing is done are type specific; for example, JSON objects that use the key "$ref" can refer to another schema via URL. For more details on references, see the following link:

https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#schema-references
https://docs.confluent.io/platform/current/schema-registry/develop/api.html

type SchemaRule

type SchemaRule struct {
	Name      string            `json:"name"`                // Name is a user-defined name to reference the rule.
	Doc       string            `json:"doc,omitempty"`       // Doc is an optional description of the rule.
	Kind      SchemaRuleKind    `json:"kind"`                // Kind is the type of rule.
	Mode      SchemaRuleMode    `json:"mode"`                // Mode is the mode of the rule.
	Type      string            `json:"type"`                // Type is the type of rule, which invokes a specific rule executor, such as Google Common Expression Language (CEL) or JSONata.
	Tags      []string          `json:"tags"`                // Tags to which this rule applies.
	Params    map[string]string `json:"params,omitempty"`    // Optional params for the rule.
	Expr      string            `json:"expr"`                // Expr is the rule expression.
	OnSuccess string            `json:"onSuccess,omitempty"` // OnSuccess is an optional action to execute if the rule succeeds, otherwise the built-in action type NONE is used. For UPDOWN and WRITEREAD rules, one can specify two actions separated by commas, such as "NONE,ERROR" for a WRITEREAD rule. In this case NONE applies to WRITE and ERROR applies to READ
	OnFailure string            `json:"onFailure,omitempty"` // OnFailure is an optional action to execute if the rule fails, otherwise the built-in action type NONE is used. See OnSuccess for more details.
	Disabled  bool              `json:"disabled,omitempty"`  // Disabled specifies whether the rule is disabled.
}

SchemaRule specifies integrity constraints or data policies in a data contract. These data rules or policies can enforce that a field that contains sensitive information must be encrypted, or that a message containing an invalid age must be sent to a dead letter queue

https://docs.confluent.io/platform/current/schema-registry/fundamentals/data-contracts.html#rules

type SchemaRuleKind

type SchemaRuleKind int

SchemaRuleKind as an enum representing the kind of schema rule.

const (
	SchemaRuleKindTransform SchemaRuleKind = iota
	SchemaRuleKindCondition
)

func (SchemaRuleKind) MarshalText

func (k SchemaRuleKind) MarshalText() ([]byte, error)

func (SchemaRuleKind) String

func (k SchemaRuleKind) String() string

func (*SchemaRuleKind) UnmarshalText

func (k *SchemaRuleKind) UnmarshalText(text []byte) error

type SchemaRuleMode

type SchemaRuleMode int

SchemaRuleMode specifies a schema rule's mode.

Migration rules can be specified for an UPGRADE, DOWNGRADE, or both (UPDOWN). Migration rules are used during complex schema evolution.

Domain rules can be specified during serialization (WRITE), deserialization (READ) or both (WRITEREAD).

Domain rules can be used to transform the domain values in a message payload.

const (
	SchemaRuleModeUpgrade SchemaRuleMode = iota
	SchemaRuleModeDowngrade
	SchemaRuleModeUpdown
	SchemaRuleModeWrite
	SchemaRuleModeRead
	SchemaRuleModeWriteRead
)

func (SchemaRuleMode) MarshalText

func (m SchemaRuleMode) MarshalText() ([]byte, error)

func (SchemaRuleMode) String

func (m SchemaRuleMode) String() string

func (*SchemaRuleMode) UnmarshalText

func (m *SchemaRuleMode) UnmarshalText(text []byte) error

type SchemaRuleSet

type SchemaRuleSet struct {
	MigrationRules []SchemaRule `json:"migrationRules,omitempty"`
	DomainRules    []SchemaRule `json:"domainRules,omitempty"`
}

SchemaRuleSet groups migration rules and domain validation rules.

type SchemaType

type SchemaType int

SchemaType as an enum representing schema types. The default schema type is avro.

const (
	TypeAvro SchemaType = iota
	TypeProtobuf
	TypeJSON
)

func (SchemaType) MarshalText

func (t SchemaType) MarshalText() ([]byte, error)

func (SchemaType) String

func (t SchemaType) String() string

func (*SchemaType) UnmarshalText

func (t *SchemaType) UnmarshalText(text []byte) error

type Serde

type Serde struct {
	// contains filtered or unexported fields
}

Serde encodes and decodes values according to the schema registry wire format. A Serde itself does not perform schema auto-discovery and type auto-decoding. To aid in strong typing and validated encoding/decoding, you must register IDs and values to how to encode or decode them.

To use a Serde for encoding, you must pre-register schema ids and values you will encode, and then you can use the encode functions.

To use a Serde for decoding, you can either pre-register schema ids and values you will consume, or you can discover the schema every time you receive an ErrNotRegistered error from decode.

func NewSerde

func NewSerde(opts ...SerdeOrEncodingOpt) *Serde

NewSerde returns a new Serde using the supplied default options, which are applied to every registered type. These options are always applied first, so you can override them as necessary when registering.

This can be useful if you always want to use the same encoding or decoding functions.

func (*Serde) AppendEncode

func (s *Serde) AppendEncode(b []byte, v any) ([]byte, error)

AppendEncode encodes a value and prepends the header according to the configured SerdeHeader, appends it to b and returns b. If EncodeFn was not registered, this returns ErrNotRegistered.

func (*Serde) Decode

func (s *Serde) Decode(b []byte, v any) error

Decode decodes b into v. If DecodeFn option was not used, this returns ErrNotRegistered.

Serde does not handle references in schemas; it is up to you to register the full decode function for any top-level ID, regardless of how many other schemas are referenced in top-level ID.

func (*Serde) DecodeID

func (s *Serde) DecodeID(b []byte) (id int, out []byte, err error)

DecodeID decodes an ID from b, returning the ID and the remaining bytes, or an error.

func (*Serde) DecodeIndex

func (s *Serde) DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error)

DecodeIndex decodes at most maxLength of a schema index from in, returning the index and remaining bytes, or an error. It expects b to be the output of DecodeID (schema ID should already be stripped away).

func (*Serde) DecodeNew

func (s *Serde) DecodeNew(b []byte) (any, error)

DecodeNew is the same as Decode, but decodes into a new value rather than the input value. If DecodeFn was not used, this returns ErrNotRegistered. GenerateFn can be used to control the instantiation of a new value, otherwise this uses reflect.New(reflect.TypeOf(v)).Interface().

func (*Serde) Encode

func (s *Serde) Encode(v any) ([]byte, error)

Encode encodes a value and prepends the header according to the configured SerdeHeader. If EncodeFn was not used, this returns ErrNotRegistered.

func (*Serde) MustAppendEncode

func (s *Serde) MustAppendEncode(b []byte, v any) []byte

MustAppendEncode returns the value of AppendEncode, panicking on error. This is a shortcut for if your encode function cannot error.

func (*Serde) MustEncode

func (s *Serde) MustEncode(v any) []byte

MustEncode returns the value of Encode, panicking on error. This is a shortcut for if your encode function cannot error.

func (*Serde) Register

func (s *Serde) Register(id int, v any, opts ...EncodingOpt)

Register registers a schema ID and the value it corresponds to, as well as the encoding or decoding functions. You need to register functions depending on whether you are only encoding, only decoding, or both.

type SerdeHeader

type SerdeHeader interface {
	// AppendEncode encodes a schema ID and optional index to b, returning the
	// updated slice or an error.
	AppendEncode(b []byte, id int, index []int) ([]byte, error)
	// DecodeID decodes an ID from in, returning the ID and the remaining bytes,
	// or an error.
	DecodeID(in []byte) (id int, out []byte, err error)
	// DecodeIndex decodes at most maxLength of a schema index from in,
	// returning the index and remaining bytes, or an error.
	DecodeIndex(in []byte, maxLength int) (index []int, out []byte, err error)
}

SerdeHeader encodes and decodes a message header.

type SerdeOpt

type SerdeOpt interface {
	// contains filtered or unexported methods
}

SerdeOpt is an option to configure Serde.

func Header(header SerdeHeader) SerdeOpt

Header defines the SerdeHeader used to encode and decode the message header.

type SerdeOrEncodingOpt

type SerdeOrEncodingOpt interface {
	// contains filtered or unexported methods
}

SerdeOrEncodingOpt is either a SerdeOpt or EncodingOpt.

type SetCompatibility

type SetCompatibility struct {
	Level            CompatibilityLevel `json:"compatibility"`                // The subject (or global) compatibility level.
	Alias            string             `json:"alias,omitempty"`              // The subject alias, if any.
	Normalize        bool               `json:"normalize,omitempty"`          // Whether or not schemas are normalized by default.
	Group            string             `json:"compatibilityGroup,omitempty"` // The compatibility group, if any. Only schemas in the same group are checked for compatibility.
	DefaultMetadata  *SchemaMetadata    `json:"defaultMetadata,omitempty"`    // Default metadata used for schema registration.
	OverrideMetadata *SchemaMetadata    `json:"overrideMetadata,omitempty"`   // Override metadata used for schema registration.
	DefaultRuleSet   *SchemaRuleSet     `json:"defaultRuleSet,omitempty"`     // Default rule set used for schema registration.
	OverrideRuleSet  *SchemaRuleSet     `json:"overrideRuleSet,omitempty"`    // Override rule set used for schema registration.
}

SetCompatibility contains information used for setting global or per-subject compatibility configuration.

The main difference between this and the CompatibilityResult is that this struct marshals the compatibility level as "compatibility".

type SubjectSchema

type SubjectSchema struct {
	// Subject is the subject for this schema. This usually corresponds to
	// a Kafka topic, and whether this is for a key or value. For example,
	// "foo-key" would be the subject for the foo topic for serializing the
	// key field of a record.
	Subject string `json:"subject,omitempty"`

	// Version is the version of this subject.
	Version int `json:"version,omitempty"`

	// ID is the globally unique ID of the schema.
	ID int `json:"id,omitempty"`

	Schema
}

SubjectSchema pairs the subject, global identifier, and version of a schema with the schema itself.

func CommSubjectSchemas

func CommSubjectSchemas(l, r []SubjectSchema) (luniq, runiq, common []SubjectSchema)

CommSubjectSchemas splits l and r into three sets: what is unique in l, what is unique in r, and what is common in both. Duplicates in either map are eliminated.

type SubjectVersion

type SubjectVersion struct {
	Subject string `json:"subject"`
	Version int    `json:"version"`
}

SubjectVersion is a subject version pair.

Directories

Path Synopsis
Package srfake provides an in-memory, concurrency-safe mock implementation of the Confluent Schema Registry REST API for unit and integration testing.
Package srfake provides an in-memory, concurrency-safe mock implementation of the Confluent Schema Registry REST API for unit and integration testing.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL