本页面提供了 Memorystore for Valkey 实例的代码示例,既包括已启用集群模式的实例,也包括已停用集群模式的实例。
已启用集群模式的代码示例
Memorystore for Valkey 与所有 Memorystore for Redis 集群客户端代码示例兼容:
- Memorystore for Redis Cluster 客户端库代码示例
- Memorystore for Redis 集群客户端库连接代码示例
Valkey GLIDE 简介
Valkey General Language Independent Driver for the Enterprise (GLIDE) 是一款开源客户端库,支持所有 Valkey 命令。
您可以使用 Valkey GLIDE 将应用连接到 Memorystore for Valkey 实例。Valkey GLIDE 旨在实现可靠性、优化性能和高可用性。
为了帮助确保应用开发和运营的一致性,Valkey GLIDE 使用以 Rust 编写的核心驱动程序框架实现,并具有特定于语言的扩展程序。此设计可确保不同语言版本的功能保持一致,并降低复杂性。
Valkey GLIDE 支持 Valkey 7.2 版和 8.0 版。该客户端库支持以下语言:
其他 Valkey 开源客户端库
除了 Valkey GLIDE 之外,Memorystore for Valkey 还与以下 Valkey OSS 客户端库兼容:
已停用集群模式的代码示例
已停用集群模式 (Cluster Mode Disabled) 的 Memorystore for Valkey 实例与“已启用集群模式的代码示例”部分中列出的所有 Redis 和 Valkey OSS 客户端库兼容。
在 Memorystore for Valkey 中使用 Cluster Mode Disabled 实例时,请完成以下操作:
- 请使用 Redis 或 Valkey 客户端对象,而不是库提供的 `RedisCluster` 或 `ValkeyCluster` 客户端对象。
- 使用主端点 IP 地址创建写入器客户端。
- 使用读取器端点 IP 地址创建读取器客户端。
redis-py
我们建议使用 redis-py 5.1 及更高版本。
"""Write a key through the primary endpoint and read it back through the reader endpoint."""
import redis

# Replace the placeholders with the instance's endpoint IP addresses.
primary_host = PRIMARY_ENDPOINT_IP
reader_host = READER_ENDPOINT_IP

# Connection settings shared by the writer and the reader client.
common_options = dict(port=6379, db=0, decode_responses=True)

# Writes go to the primary endpoint; reads go to the reader endpoint.
writer = redis.Redis(host=primary_host, **common_options)
reader = redis.Redis(host=reader_host, **common_options)

