Riak 2.x’s new Java client is powerful, but it can be overwhelming if all you want is to get(), put(), and delete() key/value pairs in a distributed store. This article covers a special but useful case of Riak usage, along with the code and configuration required to achieve it. I hope it saves someone the time it took me to work it out (thanks to the engineers at Basho for their comments).
The first step is to establish the requirements for our store and the configuration needed to meet them. For this example, I am assuming our default bucket setup is 3 replicas (n_val=3) and that we achieve consistency on writes by requiring every replica to acknowledge them (w=all, which is 3 here). Note that you can choose any value for w, including 1. Because read-repair is triggered only after a value has been returned to the client, we will usually want a value for r that is greater than 1 (here, r=2); otherwise a read could return a stale value without any chance to reconcile conflicts first.
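To make the arithmetic behind these choices concrete, here is a minimal sketch of the classic quorum overlap rule (r + w > n_val). It assumes strict quorums and ignores fallback nodes, so treat it as an illustration rather than a description of Riak's internals. The class and method names are hypothetical and are not part of the Riak client.

```java
// Simplified quorum arithmetic. Assumption: strict quorums, no fallback nodes.
// QuorumCheck and its methods are hypothetical helpers, not Riak client API.
final class QuorumCheck {
    private QuorumCheck() {}

    /** Resolves a symbolic ("all", "one", "quorum") or numeric setting against n_val. */
    static int resolve(String setting, int nVal) {
        switch (setting) {
            case "all":    return nVal;
            case "one":    return 1;
            case "quorum": return nVal / 2 + 1;
            default:       return Integer.parseInt(setting);
        }
    }

    /** True when every set of r replicas must share at least one member with every set of w replicas. */
    static boolean readOverlapsWrite(int nVal, int r, int w) {
        return r + w > nVal;
    }

    public static void main(String[] args) {
        int n = 3;
        int r = 2;
        int w = resolve("all", n);
        System.out.println("n=3, r=2, w=all overlaps: " + readOverlapsWrite(n, r, w));
        System.out.println("n=3, r=2, w=1 overlaps: " + readOverlapsWrite(n, r, 1));
    }
}
```

With the settings used in this article (n_val=3, r=2, w=3) the read set always overlaps the write set. With w=1 and r=2 it does not, so a read may have to rely on read-repair to converge.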
Besides n_val, w, and r, there are a few more bucket and global settings we need to adjust:
- We do not want Riak to generate “siblings”, so we need to set ‘allow_mult = false’; this lets Riak resolve conflicts for us (see next item)
- We do want Riak to use ‘causal context’ (a combination of vector clocks and timestamps) to reconcile inconsistent values, so we set ‘last_write_wins = false’ to prevent timestamps from being the only criterion
- Because we don’t want a deletion to mask the existence of a value, we have to turn off the optimization that treats a “not found” reply as a successful read: ‘notfound_ok = false’
- Setting ‘notfound_ok = false’ costs some performance, which we can alleviate with ‘basic_quorum = true’; this lets a read return early instead of waiting for all replicas if a deletion is found first
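The interplay between vector clocks and timestamps described in the list above can be sketched with a toy model. This is a simplified illustration of the idea, not Riak's implementation: the class, the map-based clock, and the tie-breaking rule are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of causal-context reconciliation. NOT Riak's implementation;
// every name here is hypothetical and exists only for illustration.
final class CausalSketch {
    private CausalSketch() {}

    /** A stored version: per-actor update counters plus a wall-clock timestamp. */
    static final class Version {
        final Map<String, Integer> clock;
        final long timestampMillis;
        final String value;

        Version(Map<String, Integer> clock, long timestampMillis, String value) {
            this.clock = new HashMap<>(clock);
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    /** True if clock a has seen every update that clock b has seen. */
    static boolean descends(Map<String, Integer> a, Map<String, Integer> b) {
        for (Map.Entry<String, Integer> entry : b.entrySet()) {
            if (a.getOrDefault(entry.getKey(), 0) < entry.getValue()) {
                return false;
            }
        }
        return true;
    }

    /** Causal history decides first; timestamps only break ties between concurrent versions. */
    static Version resolve(Version x, Version y) {
        if (descends(x.clock, y.clock)) {
            return x;
        }
        if (descends(y.clock, x.clock)) {
            return y;
        }
        return x.timestampMillis >= y.timestampMillis ? x : y;
    }

    public static void main(String[] args) {
        Map<String, Integer> ancestor = new HashMap<>();
        ancestor.put("nodeA", 1);
        Map<String, Integer> descendant = new HashMap<>(ancestor);
        descendant.put("nodeA", 2);

        // The ancestor carries a later timestamp (clock skew) but still loses.
        Version stale = new Version(ancestor, 2000L, "stale");
        Version fresh = new Version(descendant, 1000L, "fresh");
        System.out.println(resolve(stale, fresh).value);
    }
}
```

In this model, a version whose clock descends from the other always wins, even if clock skew gave the older version a later timestamp. Timestamps matter only when neither version has seen the other, which is the case ‘last_write_wins = false’ is meant to protect.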
The default bucket properties in riak.conf would look like this:
    # Default bucket properties
    buckets.default.n_val = 3
    buckets.default.r = 2
    buckets.default.w = 3
    buckets.default.allow_mult = false
    buckets.default.last_write_wins = false
    buckets.default.notfound_ok = false
    buckets.default.basic_quorum = true
Finally, to prevent certain edge cases from causing spooky resurrection of deleted keys, we set delete_mode to immediate in advanced.config:
    [
      {riak_kv, [
        %% Delete mode
        {delete_mode, immediate},
        %% Dotted version vectors
        {dvv_enabled, true}
      ]}
    ].
The ‘dotted version vectors’ are a nice-to-have but don’t affect this article. They are simply an enhancement of the original vector clocks used by older versions of Riak.
OK, enough prologue; let’s get to the code. Here is a RiakKVClient with proper handling of causal context for updates and deletes, and with metadata-only fetches when the value itself is not needed:
    import java.io.IOException;
    import java.util.concurrent.ExecutionException;

    import com.basho.riak.client.api.cap.VClock;
    import com.basho.riak.client.api.commands.kv.DeleteValue;
    import com.basho.riak.client.api.commands.kv.FetchValue;
    import com.basho.riak.client.api.commands.kv.StoreValue;
    import com.basho.riak.client.core.query.Location;
    import com.basho.riak.client.core.query.Namespace;
    import com.basho.riak.client.core.query.RiakObject;
    import com.basho.riak.client.core.util.BinaryValue;

    public class RiakKVClient {

        /**
         * Constructor. Use RiakProvider.getStoreClient(name) instead.
         * @param name bucket name
         * @param client low-level Riak Java client instance
         */
        protected RiakKVClient(final String name, final com.basho.riak.client.api.RiakClient client) {
            this.name = name;
            this.namespace = new Namespace(name);
            this.client = client;
        }

        /**
         * Get a value associated with a key.
         * @param key the key to look up
         * @return the value associated with the given key, or null if not found
         * @throws IOException if Riak fails to retrieve the object
         */
        public byte[] get(final String key) throws IOException {
            if (key == null) {
                throw new IllegalArgumentException("Key is required");
            }
            try {
                final FetchValue.Response response = fetchValue(key);
                if (response.isNotFound()) {
                    return null;
                }
                final RiakObject riakObject = response.getValue(RiakObject.class);
                return riakObject.getValue().getValue();
            } catch (ExecutionException e) {
                throw new IOException("Riak failed to retrieve object from bucket: " + name + " with key: " + key, e);
            } catch (InterruptedException e) {
                // restore the interrupt flag for the caller
                Thread.currentThread().interrupt();
            }
            return null;
        }

        /**
         * Insert a value associated with a key. If a value already exists, update it.
         * @param key the key
         * @param value the value to store
         * @throws IOException if Riak fails to store the object
         */
        public void put(final String key, final byte[] value) throws IOException {
            if (key == null || value == null) {
                throw new IllegalArgumentException("All parameters are required");
            }
            try {
                // fetch (metadata only) in order to get the causal context
                final FetchValue.Response response = fetchMetadata(key);
                final RiakObject storeObject = new RiakObject()
                        .setValue(BinaryValue.create(value))
                        .setContentType("binary/octet-stream");
                StoreValue.Builder builder = new StoreValue.Builder(storeObject)
                        .withLocation(new Location(namespace, key));
                final VClock vectorClock = response.getVectorClock();
                if (vectorClock != null) {
                    builder = builder.withVectorClock(vectorClock);
                }
                final StoreValue storeValue = builder.build();
                client.execute(storeValue);
            } catch (ExecutionException e) {
                throw new IOException("Riak failed to store object in bucket: " + name + " with key: " + key, e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Delete the value associated with the given key.
         * @param key the key to delete
         * @return true if the value existed; false otherwise (and the method will have no effect)
         * @throws IOException if Riak fails to delete the object
         */
        public boolean delete(final String key) throws IOException {
            if (key == null) {
                throw new IllegalArgumentException("Key is required");
            }
            try {
                // fetch (metadata only) in order to get the causal context
                final FetchValue.Response response = fetchMetadata(key);
                if (response.isNotFound()) {
                    return false;
                }
                DeleteValue.Builder builder = new DeleteValue.Builder(new Location(namespace, key));
                final VClock vectorClock = response.getVectorClock();
                if (vectorClock != null) {
                    builder = builder.withVClock(vectorClock);
                }
                final DeleteValue deleteValue = builder.build();
                client.execute(deleteValue);
                // the not-found case returned early above, so the key existed
                return true;
            } catch (ExecutionException e) {
                throw new IOException("Riak failed to delete object in bucket: " + name + " with key: " + key, e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return false;
        }

        private FetchValue.Response fetchMetadata(final String key) throws ExecutionException, InterruptedException {
            return fetchResponse(key, true);
        }

        private FetchValue.Response fetchValue(final String key) throws ExecutionException, InterruptedException {
            return fetchResponse(key, false);
        }

        private FetchValue.Response fetchResponse(final String key, boolean headOnly)
                throws ExecutionException, InterruptedException {
            Location loc = new Location(namespace, key);
            FetchValue.Builder builder = new FetchValue.Builder(loc);
            if (headOnly) {
                // HEAD returns metadata (including the vector clock) without the value
                builder.withOption(FetchValue.Option.HEAD, true);
            }
            FetchValue fetch = builder.build();
            return client.execute(fetch);
        }

        private final Namespace namespace;
        private final String name;
        private final com.basho.riak.client.api.RiakClient client;
    }
The only thing missing is the RiakProvider class for creating RiakKVClient instances. Also, the code may be difficult to read given the formatting of this blog, so here’s the full Gist.
If you have questions on why something is the way it is, please leave a non-anonymous comment.