Making cartservice more reliable

1. Making sure we re-create redis connection upon disconnect
2. Fixed local cart store implementation to handle updates (useful for testing w/o redis)
3. Fixed windows scripts to work against redis correctly
This commit is contained in:
Simon Zeltser 2018-07-02 13:26:37 -07:00
parent 11c208a9f4
commit d457f7ec28
6 changed files with 179 additions and 103 deletions

2
.gitignore vendored
View file

@ -4,3 +4,5 @@ pkg/
*.pyc *.pyc
*.swp *.swp
*~ *~
.vs/slnx.sqlite
.vs/microservices-demo/v15/.suo

View file

@ -33,26 +33,36 @@ namespace cartservice
{ {
// Run the server in a separate thread and make the main thread busy waiting. // Run the server in a separate thread and make the main thread busy waiting.
// The busy wait is because when we run in a container, we can't use techniques such as waiting on user input (Console.Readline()) // The busy wait is because when we run in a container, we can't use techniques such as waiting on user input (Console.Readline())
Task.Run(async () => Task serverTask = Task.Run(async () =>
{ {
Console.WriteLine($"Trying to start a grpc server at {host}:{port}"); try
Server server = new Server
{ {
Services = { Hipstershop.CartService.BindService(new CartServiceImpl(cartStore)) }, Console.WriteLine($"Trying to start a grpc server at {host}:{port}");
Ports = { new ServerPort(host, port, ServerCredentials.Insecure) } Server server = new Server
}; {
Services = { Hipstershop.CartService.BindService(new CartServiceImpl(cartStore)) },
Ports = { new ServerPort(host, port, ServerCredentials.Insecure) }
};
Console.WriteLine($"Cart server is listening at {host}:{port}"); Console.WriteLine($"Cart server is listening at {host}:{port}");
server.Start(); server.Start();
await cartStore.InitializeAsync(); await cartStore.InitializeAsync();
Console.WriteLine("Initialization completed");
// Keep the server up and running
while(true)
{
Thread.Sleep(TimeSpan.FromMinutes(10));
}
}
catch (Exception ex)
{
Console.WriteLine(ex);
}
}); });
// Busy wait to keep the process alive return Task.WaitAny(new[] { serverTask });
while(true)
{
Thread.Sleep(TimeSpan.FromMinutes(10));
}
} }
static void Main(string[] args) static void Main(string[] args)
@ -69,6 +79,8 @@ namespace cartservice
Parser.Default.ParseArguments<ServerOptions>(args).MapResult( Parser.Default.ParseArguments<ServerOptions>(args).MapResult(
(ServerOptions options) => (ServerOptions options) =>
{ {
Console.WriteLine($"Started as process with id {System.Diagnostics.Process.GetCurrentProcess().Id}");
// Set hostname/ip address // Set hostname/ip address
string hostname = options.Host; string hostname = options.Host;
if (string.IsNullOrEmpty(hostname)) if (string.IsNullOrEmpty(hostname))
@ -106,7 +118,10 @@ namespace cartservice
// Redis was specified via command line or environment variable // Redis was specified via command line or environment variable
if (!string.IsNullOrEmpty(redis)) if (!string.IsNullOrEmpty(redis))
{ {
// If you want to start cart store using local cache in process, you can replace the following line with this:
// cartStore = new LocalCartStore();
cartStore = new RedisCartStore(redis); cartStore = new RedisCartStore(redis);
return StartServer(hostname, port, cartStore); return StartServer(hostname, port, cartStore);
} }
else else

View file

@ -1,6 +1,7 @@
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Linq;
using cartservice.interfaces; using cartservice.interfaces;
using Hipstershop; using Hipstershop;
@ -10,6 +11,7 @@ namespace cartservice.cartstore
{ {
// Maps between user and their cart // Maps between user and their cart
private ConcurrentDictionary<string, Hipstershop.Cart> userCartItems = new ConcurrentDictionary<string, Hipstershop.Cart>(); private ConcurrentDictionary<string, Hipstershop.Cart> userCartItems = new ConcurrentDictionary<string, Hipstershop.Cart>();
private readonly Hipstershop.Cart emptyCart = new Hipstershop.Cart();
public Task InitializeAsync() public Task InitializeAsync()
{ {
@ -29,8 +31,17 @@ namespace cartservice.cartstore
userCartItems.AddOrUpdate(userId, newCart, userCartItems.AddOrUpdate(userId, newCart,
(k, exVal) => (k, exVal) =>
{ {
// Currently we assume that we only add to the cart // If the item exists, we update its quantity
exVal.Items.Add(new Hipstershop.CartItem { ProductId = productId, Quantity = quantity }); var existingItem = exVal.Items.SingleOrDefault(item => item.ProductId == productId);
if (existingItem != null)
{
existingItem.Quantity += quantity;
}
else
{
exVal.Items.Add(new Hipstershop.CartItem { ProductId = productId, Quantity = quantity });
}
return exVal; return exVal;
}); });
@ -40,7 +51,7 @@ namespace cartservice.cartstore
public Task EmptyCartAsync(string userId) public Task EmptyCartAsync(string userId)
{ {
Console.WriteLine($"EmptyCartAsync called with userId={userId}"); Console.WriteLine($"EmptyCartAsync called with userId={userId}");
userCartItems[userId] = new Hipstershop.Cart(); userCartItems[userId] = emptyCart;
return Task.CompletedTask; return Task.CompletedTask;
} }
@ -52,7 +63,9 @@ namespace cartservice.cartstore
if (!userCartItems.TryGetValue(userId, out cart)) if (!userCartItems.TryGetValue(userId, out cart))
{ {
Console.WriteLine($"No carts for user {userId}"); Console.WriteLine($"No carts for user {userId}");
return Task.FromResult(emptyCart);
} }
return Task.FromResult(cart); return Task.FromResult(cart);
} }
} }

View file

@ -1,9 +1,11 @@
using System; using System;
using System.IO; using System.IO;
using System.Linq; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using cartservice.interfaces; using cartservice.interfaces;
using Google.Protobuf; using Google.Protobuf;
using Grpc.Core;
using Hipstershop; using Hipstershop;
using StackExchange.Redis; using StackExchange.Redis;
@ -12,137 +14,174 @@ namespace cartservice.cartstore
public class RedisCartStore : ICartStore public class RedisCartStore : ICartStore
{ {
private const string CART_FIELD_NAME = "cart"; private const string CART_FIELD_NAME = "cart";
private const int REDIS_RETRY_NUM = 5;
private static ConnectionMultiplexer redis; private volatile ConnectionMultiplexer redis;
private volatile bool isRedisConnectionOpened = false;
private readonly object locker = new object();
private readonly byte[] emptyCartBytes; private readonly byte[] emptyCartBytes;
<<<<<<< HEAD
private readonly string connectionString; private readonly string connectionString;
private readonly string redisAddr;
||||||| merged common ancestors private readonly ConfigurationOptions redisConnectionOptions;
=======
private readonly string connectionString;
>>>>>>> origin
public RedisCartStore(string redisAddress) public RedisCartStore(string redisAddress)
{ {
// Serialize empty cart into byte array. // Serialize empty cart into byte array.
var cart = new Hipstershop.Cart(); var cart = new Hipstershop.Cart();
emptyCartBytes = cart.ToByteArray(); emptyCartBytes = cart.ToByteArray();
this.redisAddr = redisAddress;
connectionString = $"{redisAddress},ssl=false,allowAdmin=true,connectRetry=5"; connectionString = $"{redisAddress},ssl=false,allowAdmin=true,connectRetry=5";
Console.WriteLine($"Going to use Redis cache at this address: {connectionString}");
redisConnectionOptions = ConfigurationOptions.Parse(connectionString);
// Try to reconnect if first retry failed (up to 5 times with exponential backoff)
redisConnectionOptions.ConnectRetry = REDIS_RETRY_NUM;
redisConnectionOptions.ReconnectRetryPolicy = new ExponentialRetry(100);
redisConnectionOptions.KeepAlive = 180;
} }
<<<<<<< HEAD
public Task InitializeAsync() public Task InitializeAsync()
{ {
||||||| merged common ancestors EnsureRedisConnected();
string connectionString = $"{redisAddress},ssl=false,allowAdmin=true"; return Task.CompletedTask;
=======
connectionString = $"{redisAddress},ssl=false,allowAdmin=true";
Console.WriteLine($"Going to use Redis cache at this address: {connectionString}");
} }
public async Task InitializeAsync() private void EnsureRedisConnected()
{ {
>>>>>>> origin if (isRedisConnectionOpened)
Console.WriteLine("Connecting to Redis: " + connectionString); {
<<<<<<< HEAD return;
}
redis = ConnectionMultiplexer.Connect(connectionString); // Connection is closed or failed - open a new one but only at the first thread
Console.WriteLine("Connected successfully to Redis"); lock (locker)
{
if (isRedisConnectionOpened)
{
return;
}
return Task.CompletedTask; Console.WriteLine("Connecting to Redis: " + connectionString);
||||||| merged common ancestors redis = ConnectionMultiplexer.Connect(redisConnectionOptions);
redis = ConnectionMultiplexer.Connect(connectionString);
======= if (redis == null || !redis.IsConnected)
redis = await ConnectionMultiplexer.ConnectAsync(connectionString, Console.Out); {
Console.WriteLine("Connected successfully to Redis"); Console.WriteLine("Wasn't able to connect to redis");
>>>>>>> origin
// We weren't able to connect to redis despite 5 retries with exponential backoff
throw new ApplicationException("Wasn't able to connect to redis");
}
Console.WriteLine("Successfully connected to Redis");
var cache = redis.GetDatabase();
Console.WriteLine("Performing small test");
cache.StringSet("cart", "OK" );
object res = cache.StringGet("cart");
Console.WriteLine($"Small test result: {res}");
redis.InternalError += (o, e) => { Console.WriteLine(e.Exception); };
redis.ConnectionRestored += (o, e) =>
{
isRedisConnectionOpened = true;
Console.WriteLine("Connection to redis was retored successfully");
};
redis.ConnectionFailed += (o, e) =>
{
Console.WriteLine("Connection failed. Disposing the object");
isRedisConnectionOpened = false;
};
isRedisConnectionOpened = true;
}
} }
public async Task AddItemAsync(string userId, string productId, int quantity) public async Task AddItemAsync(string userId, string productId, int quantity)
{ {
Console.WriteLine($"AddItemAsync called with userId={userId}, productId={productId}, quantity={quantity}"); Console.WriteLine($"AddItemAsync called with userId={userId}, productId={productId}, quantity={quantity}");
var db = redis.GetDatabase(); try
// Access the cart from the cache
var value = await db.HashGetAsync(userId, CART_FIELD_NAME);
Hipstershop.Cart cart;
if (value.IsNull)
{ {
cart = new Hipstershop.Cart(); EnsureRedisConnected();
cart.UserId = userId;
cart.Items.Add(new Hipstershop.CartItem { ProductId = productId, Quantity = quantity }); var db = redis.GetDatabase();
}
else // Access the cart from the cache
{ var value = await db.HashGetAsync(userId, CART_FIELD_NAME);
cart = Hipstershop.Cart.Parser.ParseFrom(value);
var existingItem = cart.Items.SingleOrDefault(i => i.ProductId == productId); Hipstershop.Cart cart;
if (existingItem == null) if (value.IsNull)
{ {
cart = new Hipstershop.Cart();
cart.UserId = userId;
cart.Items.Add(new Hipstershop.CartItem { ProductId = productId, Quantity = quantity }); cart.Items.Add(new Hipstershop.CartItem { ProductId = productId, Quantity = quantity });
} }
else else
{ {
existingItem.Quantity += quantity; cart = Hipstershop.Cart.Parser.ParseFrom(value);
var existingItem = cart.Items.SingleOrDefault(i => i.ProductId == productId);
if (existingItem == null)
{
cart.Items.Add(new Hipstershop.CartItem { ProductId = productId, Quantity = quantity });
}
else
{
existingItem.Quantity += quantity;
}
} }
}
await db.HashSetAsync(userId, new[]{ new HashEntry(CART_FIELD_NAME, cart.ToByteArray()) }); await db.HashSetAsync(userId, new[]{ new HashEntry(CART_FIELD_NAME, cart.ToByteArray()) });
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.FailedPrecondition, $"Can't access cart storage. {ex}"));
}
} }
public async Task EmptyCartAsync(string userId) public async Task EmptyCartAsync(string userId)
{ {
Console.WriteLine($"EmptyCartAsync called with userId={userId}"); Console.WriteLine($"EmptyCartAsync called with userId={userId}");
var db = redis.GetDatabase(); try
{
EnsureRedisConnected();
var db = redis.GetDatabase();
// Update the cache with empty cart for given user // Update the cache with empty cart for given user
await db.HashSetAsync(userId, new[] { new HashEntry(CART_FIELD_NAME, emptyCartBytes) }); await db.HashSetAsync(userId, new[] { new HashEntry(CART_FIELD_NAME, emptyCartBytes) });
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.FailedPrecondition, $"Can't access cart storage. {ex}"));
}
} }
public async Task<Hipstershop.Cart> GetCartAsync(string userId) public async Task<Hipstershop.Cart> GetCartAsync(string userId)
{ {
Console.WriteLine($"GetCartAsync called with userId={userId}"); Console.WriteLine($"GetCartAsync called with userId={userId}");
<<<<<<< HEAD
try
{
var db = redis.GetDatabase();
// Access the cart from the cache
||||||| merged common ancestors
var db = redis.GetDatabase();
=======
try
{
var db = redis.GetDatabase();
>>>>>>> origin
<<<<<<< HEAD try
var value = await db.HashGetAsync(userId, CART_FIELD_NAME); {
||||||| merged common ancestors EnsureRedisConnected();
// Access the cart from the cache
var value = await db.HashGetAsync(userId, CART_FIELD_NAME); var db = redis.GetDatabase();
=======
// Access the cart from the cache // Access the cart from the cache
var value = await db.HashGetAsync(userId, CART_FIELD_NAME); var value = await db.HashGetAsync(userId, CART_FIELD_NAME);
>>>>>>> origin
if (!value.IsNull) if (!value.IsNull)
{ {
return Hipstershop.Cart.Parser.ParseFrom(value); return Hipstershop.Cart.Parser.ParseFrom(value);
} }
}
catch (Exception e)
{
Console.WriteLine(e);
}
// We decided to return empty cart in cases when user wasn't in the cache before // We decided to return empty cart in cases when user wasn't in the cache before
return new Hipstershop.Cart(); return new Hipstershop.Cart();
}
catch (Exception ex)
{
throw new RpcException(new Status(StatusCode.FailedPrecondition, $"Can't access cart storage. {ex}"));
}
} }
} }
} }