writer.set("key", "value")
print(reader.get("key"))
go-redis
我们建议使用 go-redis 9.11.0 版及更高版本。
package main import ( "context" "fmt" "github.com/redis/go-redis/v9" ) func main() { primary_endpoint := PRIMARY_ENDPOINT_IP reader_endpoint := READER_ENDPOINT_IP primary_client := redis.NewClient(&redis.Options{ Addr: primary_endpoint, Password: "", // no password set DB: 0, // use default DB }) reader_client := redis.NewClient(&redis.Options{ Addr: reader_endpoint, Password: "", // no password set DB: 0, // use default DB }) ctx := context.Background() err := primary_client.Set(ctx, "foo", "bar", 0).Err() if err != nil { panic(err) } val, err := reader_client.Get(ctx, "foo").Result() if err != nil { panic(err) } fmt.Println("foo", val) }
Jedis
我们建议使用 Jedis 4.4.0 版及更高版本。
package org.example; import java.io.*; import java.time.LocalDateTime; import java.lang.Thread; import java.util.HashMap; import java.util.Map; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisPoolConfig; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; public class Main { public static void main( String[] args ) { primaryEndpoint = PRIMARY_ENDPOINT_IP JedisPool pool = new JedisPool(primaryEndpoint, 6379); try (Jedis jedis = pool.getResource()) { jedis.set("foo", "bar"); System.out.println(jedis.get("foo")); // prints bar Maphash = new HashMap<>();; hash.put("name", "John"); hash.put("surname", "Smith"); hash.put("company", "Redis"); hash.put("age", "29"); jedis.hset("user-session:123", hash); System.out.println(jedis.hgetAll("user-session:123")); // Prints: {name=John, surname=Smith, company=Redis, age=29} } catch (Exception e) { System.out.println("Error setting or getting key: " + e.getMessage()); } } }
Node.js
我们建议使用 Node.js 24.4.1 及更高版本。
// Sample script: connects to the primary endpoint of a Cluster Mode Disabled
// instance, writes one key and reads it back.
import { createClient } from 'redis';
import * as fs from 'fs';

// Replace PRIMARY_ENDPOINT_IP with the primary endpoint IP address (a string).
const primaryEndpoint = PRIMARY_ENDPOINT_IP;
// A template literal is required so the endpoint value is actually interpolated.
const primaryEndpointUrl = `redis://${primaryEndpoint}:6379`;

const key = 'key';
const value = 'value';

const client = createClient({ url: primaryEndpointUrl });
// Without an 'error' listener, an emitted connection error would be unhandled.
client.on('error', (err) => console.error('Redis client error:', err));

await client.connect();
try {
  await client.set(key, value);
  const retval = await client.get(key);
  console.log(retval);
} finally {
  // Close the connection so the process can exit.
  await client.quit();
}
同时实现 IAM 身份验证和传输中加密的代码示例
本部分提供了示例,说明如何通过各种客户端库,使用 IAM 身份验证和传输中加密向 Memorystore for Valkey 实例进行身份验证并连接到该实例。
redis-py
我们建议使用 redis-py 5.1 及更高版本。
"""Connect to Memorystore for Valkey using IAM authentication and in-transit encryption (TLS)."""
from google.cloud import iam_credentials_v1
from redis.backoff import ConstantBackoff
from redis.retry import Retry
from redis.exceptions import (
    ConnectionError,
    AuthenticationWrongNumberOfArgsError,
    AuthenticationError
)
from redis.utils import (str_if_bytes)
import redis

# Replace the three TO-DO placeholders below before running.
service_account = "projects/-/serviceAccounts/<TO-DO-1: your service account that used to authenticate to Valkey>"
host = "<TO-DO-2: your Redis Cluster discovery endpoint ip>"
ssl_ca_certs = "<TO-DO-3, your trusted server ca file name>"


def generate_access_token():
    """Request an IAM access token for `service_account` and return it as a string."""
    # Create a client
    client = iam_credentials_v1.IAMCredentialsClient()

    # Initialize request argument(s)
    request = iam_credentials_v1.GenerateAccessTokenRequest(
        name=service_account,
        scope=['https://www.googleapis.com/auth/cloud-platform'],
    )

    # Make the request
    response = client.generate_access_token(request=request)

    # The token is a credential, so it is returned without being printed.
    return str(response.access_token)


class ValkeyTokenProvider(redis.CredentialProvider):
    """Credential provider that supplies a freshly generated IAM token as the password."""

    # NOTE(review): the original sample states that generated IAM tokens are
    # valid for 15 minutes - confirm against the token lifetime in use.
    def get_credentials(self):
        """Return the (username, password) pair used to authenticate."""
        token = generate_access_token()
        return "default", token


creds_provider = ValkeyTokenProvider()
client = redis.Redis(
    host=host,
    port=6379,
    credential_provider=creds_provider,
    ssl=True,
    ssl_ca_certs=ssl_ca_certs,
)
client.set('foo', "bar")
print(client.get('foo'))
Go
我们建议使用 Go 1.24.5 版及更高版本。
package main import ( "context" "crypto/tls" "crypto/x509" "flag" "fmt" "io/ioutil" "log" "sync" "time" credentials "google.golang.org/genproto/googleapis/iam/credentials/v1" "github.com/golang/protobuf/ptypes" "github.com/redis/go-redis/v9" "google.golang.org/api/option" gtransport "google.golang.org/api/transport/grpc" ) var ( svcAccount = flag.String("a", "projects/-/serviceAccounts/example-service-account@example-project.iam.gserviceaccount.com", "service account email") lifetime = flag.Duration("d", time.Hour, "lifetime of token") refreshDuration = flag.Duration("r", 5*time.Minute, "token refresh duration") checkTokenExpiryInterval = flag.Duration("e", 10*time.Second, "check token expiry interval") lastRefreshInstant = time.Time{} errLastSeen = error(nil) token = "" mu = sync.RWMutex{} err = error(nil) ) func retrieveToken() (string, error) { ctx := context.Background() conn, err := gtransport.Dial(ctx, option.WithEndpoint("iamcredentials.googleapis.com:443"), option.WithScopes("https://www.googleapis.com/auth/cloud-platform")) if err != nil { log.Printf("Failed to dial API, error: %v", err) return token, err } client := credentials.NewIAMCredentialsClient(conn) req := credentials.GenerateAccessTokenRequest{ Name: *svcAccount, Scope: []string{"https://www.googleapis.com/auth/cloud-platform"}, Lifetime: ptypes.DurationProto(*lifetime), } rsp, err := client.GenerateAccessToken(ctx, &req) if err != nil { log.Printf("Failed to call GenerateAccessToken with request: %v, error: %v", req, err) return token, err } return rsp.AccessToken, nil } func refreshTokenLoop() { if *refreshDuration > *lifetime { log.Fatal("Refresh should not happen after token is already expired.") } for { mu.RLock() lastRefreshTime := lastRefreshInstant mu.RUnlock() if time.Now().After(lastRefreshTime.Add(*refreshDuration)) { var err error retrievedToken, err := retrieveToken() mu.Lock() token = retrievedToken if err != nil { errLastSeen = err } else { 
lastRefreshInstant = time.Now() } mu.Unlock() } time.Sleep(*checkTokenExpiryInterval) } } func retrieveTokenFunc() (string, string) { mu.RLock() defer mu.RUnlock() if time.Now().After(lastRefreshInstant.Add(*refreshDuration)) { log.Printf("Token is expired. last refresh instant: %v, refresh duration: %v, error that was last seen: %v", lastRefreshInstant, *refreshDuration, errLastSeen) return "", "" } username := "default" password := token return username, password } func main() { caFilePath := CA_FILE_PATH clusterDicEpAddr := PRIMARY_ENDPOINT_IP_ADDRESS_AND_PORT caCert, err := ioutil.ReadFile(caFilePath) if err != nil { log.Fatal(err) } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) token, err = retrieveToken() if err != nil { log.Fatal("Cannot retrieve IAM token to authenticate to the cluster, error: %v", err) } token, err = retrieveToken() fmt.Printf("token : %v", token) if err != nil { log.Fatal("Cannot retrieve IAM token to authenticate to the cluster, error: %v", err) } lastRefreshInstant = time.Now() go refreshTokenLoop() client := redis.NewClient(&redis.Options{ Addr: clusterDicEpAddr, CredentialsProvider: retrieveTokenFunc, TLSConfig: &tls.Config{ RootCAs: caCertPool, }, }) ctx := context.Background() err = client.Set(ctx, "foo", "bar", 0).Err() if err != nil { log.Fatal(err) } val, err := client.Get(ctx, "foo").Result() if err != nil { log.Fatal(err) } fmt.Printf("\nGot the value for key: key, which is %s \n", val) }