View file

@ -9,12 +9,13 @@ GOTO End1
:local :local
set REDIS_PORT=6379 set REDIS_PORT=6379
set REDIS_ADDR=localhost:%REDIS_PORT% set REDIS_ADDR=localhost:%REDIS_PORT%
set LISTEN_ADDR=0.0.0.0 set LISTEN_ADDR=localhost
set PORT=7070 set PORT=7070
set GRPC_TRACE=all
echo running redis emulator locally on a separate window echo running redis emulator locally on a separate window
taskkill /f /im "redis-server.exe" taskkill /f /im "redis-server.exe"
start redis-server start redis-server "C:\ProgramData\chocolatey\lib\redis-64\redis.windows.conf"
echo running the cart service locally echo running the cart service locally
dotnet build ..\. dotnet build ..\.
@ -23,19 +24,24 @@ GOTO End1
:docker_local :docker_local
set REDIS_PORT=6379 set REDIS_PORT=6379
set REDIS_ADDR=redis:%REDIS_PORT% rem set REDIS_ADDR=redis:%REDIS_PORT%
set LISTEN_ADDR=0.0.0.0 set LISTEN_ADDR=localhost
set PORT=7070 set PORT=7070
echo run docker container with redis echo run docker container with redis
docker rm --force redis
start "" docker run -d --name=redis -p %REDIS_PORT%:%REDIS_PORT% redis
echo Forcing to remove redis cache so we always start the container from scratch
docker rm --force redis > nul 2>&1
echo Starting out redis container
docker run -d --name=redis redis > nul 2>&1
rem This assigns the output of ip4 addr of redis container into REDIS_ADDR
FOR /F "tokens=*" %%g IN ('docker inspect -f "{{ .NetworkSettings.Networks.bridge.IPAddress }}" redis') do (SET REDIS_ADDR=%%g)
echo addr=%REDIS_ADDR%
echo building container image for cart service echo building container image for cart service
docker build -t cartservice ..\. docker build -t cartservice ..\.
echo run container image for cart service echo run container image for cart service
docker run -it --rm -e REDIS_ADDR=%REDIS_ADDR% -e LISTEN_ADDR=%LISTEN_ADDR% -e PORT=%PORT% -p %PORT%:%PORT% cartservice docker run -it --name=cartservice --rm -e REDIS_ADDR=%REDIS_ADDR%:%REDIS_PORT% -e LISTEN_ADDR=%LISTEN_ADDR% -e PORT=%PORT% -p %PORT%:%PORT% cartservice
GOTO End1 GOTO End1

View file

@ -29,6 +29,7 @@ namespace cartservice
{ {
UserId = userId, UserId = userId,
}; };
var cart = await client.GetCartAsync(request); var cart = await client.GetCartAsync(request);
Assert.NotNull(cart); Assert.NotNull(cart);
@ -37,7 +38,7 @@ namespace cartservice
} }
[Fact] [Fact]
public async Task AddItem_ItemExists_Udpated() public async Task AddItem_ItemExists_Updated()
{ {
string userId = Guid.NewGuid().ToString(); string userId = Guid.NewGuid().ToString();