1. About Bucket4j
1.1. What is Bucket4j
Bucket4j is a Java rate-limiting library based mainly on the token-bucket algorithm, which is the de facto standard for rate limiting in the IT industry.
Important
|
Bucket4j is more than a direct implementation of token-bucket.
Its math model provides several useful extensions that are not mentioned in the classic token-bucket interpretations, such as multiple limits per bucket or overdraft. These extensions are described in detail later.
|
You can read more about the token bucket by following these links:
-
Token bucket - Wikipedia page describes the token-bucket algorithm in classical form.
-
Non-formal overview of token-bucket algorithm - a brief overview of the token-bucket algorithm.
1.2. Bucket4j basic features
-
Uncompromising precision - Bucket4j does not operate with floats or doubles; all calculations are performed in integer arithmetic, which protects end users from calculation errors caused by rounding.
-
Effective implementation in terms of concurrency:
-
Bucket4j scales well in multithreaded scenarios because it uses a lock-free implementation by default.
-
At the same time, the library provides different concurrency strategies that can be chosen when a default lock-free strategy is not desired.
-
Effective API in terms of garbage collector footprint: the Bucket4j API uses primitive types as much as possible in order to avoid boxing and other kinds of floating garbage.
-
Pluggable listener API that allows implementing monitoring and logging.
-
Rich diagnostic API that allows investigating internal state.
-
Rich configuration management - the configuration of a bucket can be changed on the fly.
1.3. Bucket4j distributed features
In addition to the basic features described above, Bucket4j provides the ability to implement rate-limiting in a cluster of JVMs:
-
Bucket4j supports, out of the box, any grid solution that is compatible with the JCache API (JSR 107) specification.
-
Bucket4j provides a framework that allows you to quickly build an integration with your own persistence technology, such as an RDBMS or a key-value store.
-
For clustered usage scenarios, Bucket4j supports an asynchronous API. This matters greatly in a distributed environment, because an asynchronous API avoids blocking your application threads each time a network request has to be executed.
2. Basic functionality
2.1. Quick start examples
2.1.1. How to add Bucket4j as a dependency
Bucket4j is distributed through Maven Central. Add the dependency to your project as described below in order to compile and run the examples:
<dependency>
    <groupId>com.bucket4j</groupId>
    <artifactId>bucket4j-core</artifactId>
    <version>8.0.1</version>
</dependency>
implementation 'com.bucket4j:bucket4j-core:8.0.1'
Note
|
See the Java compatibility matrix if you need a build that is compatible with Java 8. |
2.1.2. Create your first Bucket, limiting the rate of heavy work
Imagine that you have a thread-pool executor and you want to know what your threads are doing at the moment when the thread pool throws RejectedExecutionException. Printing the stack traces of all threads in the JVM is the best way to find out where the threads are stuck and why the thread pool is overflowing. But acquiring stack traces is a very costly operation by itself, and you want to do it no more often than once per 10 minutes:
// define the limit: 1 time per 10 minutes
Bandwidth limit = Bandwidth.simple(1, Duration.ofMinutes(10));
// construct the bucket
Bucket bucket = Bucket.builder().addLimit(limit).build();
...
try {
    executor.execute(anyRunnable);
} catch (RejectedExecutionException e) {
    // print stack traces only if the limit is not exceeded
    if (bucket.tryConsume(1)) {
        ThreadInfo[] stackTraces = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
        StacktraceUtils.print(stackTraces);
    }
    throw e;
}
2.1.3. Using bucket as scheduler
Suppose you need a fresh exchange rate between dollars and euros. To get the rate you continuously poll a third-party provider, and by contract with the provider you must poll no more often than 100 times per minute; otherwise the provider will block your IP:
// define the limit 100 times per 1 minute
Bandwidth limit = Bandwidth.simple(100, Duration.ofMinutes(1));
// construct the bucket
Bucket bucket = Bucket.builder().addLimit(limit).build();
...
volatile double exchangeRate;
...
// do polling in an infinite loop
while (true) {
    // Consume a token from the token bucket.
    // If a token is not available this method will block until the refill adds one to the bucket.
    bucket.asBlocking().consume(1);
    exchangeRate = pollExchangeRate();
}
2.1.4. Limiting the rate of access to REST API
Imagine that you are developing yet another social network and you want to provide a REST API for third-party developers. To protect your system from overloading, you want to introduce the following limitation:
The bucket size is 50 calls (which cannot be exceeded at any given time), with a "refill rate" of 10 calls per second that continually adds tokens to the bucket. In other words, if the client app averages 10 calls per second, it will never be throttled; moreover, the client has an overdraft of 50 calls that can be used if the average rises slightly above 10 calls per second for a short period.
Constructing a bucket that satisfies the requirements above is a little more complicated than in the previous examples, because we have to deal with overdraft, but it is not rocket science:
import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket;
import io.github.bucket4j.Refill;
import java.io.IOException;
import java.time.Duration;
import javax.servlet.*;
import javax.servlet.http.*;
public class ThrottlingFilter implements javax.servlet.Filter {
    private Bucket createNewBucket() {
        long overdraft = 50;
        Refill refill = Refill.greedy(10, Duration.ofSeconds(1));
        Bandwidth limit = Bandwidth.classic(overdraft, refill);
        return Bucket.builder().addLimit(limit).build();
    }
    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
        HttpSession session = httpRequest.getSession(true);
        String appKey = SecurityUtils.getThirdPartyAppKey();
        // one bucket per application key, stored in the HTTP session
        Bucket bucket = (Bucket) session.getAttribute("throttler-" + appKey);
        if (bucket == null) {
            bucket = createNewBucket();
            session.setAttribute("throttler-" + appKey, bucket);
        }
        // tryConsume returns false immediately if no tokens are available in the bucket
        if (bucket.tryConsume(1)) {
            // the limit is not exceeded
            filterChain.doFilter(servletRequest, servletResponse);
        } else {
            // the limit is exceeded
            HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
            httpResponse.setContentType("text/plain");
            httpResponse.setStatus(429);
            httpResponse.getWriter().append("Too many requests");
        }
    }
}
If you want to provide more information to the end-user about the state of the bucket, then the last fragment of code above can be rewritten in the following way:
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) {
    // the limit is not exceeded
    httpResponse.setHeader("X-Rate-Limit-Remaining", "" + probe.getRemainingTokens());
    filterChain.doFilter(servletRequest, servletResponse);
} else {
    // the limit is exceeded; httpResponse is already declared above, so it is reused here
    httpResponse.setStatus(429);
    httpResponse.setHeader("X-Rate-Limit-Retry-After-Seconds", "" + TimeUnit.NANOSECONDS.toSeconds(probe.getNanosToWaitForRefill()));
    httpResponse.setContentType("text/plain");
    httpResponse.getWriter().append("Too many requests");
}
2.1.5. Example of multiple bandwidth
Imagine that you are developing a load-testing tool and want to ensure that the system under test is able to handle 1000 requests per minute. But you do not want to accidentally kill the system under test by generating all 1000 events in one second instead of spreading them over the minute. To solve the problem you can construct the following bucket:
static final long MAX_WAIT_NANOS = TimeUnit.HOURS.toNanos(1);
// ...
Bucket bucket = Bucket.builder()
    // allows 1000 tokens per 1 minute
    .addLimit(Bandwidth.simple(1000, Duration.ofMinutes(1)))
    // but no more often than 50 tokens per 1 second
    .addLimit(Bandwidth.simple(50, Duration.ofSeconds(1)))
    .build();
// ...
while (true) {
    // Consume a token from the token bucket. If a token is not available this method blocks,
    // for at most MAX_WAIT_NANOS, until the refill adds one to the bucket.
    if (bucket.asBlocking().tryConsume(1, MAX_WAIT_NANOS, BlockingStrategy.PARKING)) {
        workloadExecutor.execute(new LoadTask());
    }
}
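Why the second limit protects against bursts can be illustrated with a toy model that does not depend on Bucket4j. The sketch below assumes that a bucket with several bandwidths lets a request through only when every bandwidth has enough tokens. The class MultiLimitSketch and its methods are hypothetical names, and refill is deliberately left out, so the model describes a single instant in time rather than the library's real behavior over time.

```java
import java.util.Arrays;

// Toy model, not Bucket4j code: one token counter per limit, no refill.
final class MultiLimitSketch {
    private final long[] available;

    MultiLimitSketch(long... capacities) {
        this.available = capacities.clone(); // every limit starts full
    }

    // Consumes from all limits, or from none if any limit lacks tokens.
    boolean tryConsume(long tokens) {
        for (long tokensLeft : available) {
            if (tokensLeft < tokens) {
                return false;
            }
        }
        for (int i = 0; i < available.length; i++) {
            available[i] -= tokens;
        }
        return true;
    }

    // The bucket as a whole offers as many tokens as its tightest limit.
    long availableTokens() {
        return Arrays.stream(available).min().orElse(0);
    }

    public static void main(String[] args) {
        // 1000 per minute and 50 per second, both observed at the same instant
        MultiLimitSketch bucket = new MultiLimitSketch(1000, 50);
        int accepted = 0;
        while (bucket.tryConsume(1)) {
            accepted++;
        }
        System.out.println("accepted in one burst: " + accepted);
        System.out.println("available now: " + bucket.availableTokens());
    }
}
```

In this model the burst stops after 50 requests even though the per-minute counter still holds 950 tokens, which is the effect the two-limit bucket above is meant to achieve.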
2.1.6. Specifying initial amount of tokens
By default, the initial size of the bucket equals its capacity. But sometimes you may want a smaller initial size, for example during a cold start, in order to prevent denial of service:
int initialTokens = 42;
Bandwidth limit = Bandwidth
    .simple(1000, Duration.ofHours(1))
    .withInitialTokens(initialTokens);
Bucket bucket = Bucket.builder()
    .addLimit(limit)
    .build();
2.1.7. Turning-off the refill greediness
When a bandwidth is created via the Bandwidth#simple method, it refills in a greedy manner: the bandwidth tries to add tokens to the bucket as soon as possible. For example, a bandwidth with the refill "10 tokens per 1 second" adds 1 token every 100 milliseconds; in other words, the refill does not wait 1 second to regenerate the whole batch of 10 tokens.
If greediness is undesired, you should explicitly choose a non-greedy refill. For example, the bandwidth below refills 10 tokens once per second instead of 1 token per 100 milliseconds:
// When a refill is created via the "intervally" factory method, greediness is turned off.
Refill refill = Refill.intervally(10, Duration.ofSeconds(1));
Bandwidth bandwidth = Bandwidth.classic(600, refill);
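The difference between the two refill styles can be made concrete with a little integer arithmetic. The following is a minimal sketch that does not use Bucket4j: the class RefillModesSketch and both methods are hypothetical, and the model ignores bucket capacity, so it only shows how many tokens each style would have generated after a given elapsed time.

```java
import java.time.Duration;

// Toy model, not Bucket4j code: tokens generated by each refill style, capacity ignored.
final class RefillModesSketch {

    // Greedy: a token becomes available as soon as its share of the period has elapsed.
    static long greedy(long tokens, Duration period, Duration elapsed) {
        return Math.multiplyExact(tokens, elapsed.toNanos()) / period.toNanos();
    }

    // Interval: nothing is added until a whole period has elapsed, then the whole batch arrives.
    static long intervally(long tokens, Duration period, Duration elapsed) {
        long completedPeriods = elapsed.toNanos() / period.toNanos();
        return Math.multiplyExact(completedPeriods, tokens);
    }

    public static void main(String[] args) {
        Duration period = Duration.ofSeconds(1);
        for (long millis : new long[] {100, 950, 1000, 2500}) {
            Duration elapsed = Duration.ofMillis(millis);
            System.out.println(millis + " ms: greedy=" + greedy(10, period, elapsed)
                    + ", intervally=" + intervally(10, period, elapsed));
        }
    }
}
```

For "10 tokens per 1 second" the greedy model yields 1 token after 100 milliseconds and 9 after 950 milliseconds, while the interval model yields nothing until the full second has passed.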
It is also possible to specify the time when the first refill should happen. This option can be used to configure a clear interval boundary, i.e. the start of a second, minute, hour, or day.
// imagine that wall clock is 16:20, and we need to schedule the first refill to 17:00
Instant firstRefillTime = ZonedDateTime.now()
    .truncatedTo(ChronoUnit.HOURS)
    .plus(1, ChronoUnit.HOURS)
    .toInstant();
// see detailed explanation for useAdaptiveInitialTokens in the javadocs for 'intervallyAligned' method
boolean useAdaptiveInitialTokens = false;
Bandwidth.classic(400, Refill.intervallyAligned(400, Duration.ofHours(1), firstRefillTime, useAdaptiveInitialTokens));
2.1.8. Returning tokens back to bucket
A compensating transaction is one of the obvious use cases in which you want to return tokens to the bucket:
Bucket wallet;
...
if (wallet.tryConsume(50)) { // get 50 cents from wallet
    try {
        buyCocaCola();
    } catch (NoCocaColaException e) {
        // return money to wallet
        wallet.addTokens(50);
    }
}
2.1.9. Customizing time measurement - choosing nanotime time resolution
By default Bucket4j uses millisecond time resolution, which is the preferred time measurement strategy. In rare cases (for example, benchmarking) you may want nanosecond precision:
Bucket.builder().withNanosecondPrecision()
Be very careful when choosing this time measurement strategy, because System.nanoTime() produces inaccurate results; use this strategy only if the period of the bandwidth is so small that millisecond resolution is insufficient.
2.1.10. Customizing time measurement - Specify custom time measurement strategy
You can specify your own time meter if the existing millisecond and nanotime time meters are not enough for your purposes. Imagine that you have a clock that synchronizes its time with other machines in the current cluster. If you want to use the time provided by this clock instead of the time provided by the JVM, you can write something like this:
public class ClusteredTimeMeter implements TimeMeter {
    @Override
    public long currentTimeNanos() {
        // convert the cluster clock's milliseconds to nanoseconds
        return ClusteredClock.currentTimeMillis() * 1_000_000;
    }
    @Override
    public boolean isWallClockBased() {
        // the cluster clock reports wall-clock time
        return true;
    }
}
Bandwidth limit = Bandwidth.simple(100, Duration.ofMinutes(1));
Bucket bucket = Bucket.builder()
    .withCustomTimePrecision(new ClusteredTimeMeter())
    .addLimit(limit)
    .build();
2.2. Concepts
2.2.1. Bucket
Bucket is a rate limiter that is implemented on top of the ideas of the well-known token-bucket algorithm. In the Bucket4j library code, the Bucket is represented by the interface io.github.bucket4j.Bucket. A bucket aggregates the following parts:
-
BucketConfiguration specifies an immutable collection of limitation rules that are used by the bucket during its work.
-
BucketState is the place where the bucket stores its mutable state, such as the amount of currently available tokens.
A bucket can be constructed via the special builder API BucketBuilder, which is available through a factory method:
Bucket bucket = Bucket.builder()
    .addLimit(...)
    .build();
2.2.2. BucketConfiguration
BucketConfiguration can be described as a collection of limits that are used by the Bucket during its work. In the Bucket4j library code, BucketConfiguration is represented by the class io.github.bucket4j.BucketConfiguration. A configuration is immutable: there is no way to add a limit to, or remove a limit from, an already created configuration. However, you can replace the configuration of a bucket by creating a new configuration instance and calling bucket.replaceConfiguration(newConfiguration, tokensInheritanceStrategy).
Usually you should not create a BucketConfiguration directly (except in the case of configuration replacement), because BucketBuilder does it for you behind the scenes. For the rare cases when you need to create a configuration directly, use the ConfigurationBuilder that is available through a factory method:
BucketConfiguration configuration = BucketConfiguration.builder()
    .addLimit(...)
    .build();
Important
|
Most users configure a single limit per configuration, but it is strongly recommended to analyze whether short-lived bursts can affect your application and, if so, to consider adding more limits. |
2.2.3. Limitation/Bandwidth
Limitations that are used by a bucket are expressed in terms of bandwidths. A bandwidth is described by the following terms:
- Capacity
-
Capacity is a term inherited directly from the classic interpretation of the token-bucket algorithm; it specifies how many tokens your bucket can hold.
- Refill
-
Refill specifies how fast tokens are refilled after they have been consumed from the bucket.
- Initial tokens
-
Bucket4j extends the token-bucket algorithm by allowing you to specify the initial amount of tokens for each bandwidth. By default, the initial amount of tokens equals the capacity; it can be changed with the withInitialTokens method:
Bandwidth bandwidth = Bandwidth.simple(42, Duration.ofMinutes(1))
    .withInitialTokens(13);
- ID
-
The identifier is an optional attribute that is null by default. You may prefer to assign identifiers to bandwidths if you use on-the-fly configuration replacement and your buckets have more than one bandwidth per bucket; otherwise, it is better to avoid identifiers in order to save memory. The identifier for a bandwidth can be specified with the withId method:
BucketConfiguration configuration = BucketConfiguration.builder()
    .addLimit(Bandwidth.simple(1000, Duration.ofMinutes(1)).withId("business-limit"))
    .addLimit(Bandwidth.simple(100, Duration.ofSeconds(1)).withId("burst-protection"))
    .build();
Note: Identifiers are critical for the on-the-fly configuration replacement functionality, because during replacement the library needs to decide how to correctly propagate information about already consumed tokens from the state before the replacement to the state after it. This is not a trivial task, especially when the number of limits changes.
2.2.4. Refill
Specifies the speed of token regeneration.
- Greedy
-
This type of refill regenerates tokens in a greedy manner: it tries to add tokens to the bucket as soon as possible. For example, the refill "10 tokens per 1 second" adds 1 token every 100 milliseconds; in other words, the refill does not wait 1 second to regenerate the whole batch of 10 tokens. The three refills below regenerate tokens at the same speed:
Refill.greedy(600, Duration.ofMinutes(1));
Refill.greedy(10, Duration.ofSeconds(1));
Refill.greedy(1, Duration.ofMillis(100));
Greedy is the default type of refill that is used when you create a simple bandwidth:
// the two lines of code below are fully equivalent
Bandwidth.simple(100, Duration.ofMinutes(1))
Bandwidth.classic(100, Refill.greedy(100, Duration.ofMinutes(1)))
- Interval
-
This type of refill regenerates tokens in an interval manner. In contrast to "greedy", "interval" waits until the whole period has elapsed before regenerating the whole amount of tokens.
Example:
// generates 100 tokens each minute
Refill.intervally(100, Duration.ofMinutes(1));
- IntervallyAligned
-
This type of refill also regenerates tokens in an interval manner: in contrast to "greedy", it waits until the whole period has elapsed before regenerating the whole amount of tokens. In addition to what Interval offers, it is possible to specify the time when the first refill should happen. This type can be used to configure a clear interval boundary, i.e. the start of a second, minute, hour, or day. For more details, read the javadocs for the Refill#intervallyAligned method.
Example:
// imagine that the wall clock shows 16:20, then the first refill will happen at 17:00
// the first refill will happen at the beginning of the next hour
Instant firstRefillTime = ZonedDateTime.now()
    .truncatedTo(ChronoUnit.HOURS)
    .plus(1, ChronoUnit.HOURS)
    .toInstant();
Bandwidth.classic(400, Refill.intervallyAligned(400, Duration.ofHours(1), firstRefillTime, true));
2.2.5. BucketState
BucketState is the place where the bucket stores its own mutable state, such as:
-
The amount of currently available tokens.
-
The timestamp of the last refill.
BucketState is represented by the interface io.github.bucket4j.BucketState. Usually you never interact with this interface, except when you want to get access to the low-level diagnostic API that is described in
2.2.6. BucketBuilder
The library authors explicitly decided not to let end users construct library entities via direct constructors, for two reasons:
-
To be able to change internal implementations in the future without breaking backward compatibility.
-
To provide a Fluent Builder API, which in our minds is a good modern library design pattern.
LocalBucketBuilder is a fluent builder that is specialized to construct local buckets, where a local bucket is a bucket that holds its internal state just in memory and does not provide clustering functionality. Below is an example of LocalBucketBuilder usage (the limit of 100 tokens per minute is only an illustrative value):
Bucket bucket = Bucket.builder()
    .addLimit(Bandwidth.simple(100, Duration.ofMinutes(1)))
    .withNanosecondPrecision()
    .withSynchronizationStrategy(SynchronizationStrategy.LOCK_FREE)
    .build();
2.3. Technical limitations
To provide the best precision, Bucket4j uses integer arithmetic as much as possible, so any internal calculation is bounded by Long.MAX_VALUE. The library introduces several limits, described below, to ensure that calculations never exceed this bound.
2.3.1. Maximum refill rate
The maximum refill rate is limited to 1 token per 1 nanosecond. The following examples of API usage will raise exceptions:
Bandwidth.simple(2, Duration.ofNanos(1));
Bandwidth.simple(1001, Duration.ofNanos(1000));
Bandwidth.simple(1_000_001, Duration.ofMillis(1));
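The three rejected examples share one property: the number of tokens is greater than the number of nanoseconds in the period. A minimal sketch of such a check, using only the JDK, is shown below. The class RefillRateCheckSketch and the method exceedsMaxRate are hypothetical names; this is an assumption about the shape of the rule, not the library's actual validation code.

```java
import java.time.Duration;

// Toy model, not Bucket4j code: a rate above 1 token per nanosecond means tokens > period in nanos.
final class RefillRateCheckSketch {

    static boolean exceedsMaxRate(long tokens, Duration period) {
        return tokens > period.toNanos();
    }

    public static void main(String[] args) {
        System.out.println(exceedsMaxRate(2, Duration.ofNanos(1)));
        System.out.println(exceedsMaxRate(1001, Duration.ofNanos(1000)));
        System.out.println(exceedsMaxRate(1_000_001, Duration.ofMillis(1)));
        // exactly 1 token per nanosecond is still within the limit
        System.out.println(exceedsMaxRate(1_000_000, Duration.ofMillis(1)));
    }
}
```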
2.3.2. Limitation for refill period
Bucket4j represents time intervals as a 64-bit number of nanoseconds, so the maximum possible refill period is:
Duration.ofNanos(Long.MAX_VALUE);
Any attempt to specify a period longer than the limit above will fail with an exception. For example, the code below will fail:
Bandwidth.simple(42, Duration.ofMinutes(153722867280912930L));
Exception in thread "main" java.lang.ArithmeticException: long overflow
at java.lang.Math.multiplyExact(Math.java:892)
at java.time.Duration.toNanos(Duration.java:1186)
at io.github.bucket4j.Refill.<init>(Refill.java:48)
at io.github.bucket4j.Refill.greedy(Refill.java:100)
at io.github.bucket4j.Bandwidth.simple(Bandwidth.java:102)
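The stack trace shows that the overflow originates in java.time.Duration#toNanos, so the limit can be explored with the JDK alone. The sketch below is illustrative only; the class RefillPeriodLimitSketch and the method fitsInNanos are hypothetical helpers and not part of Bucket4j.

```java
import java.time.Duration;

// JDK-only illustration: which periods can be expressed as a 64-bit number of nanoseconds.
final class RefillPeriodLimitSketch {

    static boolean fitsInNanos(Duration period) {
        try {
            period.toNanos(); // throws ArithmeticException on long overflow
            return true;
        } catch (ArithmeticException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Duration max = Duration.ofNanos(Long.MAX_VALUE);
        System.out.println("max period in whole days: " + max.toDays());
        System.out.println(fitsInNanos(Duration.ofDays(106_751)));
        System.out.println(fitsInNanos(Duration.ofDays(106_752)));
        System.out.println(fitsInNanos(Duration.ofMinutes(153722867280912930L)));
    }
}
```

The maximum period corresponds to 106751 whole days, which is roughly 292 years, so the limit is unlikely to matter for practical refill periods.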
2.4. Basic API Reference
2.4.1. io.github.bucket4j.Bucket
tryConsume
/**
* Tries to consume a specified number of tokens from this bucket.
*
* @param numTokens The number of tokens to consume from the bucket, must be a positive number.
*
* @return {@code true} if the tokens were consumed, {@code false} otherwise.
*/
boolean tryConsume(long numTokens);
consumeIgnoringRateLimits
/**
* Consumes {@code tokens} from bucket ignoring all limits.
* As a result of this operation amount of tokens in the bucket could become negative.
*
* There are two possible reasons to use this method:
* <ul>
* <li>An operation with high priority should be executed independently of rate limits, but it should take effect to subsequent operation with bucket.</li>
* <li>You want to apply custom blocking strategy instead of default which applied on {@code asScheduler().consume(tokens)} </li>
* </ul>
*
* @param tokens amount of tokens that should be consumed from a bucket.
*
* @return
* the amount of rate limit violation in nanoseconds is calculated in the following way:
* <ul>
* <li><tt>zero</tt> if rate limit was not violated. For example bucket had 5 tokens before invocation of {@code consumeIgnoringRateLimits(2)},
* after invocation there are 3 tokens remain in the bucket, since limits were not violated <tt>zero</tt> returned as result.</li>
* <li>Positive value which describes the amount of rate limit violation in nanoseconds.
* For example, a bucket with a limit of 10 tokens per 1 second currently has 2 tokens available, the last refill happened 100 milliseconds ago, and {@code consumeIgnoringRateLimits(6)} is called.
* <tt>300_000_000</tt> will be returned as the result, the amount of available tokens in the bucket will become <tt>-3</tt>, and any variation of {@code tryConsume...} will not be successful for 400 milliseconds (the time required to refill the amount of available tokens up to 1).
* </li>
* </ul>
*/
long consumeIgnoringRateLimits(long tokens);
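The numbers in the javadoc example above can be reproduced with plain integer arithmetic. The sketch below is a toy model under stated assumptions, not the library's implementation: the class OverconsumptionSketch and the method penaltyNanos are hypothetical, the refill is assumed to be greedy, and the penalty is assumed to be the time needed to refill the deficit back to zero.

```java
// Toy model, not Bucket4j code: penalty for consuming more tokens than are available.
final class OverconsumptionSketch {

    // availableTokens must already include tokens refilled since the last refill.
    static long penaltyNanos(long availableTokens, long tokensToConsume,
                             long refillTokens, long refillPeriodNanos) {
        long remaining = availableTokens - tokensToConsume;
        if (remaining >= 0) {
            return 0; // the limit was not violated
        }
        long deficit = -remaining;
        // round up so that the whole deficit is covered after the returned time
        long numerator = Math.multiplyExact(deficit, refillPeriodNanos);
        return (numerator + refillTokens - 1) / refillTokens;
    }

    public static void main(String[] args) {
        long periodNanos = 1_000_000_000L; // 10 tokens per 1 second
        // 5 tokens available, 2 consumed: no violation
        System.out.println(penaltyNanos(5, 2, 10, periodNanos));
        // 2 tokens plus 1 token refilled during the last 100 ms, 6 consumed: 3 tokens of deficit
        System.out.println(penaltyNanos(2 + 1, 6, 10, periodNanos));
    }
}
```

With a deficit of 3 tokens and one token regenerated every 100 milliseconds, the model arrives at the same 300_000_000 nanoseconds that the javadoc mentions.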
tryConsumeAndReturnRemaining
/**
* Tries to consume a specified number of tokens from this bucket.
*
* @param numTokens The number of tokens to consume from the bucket, must be a positive number.
*
* @return {@link ConsumptionProbe} which describes both the result of consumption and tokens remaining in the bucket after consumption.
*/
ConsumptionProbe tryConsumeAndReturnRemaining(long numTokens);
estimateAbilityToConsume
/**
* Estimates ability to consume a specified number of tokens.
*
* @param numTokens The number of tokens to consume, must be a positive number.
*
* @return {@link EstimationProbe} which describes the ability to consume.
*/
EstimationProbe estimateAbilityToConsume(long numTokens);
tryConsumeAsMuchAsPossible
/**
* Tries to consume as many tokens from this bucket as available at the moment of invocation.
*
* @return number of tokens which have been consumed, or zero if nothing was consumed.
*/
long tryConsumeAsMuchAsPossible();
/**
* Tries to consume as many tokens from the bucket as are available in the bucket at the moment of invocation,
* but tokens which should be consumed are limited by {@code limit}.
*
* @param limit a maximum number of tokens to consume, should be positive.
*
* @return number of tokens which have been consumed, or zero if nothing was consumed.
*/
long tryConsumeAsMuchAsPossible(long limit);
addTokens
/**
* Add <tt>tokensToAdd</tt> to bucket.
* Resulted count of tokens are calculated by following formula:
* <pre>newTokens = Math.min(capacity, currentTokens + tokensToAdd)</pre>
* In other words resulted number of tokens never exceeds capacity independent of <tt>tokensToAdd</tt>.
*
* <h3>Example of usage</h3>
* The "compensating transaction" is one of the obvious use case, when any piece of code consumed tokens from a bucket, tried to do something, and failed, the "addTokens" will be helpful to return tokens back to the bucket:
* <pre>{@code
* Bucket wallet;
* ...
* if(wallet.tryConsume(50)) {// get 50 cents from wallet
* try {
* buyCocaCola();
* } catch(NoCocaColaException e) {
* // return money to wallet
* wallet.addTokens(50);
* }
* };
* }</pre>
*
* @param tokensToAdd number of tokens to add
*/
void addTokens(long tokensToAdd);
forceAddTokens
/**
* Add <tt>tokensToAdd</tt> to bucket. In contrast to {@link #addTokens(long)}, usage of this method can lead to overflowing the bucket capacity.
*
* <h3>Example of usage</h3>
* The "compensating transaction" is one of the obvious use case, when any piece of code consumed tokens from a bucket, tried to do something, and failed, the "addTokens" will be helpful to return tokens back to the bucket:
* <pre>{@code
* Bucket wallet;
* ...
* if(wallet.tryConsume(50)) {// get 50 cents from wallet
* try {
* buyCocaCola();
* } catch(NoCocaColaException e) {
* // return money to wallet
* wallet.addTokens(50);
* }
* };
* }</pre>
*
* @param tokensToAdd number of tokens to add
*/
void forceAddTokens(long tokensToAdd);
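The difference between the two methods comes down to the capacity cap in the formula quoted in the addTokens javadoc. The following minimal sketch restates that formula without using Bucket4j. The class AddTokensSketch and its static methods are hypothetical, and overflow of long is ignored for brevity.

```java
// Toy model, not Bucket4j code: the capped and uncapped ways of returning tokens.
final class AddTokensSketch {

    // newTokens = Math.min(capacity, currentTokens + tokensToAdd)
    static long addTokens(long capacity, long currentTokens, long tokensToAdd) {
        return Math.min(capacity, currentTokens + tokensToAdd);
    }

    // No cap: the result may exceed the capacity.
    static long forceAddTokens(long currentTokens, long tokensToAdd) {
        return currentTokens + tokensToAdd;
    }

    public static void main(String[] args) {
        long capacity = 100;
        long currentTokens = 80;
        System.out.println(addTokens(capacity, currentTokens, 50));
        System.out.println(forceAddTokens(currentTokens, 50));
    }
}
```

With a capacity of 100 and 80 tokens in the bucket, returning 50 tokens gives 100 in the capped variant and 130 in the uncapped one.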
getAvailableTokens
/**
* Returns the amount of available tokens in this bucket.
* <p>
* Typically you should avoid using this method, because the amount of available tokens can be changed by concurrent transactions in a multithreaded/multi-process environment.
*
* @return amount of available tokens
*/
long getAvailableTokens();
builder
/**
* Creates the new builder of in-memory buckets.
*
* @return new instance of {@link LocalBucketBuilder}
*/
static LocalBucketBuilder builder() {
return new LocalBucketBuilder();
}
replaceConfiguration
/**
* Replaces configuration of this bucket.
*
* <p>
* The first hard problem of configuration replacement is making decisions on how to propagate available tokens from the bucket with the previous configuration to the bucket with a new configuration.
* If you don't care about the previous bucket state then use {@link TokensInheritanceStrategy#RESET}.
* But it becomes a tricky problem when we expect that previous consumption(that has not been compensated by refill yet) should take effect to the bucket with a new configuration.
* In this case you need to make a choice between {@link TokensInheritanceStrategy#PROPORTIONALLY} and {@link TokensInheritanceStrategy#AS_IS}, read the documentation about both with strong attention.
*
* <p> There is another problem when you are choosing {@link TokensInheritanceStrategy#PROPORTIONALLY} and {@link TokensInheritanceStrategy#AS_IS} and the bucket has more than one bandwidth.
* For example how does replaceConfiguration implementation bind bandwidths to each other in the following example?
* <pre>
* <code>
* Bucket bucket = Bucket.builder()
* .addLimit(Bandwidth.simple(10, Duration.ofSeconds(1)))
* .addLimit(Bandwidth.simple(10000, Duration.ofHours(1)))
* .build();
* ...
* BucketConfiguration newConfiguration = BucketConfiguration.builder()
* .addLimit(Bandwidth.simple(5000, Duration.ofHours(1)))
* .addLimit(Bandwidth.simple(100, Duration.ofSeconds(10)))
* .build();
* bucket.replaceConfiguration(newConfiguration, TokensInheritanceStrategy.AS_IS);
* </code>
* </pre>
* It is obvious that a simple strategy - copying tokens by bandwidth index - will not work well in this case, because it depends heavily on the order.
* Instead of inventing backward magic, Bucket4j gives you deep control over this process by letting you specify identifiers for bandwidths,
* so in the case of multiple bandwidths the configuration replacement code can copy available tokens by bandwidth ID. So it is better to rewrite the code above as follows:
* <pre>
* <code>
* Bucket bucket = Bucket.builder()
* .addLimit(Bandwidth.simple(10, Duration.ofSeconds(1)).withId("technical-limit"))
* .addLimit(Bandwidth.simple(10000, Duration.ofHours(1)).withId("business-limit"))
* .build();
* ...
* BucketConfiguration newConfiguration = BucketConfiguration.builder()
* .addLimit(Bandwidth.simple(5000, Duration.ofHours(1)).withId("business-limit"))
* .addLimit(Bandwidth.simple(100, Duration.ofSeconds(10)).withId("technical-limit"))
* .build();
* bucket.replaceConfiguration(newConfiguration, TokensInheritanceStrategy.AS_IS);
* </code>
* </pre>
*
*
* <p>
* There are the following rules for bandwidth identifiers:
* <ul>
* <li>
* By default bandwidth has <b>null</b> identifier.
* </li>
* <li>
* null value of identifier equals to another null value if and only if there is only one bandwidth with null identifier.
* </li>
* <li>
* If an identifier for a bandwidth is specified then it must be unique in the bucket. A bucket does not allow the creation of several bandwidths with the same ID.
* </li>
* <li>
* {@link TokensInheritanceStrategy#RESET} strategy will be applied for tokens migration during config replacement for bandwidth which has no bound bandwidth with the same ID in the previous configuration,
* independently of the strategy that was requested.
* </li>
* </ul>
*
* @param newConfiguration the new configuration
* @param tokensInheritanceStrategy specifies the rules for inheritance of available tokens
*/
void replaceConfiguration(BucketConfiguration newConfiguration, TokensInheritanceStrategy tokensInheritanceStrategy);
See configuration replacement section for more details.
asBlocking
/**
* Returns the blocking API for this bucket, that provides operations which are able to block caller thread in case of lack of tokens.
*
* @return the blocking API for this bucket.
*
* @see BlockingBucket
*/
BlockingBucket asBlocking();
See BlockingBucket section for more details.
asScheduler
/**
* Returns the scheduling API for this bucket, that provides operations which can delay user operation via {@link java.util.concurrent.ScheduledExecutorService} in case of lack of tokens.
*
* @return the scheduling API for this bucket.
*
* @see SchedulingBucket
*/
SchedulingBucket asScheduler();
See SchedulingBucket section for more details.
asVerbose
/**
* Returns the verbose API for this bucket.
*
* @return the verbose API for this bucket.
*/
VerboseBucket asVerbose();
See Verbose API section for more details.
toListenable
/**
* Returns a new copy of this bucket instance decorated by {@code listener}.
* The created bucket will share the same tokens with the source bucket and vice versa.
*
* See javadocs for {@link BucketListener} in order to understand the semantics of listener.
*
* @param listener the listener of bucket events.
*
* @return new bucket instance decorated by {@code listener}
*/
Bucket toListenable(BucketListener listener);
See Listening for bucket events section for more details.
2.4.2. io.github.bucket4j.BlockingBucket
tryConsume
/**
* Tries to consume a specified number of tokens from the bucket.
*
* <p>
* The algorithm is following:
* <ul>
* <li>If bucket has enough tokens, then tokens consumed and <tt>true</tt> returned immediately.</li>
* <li>If the bucket does not have enough tokens,
* and the required amount of tokens cannot be refilled
* even after waiting for <code>maxWaitTimeNanos</code> nanoseconds,
* then it consumes nothing and returns <tt>false</tt> immediately.
* </li>
* <li>
* If the bucket does not have enough tokens,
* but the deficit can be closed in a period of time less than <code>maxWaitTimeNanos</code> nanoseconds,
* then the tokens are consumed (reserved in a fair manner) from the bucket and the current thread is blocked for the time required to close the deficit;
* after unblocking, the method returns <tt>true</tt>.
*
* <p>
* <strong>Note:</strong> If an InterruptedException happens while the thread is blocked,
* then the tokens will not be returned to the bucket,
* but you can use {@link Bucket#addTokens(long)} to return the tokens.
* </li>
* </ul>
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWaitTimeNanos limit of time(in nanoseconds) which thread can wait.
* @param blockingStrategy specifies the way to block the current thread to the amount of time required to refill a missed number of tokens in the bucket
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @throws InterruptedException in case of current thread has been interrupted during the waiting
*/
boolean tryConsume(long numTokens, long maxWaitTimeNanos, BlockingStrategy blockingStrategy) throws InterruptedException;
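The three branches of the algorithm described in the javadoc above can be sketched as a small decision function. This is a toy model under stated assumptions rather than the library's code: the class BlockingConsumeSketch, the REJECTED marker, and the parameter nanosPerToken are hypothetical, the refill is assumed to be greedy, and a wait exactly equal to the maximum is treated as acceptable, which the javadoc leaves open.

```java
import java.util.concurrent.locks.LockSupport;

// Toy model, not Bucket4j code: how a bounded blocking consume could decide what to do.
final class BlockingConsumeSketch {
    static final long REJECTED = -1;

    // Returns 0 when tokens are available right now, REJECTED when the deficit cannot
    // be closed within maxWaitNanos, otherwise the number of nanoseconds to wait.
    static long nanosToWait(long availableTokens, long numTokens,
                            long nanosPerToken, long maxWaitNanos) {
        if (availableTokens >= numTokens) {
            return 0;
        }
        long deficit = numTokens - availableTokens;
        long nanosToCloseDeficit = Math.multiplyExact(deficit, nanosPerToken);
        return nanosToCloseDeficit > maxWaitNanos ? REJECTED : nanosToCloseDeficit;
    }

    static boolean tryConsume(long availableTokens, long numTokens,
                              long nanosPerToken, long maxWaitNanos) throws InterruptedException {
        long wait = nanosToWait(availableTokens, numTokens, nanosPerToken, maxWaitNanos);
        if (wait == REJECTED) {
            return false; // consume nothing, return immediately
        }
        if (wait > 0) {
            // parkNanos may return early; a production implementation would re-check the deadline
            LockSupport.parkNanos(wait);
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        long nanosPerToken = 100_000_000L; // 10 tokens per second
        long maxWaitNanos = 1_000_000_000L; // wait at most 1 second
        System.out.println(tryConsume(5, 3, nanosPerToken, maxWaitNanos));  // enough tokens
        System.out.println(tryConsume(0, 20, nanosPerToken, maxWaitNanos)); // would need 2 seconds
        System.out.println(tryConsume(0, 2, nanosPerToken, maxWaitNanos));  // parks for about 200 ms
    }
}
```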
/**
* This is just overloaded equivalent of {@link #tryConsume(long, long, BlockingStrategy)}
*
* @see #tryConsume(long, long, BlockingStrategy)
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWait limit of time which thread can wait.
* @param blockingStrategy specifies the way to block the current thread to the amount of time required to refill a missed number of tokens in the bucket
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @throws InterruptedException in case of current thread has been interrupted during the waiting
*/
default boolean tryConsume(long numTokens, Duration maxWait, BlockingStrategy blockingStrategy) throws InterruptedException {
return tryConsume(numTokens, maxWait.toNanos(), blockingStrategy);
}
/**
* This is just overloaded equivalent of {@link #tryConsume(long, long, BlockingStrategy)}
*
* @see #tryConsume(long, long, BlockingStrategy)
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWaitTimeNanos limit of time(in nanoseconds) which thread can wait.
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @throws InterruptedException in case of current thread has been interrupted during the waiting
*/
default boolean tryConsume(long numTokens, long maxWaitTimeNanos) throws InterruptedException {
return tryConsume(numTokens, maxWaitTimeNanos, BlockingStrategy.PARKING);
}
/**
* This is just overloaded equivalent of {@link #tryConsume(long, long, BlockingStrategy)}
*
* @see #tryConsume(long, long, BlockingStrategy)
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWait limit of time which thread can wait.
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @throws InterruptedException in case of current thread has been interrupted during the waiting
*/
default boolean tryConsume(long numTokens, Duration maxWait) throws InterruptedException {
return tryConsume(numTokens, maxWait.toNanos(), BlockingStrategy.PARKING);
}
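The three branches of the tryConsume algorithm above can be modeled in plain Java. This is a minimal sketch of the decision step only, not Bucket4j's implementation: the class `BlockingDecisionSketch`, its fields, and the constant `nanosPerToken` refill speed are assumptions made for illustration, and refill over time, capacity limits, and thread-safety are deliberately left out.

```java
// Simplified model of the decision step only; NOT Bucket4j's implementation.
// The class, field, and method names are hypothetical.
class BlockingDecisionSketch {
    private long availableTokens;     // goes negative while tokens are reserved
    private final long nanosPerToken; // assumed constant refill speed

    BlockingDecisionSketch(long initialTokens, long nanosPerToken) {
        this.availableTokens = initialTokens;
        this.nanosPerToken = nanosPerToken;
    }

    // Returns the nanoseconds the caller would have to wait (0 = consumed immediately),
    // or -1 if the deficit cannot be closed within maxWaitNanos (nothing is consumed).
    long reserve(long numTokens, long maxWaitNanos) {
        long deficit = numTokens - availableTokens;
        if (deficit <= 0) {
            availableTokens -= numTokens;
            return 0;
        }
        long nanosToWait = deficit * nanosPerToken;
        if (nanosToWait > maxWaitNanos) {
            return -1;
        }
        // Reserve the tokens: later callers see the debt and wait behind it.
        availableTokens -= numTokens;
        return nanosToWait;
    }

    public static void main(String[] args) {
        BlockingDecisionSketch bucket = new BlockingDecisionSketch(5, 1_000_000L);
        System.out.println(bucket.reserve(3, 0));           // 0: consumed immediately
        System.out.println(bucket.reserve(4, 1_000_000L));  // -1: 2 ms needed, 1 ms allowed
        System.out.println(bucket.reserve(4, 5_000_000L));  // 2000000: reserved, wait 2 ms
        System.out.println(bucket.reserve(1, 10_000_000L)); // 3000000: waits behind the reservation
    }
}
```

In this model a blocking caller would park for the returned number of nanoseconds before returning true, which is the role the blockingStrategy parameter plays in the signatures above.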
tryConsumeUninterruptibly
/**
* Has the same semantics as {@link #tryConsume(long, long, BlockingStrategy)} but ignores interrupts (it just restores the interruption flag on exit).
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWaitTimeNanos limit of time(in nanoseconds) which thread can wait.
* @param blockingStrategy specifies the way to block the current thread to the amount of time required to refill missed number of tokens in the bucket
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @see #tryConsume(long, long, BlockingStrategy)
*/
boolean tryConsumeUninterruptibly(long numTokens, long maxWaitTimeNanos, UninterruptibleBlockingStrategy blockingStrategy);
/**
* This is just overloaded equivalent of {@link #tryConsumeUninterruptibly(long, long, UninterruptibleBlockingStrategy)}
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWait limit of time which thread can wait.
* @param blockingStrategy specifies the way to block the current thread to the amount of time required to refill a missed number of tokens in the bucket
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @see #tryConsumeUninterruptibly(long, long, UninterruptibleBlockingStrategy)
*/
default boolean tryConsumeUninterruptibly(long numTokens, Duration maxWait, UninterruptibleBlockingStrategy blockingStrategy) {
return tryConsumeUninterruptibly(numTokens, maxWait.toNanos(), blockingStrategy);
}
/**
* This is just overloaded equivalent of {@link #tryConsumeUninterruptibly(long, long, UninterruptibleBlockingStrategy)}
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWaitTimeNanos limit of time(in nanoseconds) which thread can wait.
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @see #tryConsumeUninterruptibly(long, long, UninterruptibleBlockingStrategy)
*/
default boolean tryConsumeUninterruptibly(long numTokens, long maxWaitTimeNanos) {
return tryConsumeUninterruptibly(numTokens, maxWaitTimeNanos, UninterruptibleBlockingStrategy.PARKING);
}
/**
* This is just overloaded equivalent of {@link #tryConsumeUninterruptibly(long, long, UninterruptibleBlockingStrategy)}
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWait limit of time which thread can wait.
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @see #tryConsumeUninterruptibly(long, long, UninterruptibleBlockingStrategy)
*/
default boolean tryConsumeUninterruptibly(long numTokens, Duration maxWait) {
return tryConsumeUninterruptibly(numTokens, maxWait.toNanos(), UninterruptibleBlockingStrategy.PARKING);
}
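"Ignores interrupts (just restores interruption flag on exit)" can be illustrated with a small, library-independent sketch. The helper `parkUninterruptibly` below is a hypothetical name, not part of the Bucket4j API; it only shows the general technique of waiting out the full delay while remembering that an interrupt arrived.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical helper illustrating the technique; not Bucket4j code.
class UninterruptibleParkSketch {
    // Waits at least nanosToPark nanoseconds, ignoring interrupts,
    // and restores the interrupt flag before returning if one was seen.
    static void parkUninterruptibly(long nanosToPark) {
        final long deadline = System.nanoTime() + nanosToPark;
        // Clear the flag first, otherwise parkNanos would return immediately.
        boolean interrupted = Thread.interrupted();
        try {
            long remaining = nanosToPark;
            while (remaining > 0) {
                LockSupport.parkNanos(remaining);
                if (Thread.interrupted()) {
                    interrupted = true; // remember it, keep waiting
                }
                remaining = deadline - System.nanoTime();
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        Thread.currentThread().interrupt();
        long start = System.nanoTime();
        parkUninterruptibly(5_000_000L);
        long elapsed = System.nanoTime() - start;
        System.out.println("waited at least 5 ms: " + (elapsed >= 5_000_000L));
        System.out.println("interrupt flag restored: " + Thread.currentThread().isInterrupted());
    }
}
```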
consume
/**
* Consumes a specified number of tokens from the bucket.
*
* <p>
* The algorithm is as follows:
* <ul>
* <li>If the bucket has enough tokens, then the tokens are consumed and the method returns immediately.</li>
* <li>
* If the bucket does not have enough tokens, then the required amount of tokens is reserved for future consumption
* and the current thread is blocked for the time required to close the deficit.
* </li>
* <li>
* <strong>Note:</strong> If an InterruptedException happens while the thread is blocked,
* the tokens are not returned to the bucket,
* but you can use {@link Bucket#addTokens(long)} to return them.
* </li>
* </ul>
*
* @param numTokens The number of tokens to consume from the bucket.
* @param blockingStrategy specifies the way to block the current thread to the amount of time required to refill a missed number of tokens in the bucket
*
*
* @throws InterruptedException in case of current thread has been interrupted during the waiting
*/
void consume(long numTokens, BlockingStrategy blockingStrategy) throws InterruptedException;
/**
* This is just overloaded equivalent of {@link #consume(long, BlockingStrategy)}
*
* @param numTokens The number of tokens to consume from the bucket.
*
* @throws InterruptedException in case of current thread has been interrupted during the waiting
*
* @see #consume(long, BlockingStrategy)
*/
default void consume(long numTokens) throws InterruptedException {
consume(numTokens, BlockingStrategy.PARKING);
}
consumeUninterruptibly
/**
* Has the same semantics as {@link #consume(long, BlockingStrategy)} but ignores interrupts (it just restores the interruption flag on exit).
*
* @param numTokens The number of tokens to consume from the bucket.
* @param blockingStrategy specifies the way to block the current thread to the amount of time required to refill a missed number of tokens in the bucket
*
* @see #consume(long, BlockingStrategy)
*/
void consumeUninterruptibly(long numTokens, UninterruptibleBlockingStrategy blockingStrategy);
/**
* This is just overloaded equivalent of {@link #consumeUninterruptibly(long, UninterruptibleBlockingStrategy)}
*
* @param numTokens The number of tokens to consume from the bucket.
*
* @see #consumeUninterruptibly(long, UninterruptibleBlockingStrategy)
*/
default void consumeUninterruptibly(long numTokens) {
consumeUninterruptibly(numTokens, UninterruptibleBlockingStrategy.PARKING);
}
2.4.3. io.github.bucket4j.SchedulingBucket
tryConsume
/**
* Tries to consume the specified number of tokens from the bucket.
*
* <p>
* <strong>The algorithm for all types of buckets is as follows:</strong>
* <ul>
* <li>The implementation issues an asynchronous request to the back-end behind the bucket (for a local bucket it is just a synchronous call) in a way that is specific to each particular back-end.</li>
* <li>Then an uncompleted future is returned to the caller.</li>
* <li>If the back-end signals (through a callback) that the asynchronous request failed, then the future is completed exceptionally.</li>
* <li>When the back-end signals (through a callback) that the request is done (for a local bucket the response is available immediately), the following post-processing rules are applied:
* <ul>
* <li>
* If the tokens were consumed, then the future is immediately completed with <tt>true</tt>.
* </li>
* <li>
* If the tokens were not consumed because there were not enough tokens in the bucket and <tt>maxWaitNanos</tt> nanoseconds is not enough time to refill the deficit,
* then the future is immediately completed with <tt>false</tt>.
* </li>
* <li>
* If the tokens were reserved (effectively consumed), then a <tt>task</tt> for delayed completion is scheduled on the <tt>scheduler</tt> via {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)},
* with a delay equal to the time required to refill the deficit of tokens. After the scheduler executes the task, the future is completed with <tt>true</tt>.
* </li>
* </ul>
* </li>
* </ul>
* It is strongly recommended not to do any heavy work in the thread that completes the future,
* because typically this will be a back-end thread that handles NIO selectors;
* blocking this thread will negatively affect back-end throughput,
* so you should always resume control flow in another executor via methods like {@link CompletableFuture#thenApplyAsync(Function, Executor)}.
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWaitNanos limit of time (in nanoseconds) that the caller is willing to wait.
* @param scheduler used for delayed future completion
*/
CompletableFuture<Boolean> tryConsume(long numTokens, long maxWaitNanos, ScheduledExecutorService scheduler);
/**
* This is just overloaded equivalent of {@link #tryConsume(long, long, ScheduledExecutorService)}
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWait limit of time which thread can wait.
* @param scheduler used to delayed future completion
*
* @see #tryConsume(long, long, ScheduledExecutorService)
*/
default CompletableFuture<Boolean> tryConsume(long numTokens, Duration maxWait, ScheduledExecutorService scheduler) {
return tryConsume(numTokens, maxWait.toNanos(), scheduler);
}
consume
/**
* Consumes the specified number of tokens from the bucket.
*
* <p>
* <strong>The algorithm for all types of buckets is as follows:</strong>
* <ul>
* <li>The implementation issues an asynchronous request to the back-end behind the bucket (for a local bucket it is just a synchronous call) in a way that is specific to each particular back-end.</li>
* <li>Then an uncompleted future is returned to the caller.</li>
* <li>If the back-end signals (through a callback) that the asynchronous request failed, then the future is completed exceptionally.</li>
* <li>When the back-end signals (through a callback) that the request is done (for a local bucket the response is available immediately), the following post-processing rules are applied:
* <ul>
* <li>
* If the tokens were consumed, then the future is immediately completed.
* </li>
* <li>
* Otherwise the tokens are reserved (effectively consumed) and a <tt>task</tt> for delayed completion is scheduled on the <tt>scheduler</tt> via {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)},
* with a delay equal to the time required to refill the deficit of tokens. After the scheduler executes the task, the future is completed.
* </li>
* </ul>
* </li>
* </ul>
* It is strongly recommended not to do any heavy work in the thread that completes the future,
* because typically this will be a back-end thread that handles NIO selectors;
* blocking this thread will negatively affect back-end throughput,
* so you should always resume control flow in another executor via methods like {@link CompletableFuture#thenApplyAsync(Function, Executor)}.
*
* @param numTokens The number of tokens to consume from the bucket.
* @param scheduler used for delayed future completion
*
*/
CompletableFuture<Void> consume(long numTokens, ScheduledExecutorService scheduler);
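The post-processing rules described in the SchedulingBucket Javadoc can be sketched with JDK classes only. The class `DelayedCompletionSketch` and the `nanosToWait` input (standing in for the result of the reservation step) are assumptions for illustration; this is not how Bucket4j is implemented internally. The sketch also shows the recommended hand-off to another executor via thenApplyAsync.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of delayed future completion; not Bucket4j code.
class DelayedCompletionSketch {
    // nanosToWait models the reservation result: 0 means the tokens were consumed at once,
    // a positive value is the time required to refill the deficit.
    static CompletableFuture<Boolean> completeLater(long nanosToWait, long maxWaitNanos,
                                                    ScheduledExecutorService scheduler) {
        if (nanosToWait > maxWaitNanos) {
            return CompletableFuture.completedFuture(false); // deficit cannot be closed in time
        }
        if (nanosToWait == 0) {
            return CompletableFuture.completedFuture(true);  // consumed immediately
        }
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        Runnable task = () -> result.complete(true);         // runs after the refill delay
        scheduler.schedule(task, nanosToWait, TimeUnit.NANOSECONDS);
        return result;
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // Resume the control flow on a separate executor, not on the completing thread.
            String outcome = completeLater(2_000_000L, 10_000_000L, scheduler)
                    .thenApplyAsync(consumed -> consumed ? "handled" : "rejected", worker)
                    .join();
            System.out.println(outcome);
        } finally {
            scheduler.shutdown();
            worker.shutdown();
        }
    }
}
```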
2.5. Generic production checklist
The considerations described below apply to every solution based on the token-bucket or leaky-bucket algorithm. You need to understand, agree with, and configure the following points:
2.5.1. Be wary of long periods
When you are planning to use any solution based on token-bucket for throttling incoming requests, you need to pay close attention to the throttling time window.
-
Given a bucket with a limit of 10000 tokens per 1 hour per user.
-
A malicious attacker may send 9999 requests in a very short period, for example within 10 seconds. This would correspond to about 1000 requests per second, which could seriously impact your system.
-
A skilled attacker could stop at 9999 requests per hour, and repeat every hour, which would make this attack impossible to detect (because the limit would not be reached).
To protect from this kind of attack, you should specify multiple limits like below:
Bucket bucket = Bucket.builder()
    .addLimit(Bandwidth.simple(10000, Duration.ofSeconds(3_600)))
    // the attacker is unable to reach 1000 RPS and crash the service in a short time
    .addLimit(Bandwidth.simple(20, Duration.ofSeconds(1)))
    .build();
The number of limits specified per bucket does not impact the performance.
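The effect of the second limit can be checked with simple arithmetic. The sketch below assumes that each limit starts full and refills greedily at capacity/period, so the most a single limit lets through in a window is capacity plus the tokens refilled during that window; the class and method names are hypothetical, and a bucket with several limits is bounded by the smallest of these values.

```java
// Back-of-the-envelope check of the example above; names are hypothetical.
class MultiLimitBurstSketch {
    // Upper bound of tokens one limit lets through in a window that starts with a full bucket.
    static long maxTokensInWindow(long capacity, long periodSeconds, long windowSeconds) {
        return capacity + capacity * windowSeconds / periodSeconds;
    }

    public static void main(String[] args) {
        long windowSeconds = 10;
        long hourlyLimit = maxTokensInWindow(10_000, 3_600, windowSeconds); // 10027
        long perSecondLimit = maxTokensInWindow(20, 1, windowSeconds);      // 220
        long bothLimits = Math.min(hourlyLimit, perSecondLimit);            // 220
        System.out.println("hourly limit alone, 10 s window: " + hourlyLimit);
        System.out.println("per-second limit alone, 10 s window: " + perSecondLimit);
        System.out.println("both limits, 10 s window: " + bothLimits);
    }
}
```

With only the hourly limit, almost the whole hourly budget can be spent within 10 seconds; with both limits, the same window admits at most 220 requests.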
2.5.2. Be wary of short-timed bursts
The token bucket is an efficient algorithm with a low and fixed memory footprint: regardless of the incoming request rate (it can be millions per second), the bucket consumes no more than 40 bytes (five longs). But an efficient memory footprint has its own cost: the bandwidth limitation is only satisfied over a long period. In other words, you cannot avoid short-timed bursts.
-
Given a bucket with a limit of 100 tokens/min. We start with a full bucket, i.e. with 100 tokens.
-
At T1, 100 requests are made and thus the bucket becomes empty.
-
At T1+1min the bucket is full again because tokens are fully regenerated, and we can immediately consume 100 tokens.
-
This means that between T1 and T1+1min we have consumed 200 tokens. Over a long time, there will be no more than 100 requests per min, but as shown above, it is possible to burst at twice the limit of 100 tokens per min.
If short-timed bursts are unacceptable, you have the following options:
-
Do not use Bucket4j or any other solution implemented on top of the token-bucket algorithm. Token-bucket was specially designed for network traffic management devices, for which short-lived traffic spikes are a regular case; trying to avoid spikes altogether contradicts the nature of token-bucket.
-
Since the size of a burst always equals the capacity, try to reduce the capacity and the speed of refill. For example, if you have a strict requirement of 100tokens/60seconds, then configure the bucket as capacity=50tokens refill=50tokens/60seconds. It is worth mentioning that this approach has the following drawbacks:
* You cannot consume more tokens at once than the capacity. In the example above, before reducing the capacity you could consume 100 tokens in a single request; after reducing it, you can consume at most 50 tokens in one request.
* Reducing the speed of refill leads to underconsumption over long periods: with refill 50tokens/60seconds you will be able to consume 3050 tokens in 1 hour, instead of 6100 (as before reducing the refill).
* To summarize the two drawbacks above: you pay with underconsumption for eliminating the risk of overconsumption.
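The figures quoted above (200 tokens within one minute, 6100 versus 3050 tokens per hour) follow from one formula: starting from a full bucket, the maximum consumption in a window is the capacity plus the tokens refilled during that window. A minimal sketch, assuming greedy refill and using hypothetical names:

```java
// Arithmetic behind the burst and underconsumption figures; names are hypothetical.
class RefillTradeoffSketch {
    // Maximum tokens consumable in a window that starts with a full bucket.
    static long maxTokens(long capacity, long refillTokens, long refillPeriodSeconds, long windowSeconds) {
        return capacity + refillTokens * windowSeconds / refillPeriodSeconds;
    }

    public static void main(String[] args) {
        // capacity=100, refill=100 tokens/60 seconds
        System.out.println(maxTokens(100, 100, 60, 60));    // 200
        System.out.println(maxTokens(100, 100, 60, 3_600)); // 6100
        // capacity=50, refill=50 tokens/60 seconds
        System.out.println(maxTokens(50, 50, 60, 60));      // 100
        System.out.println(maxTokens(50, 50, 60, 3_600));   // 3050
    }
}
```

The reduced configuration never exceeds 100 tokens within any one-minute window that starts full, at the price of roughly half the hourly throughput.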
3. Distributed facilities
3.1. JCache integration
Bucket4j supports any GRID solution that is compatible with the JCache API (JSR 107) specification.
Note
|
Do not forget to read Distributed usage checklist before using the Bucket4j over the JCache cluster. |
To use the JCache extension you also need to add the following dependency:
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-jcache</artifactId>
<version>8.0.1</version>
</dependency>
Note
|
See the Java compatibility matrix if you need a build that is compatible with Java 8 |
JCache expects javax.cache.cache-api to be a provided dependency. Do not forget to add the following dependency:
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>${jcache.version}</version>
</dependency>
3.1.1. Example 1 - limiting access to HTTP server by IP address
Imagine that you are developing a Servlet-based web application and want to limit access on a per-IP basis. You want to use the same limit for each IP: 30 requests per minute.
A servlet filter would be the obvious place to check limits:
public class IpThrottlingFilter implements javax.servlet.Filter {
private static final BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(30, Duration.ofMinutes(1)))
.build();
// cache for storing token buckets, where IP is key.
@Inject
private javax.cache.Cache<String, byte[]> cache;
private ProxyManager<String> buckets;
@Override
public void init(FilterConfig filterConfig) throws ServletException {
// init bucket registry
buckets = new JCacheProxyManager<>(cache);
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
String ip = IpHelper.getIpFromRequest(httpRequest);
// acquire cheap proxy to the bucket
Bucket bucket = buckets.builder().build(ip, configuration);
// tryConsume returns false immediately if no tokens available with the bucket
if (bucket.tryConsume(1)) {
// the limit is not exceeded
filterChain.doFilter(servletRequest, servletResponse);
} else {
// limit is exceeded
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
httpResponse.setContentType("text/plain");
httpResponse.setStatus(429);
httpResponse.getWriter().append("Too many requests");
}
}
}
3.1.2. Example 2 - limiting access to service by contract agreements
Imagine that you provide a paid language translation service via HTTP. Each user has a unique agreement that differs from the others. Details of each agreement are stored in a relational database and take significant time to fetch (for example 100ms). The example above will not work well in this case, because creating/fetching the configuration of the bucket from the database will be 100 times slower than the limit-checking itself. Bucket4j solves this problem via lazy configuration suppliers, which are called if and only if the bucket has not yet been stored in the grid; thus it is possible to implement a solution that reads the agreement from the database once per user.
public class IpThrottlingFilter implements javax.servlet.Filter {
// service to provide per user limits
@Inject
private LimitProvider limitProvider;
// cache for storing token buckets, where user ID is key.
@Inject
private javax.cache.Cache<String, byte[]> cache;
private ProxyManager<String> buckets;
@Override
public void init(FilterConfig filterConfig) throws ServletException {
// init bucket registry
buckets = new JCacheProxyManager<>(cache);
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
String userId = AutentificationHelper.getUserIdFromRequest(httpRequest);
// prepare configuration supplier which will be called(on the first interaction with proxy) if the bucket was not saved yet previously.
Supplier<BucketConfiguration> configurationLazySupplier = getConfigSupplierForUser(userId);
// acquire cheap proxy to the bucket
Bucket bucket = buckets.builder().build(userId, configurationLazySupplier);
// tryConsume returns false immediately if no tokens available with the bucket
if (bucket.tryConsume(1)) {
// the limit is not exceeded
filterChain.doFilter(servletRequest, servletResponse);
} else {
// limit is exceeded
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
httpResponse.setContentType("text/plain");
httpResponse.setStatus(429);
httpResponse.getWriter().append("Too many requests");
}
}
private Supplier<BucketConfiguration> getConfigSupplierForUser(String userId) {
return () -> {
long translationsPerDay = limitProvider.readPerDayLimitFromAgreementsDatabase(userId);
return BucketConfiguration.builder()
.addLimit(Bandwidth.simple(translationsPerDay, Duration.ofDays(1)))
.build();
};
}
}
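The "called if and only if the bucket was not yet stored" behavior of a lazy supplier can be illustrated without a grid. In the sketch below a local ConcurrentHashMap stands in for the distributed storage and a counter stands in for the slow database; all names are hypothetical, and this only demonstrates the laziness contract, not how Bucket4j persists bucket state.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Illustration of lazy, once-per-key configuration loading; not Bucket4j code.
class LazyConfigSketch {
    private final ConcurrentHashMap<String, Long> limitsByUser = new ConcurrentHashMap<>();
    final AtomicInteger databaseReads = new AtomicInteger();

    // Stand-in for a slow lookup in the agreements database.
    private long readPerDayLimit(String userId) {
        databaseReads.incrementAndGet();
        return 1_000L;
    }

    long limitFor(String userId) {
        // Building the supplier is cheap; it is invoked only if the key is absent.
        Supplier<Long> lazySupplier = () -> readPerDayLimit(userId);
        return limitsByUser.computeIfAbsent(userId, id -> lazySupplier.get());
    }

    public static void main(String[] args) {
        LazyConfigSketch sketch = new LazyConfigSketch();
        sketch.limitFor("alice");
        sketch.limitFor("alice");
        sketch.limitFor("alice");
        sketch.limitFor("bob");
        System.out.println("database reads: " + sketch.databaseReads.get()); // 2
    }
}
```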
3.1.3. Why is the JCache specification not enough in modern stacks, and why were dedicated modules for Infinispan, Hazelcast, Coherence and Ignite introduced in 3.0?
Asynchronous processing is very important for high-throughput applications, but the JCache specification does not specify an asynchronous API, because two early attempts to bring this kind of functionality to the spec level (307, 312) failed in the absence of consensus.
Also, implementing asynchronous support for any other JCache provider outside of the list above should be an easy exercise, so feel free to send a pull request that covers your favorite JCache provider.
3.1.4. Verification of compatibility with a particular JCache provider is your responsibility
Important
|
Keep in mind that there are many non-certified implementations of the JCache specification on the market. Many of them want to increase their popularity by declaring support for the JCache API, but often only the API is supported and the semantics of JCache are totally ignored. Using Bucket4j with this kind of library should be completely avoided. |
Bucket4j is only compatible with implementations that obey the JCache specification rules(especially related to EntryProcessor execution). Oracle Coherence, Apache Ignite, Hazelcast are good examples of safe implementations of JCache.
Important
|
Because it is impossible to test all possible JCache providers, you need to test your provider by yourself. |
Just run this code to make sure that your implementation of JCache provides good isolation for EntryProcessors:
import javax.cache.Cache;
import javax.cache.processor.EntryProcessor;
import java.util.concurrent.CountDownLatch;
import java.io.Serializable;
public class CompatibilityTest {
final Cache<String, Integer> cache;
public CompatibilityTest(Cache<String, Integer> cache) {
this.cache = cache;
}
public void test() throws InterruptedException {
String key = "42";
int threads = 4;
int iterations = 1000;
cache.put(key, 0);
CountDownLatch latch = new CountDownLatch(threads);
for (int i = 0; i < threads; i++) {
new Thread(() -> {
try {
for (int j = 0; j < iterations; j++) {
EntryProcessor<String, Integer, Void> processor = (EntryProcessor<String, Integer, Void> & Serializable) (mutableEntry, objects) -> {
int value = mutableEntry.getValue();
mutableEntry.setValue(value + 1);
return null;
};
cache.invoke(key, processor);
}
} finally {
latch.countDown();
}
}).start();
}
latch.await();
int value = cache.get(key);
if (value == threads * iterations) {
System.out.println("Implementation which you use is compatible with Bucket4j");
} else {
String msg = "Implementation which you use is not compatible with Bucket4j";
msg += ", " + (threads * iterations - value) + " writes are missed";
throw new IllegalStateException(msg);
}
}
}
The check performs 4000 integer increments in parallel and verifies that no update has been missed. If the check passes, then your JCache provider is compatible with Bucket4j and throttling will work fine in a distributed and concurrent environment. If the check does not pass, then reach out to the particular JCache provider team and ask why its implementation misses the writes.
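To see what a "missed write" looks like without a JCache provider, the same experiment can be reproduced against a local ConcurrentHashMap. This is only an analogy with hypothetical names: `merge` plays the role of a correctly isolated EntryProcessor, while the separate get followed by put models a provider that does not isolate the read-modify-write cycle.

```java
import java.util.concurrent.ConcurrentHashMap;

// Local analogy of the compatibility check; does not use JCache or Bucket4j.
class LostUpdateSketch {
    static int run(int threads, int iterations, boolean atomic) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        String key = "42";
        map.put(key, 0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < iterations; j++) {
                    if (atomic) {
                        map.merge(key, 1, Integer::sum); // isolated read-modify-write
                    } else {
                        map.put(key, map.get(key) + 1);  // racy: updates can be lost
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return map.get(key);
    }

    public static void main(String[] args) {
        System.out.println("isolated updates: " + run(4, 1000, true)); // always 4000
        System.out.println("racy updates: " + run(4, 1000, false));    // may be lower than 4000
    }
}
```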
3.2. Hazelcast integration
3.2.1. Dependencies
To use the Bucket4j extension for Hazelcast with Hazelcast 4.x you need to add the following dependency:
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-hazelcast</artifactId>
<version>8.0.1</version>
</dependency>
If you are using a legacy version of Hazelcast 3.x then you need to add the following dependency:
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-hazelcast-3</artifactId>
<version>8.0.1</version>
</dependency>
Note
|
See the Java compatibility matrix if you need a build that is compatible with Java 8 |
3.2.2. General compatibility matrix principles:
-
Bucket4j authors do not continuously monitor new Hazelcast releases. So there can be a case when no version of Bucket4j is compatible with a newly released Hazelcast. Just log an issue in the bug tracker in this case; adding support for a new version of Hazelcast is usually an easy exercise.
-
Integrations with legacy versions of Hazelcast are not removed without a clear reason. Hence you are safe even if you work in a big enterprise company that does not update its infrastructure frequently, because you still get new Bucket4j features even for legacy Hazelcast releases.
3.2.3. Example of Bucket instantiation
IMap<K, byte[]> map = ...;
private static final HazelcastProxyManager<K> proxyManager = new HazelcastProxyManager<>(map);
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build();
Bucket bucket = proxyManager.builder().build(key, configuration);
3.2.4. Configuring Custom Serialization for Bucket4j library classes
If you configure nothing, then by default Java serialization will be used for serializing Bucket4j library classes. Java serialization can be rather slow and should be avoided in general.
Bucket4j provides custom serializers for all library classes that could be transferred over the network.
To let Hazelcast know about the fast serializers, you should register them programmatically in the serialization config:
import com.hazelcast.config.Config;
import com.hazelcast.config.SerializationConfig;
import com.hazelcast.config.SerializerConfig;
import io.github.bucket4j.grid.hazelcast.serialization.HazelcastSerializer;
...
Config config = ...
SerializationConfig serializationConfig = config.getSerializationConfig();
// the starting type ID number for Bucket4j classes.
// you are free to choose any unused ID, but be aware that Bucket4j uses 2 types currently,
// and may use more types in the future, so leave enough empty space after baseTypeIdNumber
int baseTypeIdNumber = 10000;
HazelcastProxyManager.addCustomSerializers(serializationConfig, baseTypeIdNumber);
3.2.5. Support for externally managed Hazelcast without classpath access
bucket4j-hazelcast requires putting Bucket4j jars on the classpath of each node of the Hazelcast cluster.
Sometimes you have no control over the classpath because the Hazelcast cluster is externally managed (PaaS scenario).
In such cases HazelcastProxyManager cannot be used because it is implemented on top of EntryProcessor functionality. The following alternatives can be used instead:
-
HazelcastLockBasedProxyManager is implemented on top of the IMap methods lock, get, put, unlock. This implementation always requires 4 network hops for one rate-limit check.
-
HazelcastCompareAndSwapBasedProxyManager is implemented on top of the IMap methods get, replace, putIfAbsent. This implementation requires 2 network hops if no contention happens, but in case of high contention on the key the number of hops is unpredictable.
Both alternatives share one limitation:
-
HazelcastLockBasedProxyManager does not provide an async API because of the lack of lockAsync and unlockAsync methods in the IMap API.
-
HazelcastCompareAndSwapBasedProxyManager does not provide an async API because of the lack of replaceAsync and putIfAbsentAsync methods in the IMap API.
If you want the async API to be supported by HazelcastLockBasedProxyManager and HazelcastCompareAndSwapBasedProxyManager, ask the Hazelcast maintainers to support the missing APIs mentioned above.
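The get/replace/putIfAbsent pattern and the reason its hop count becomes unpredictable under contention can be sketched with a JDK ConcurrentMap, which offers methods with the same names. This is an illustration under simplifying assumptions, not the HazelcastCompareAndSwapBasedProxyManager source: the stored state is a bare token counter with no refill, and the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Compare-and-swap retry loop on a map entry; not Bucket4j code.
class CompareAndSwapSketch {
    // Tries to consume one token from the counter stored under key.
    static boolean tryConsume(ConcurrentMap<String, Long> map, String key, long initialTokens) {
        while (true) {
            Long current = map.get(key);                       // first hop: read the state
            if (current == null) {
                if (initialTokens <= 0) {
                    return false;
                }
                if (map.putIfAbsent(key, initialTokens - 1) == null) {
                    return true;                               // second hop: state created
                }
                continue;                                      // another caller created it first
            }
            if (current <= 0) {
                return false;                                  // no tokens left
            }
            if (map.replace(key, current, current - 1)) {
                return true;                                   // second hop: swap succeeded
            }
            // The swap lost a race with a concurrent update: every retry costs two more hops.
        }
    }

    static int runContended(int threads, int attemptsPerThread, long initialTokens) {
        ConcurrentMap<String, Long> map = new ConcurrentHashMap<>();
        AtomicInteger successes = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < attemptsPerThread; j++) {
                    if (tryConsume(map, "user-1", initialTokens)) {
                        successes.incrementAndGet();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        // 400 attempts compete for 250 tokens; exactly 250 succeed.
        System.out.println(runContended(4, 100, 250));
    }
}
```

A lock-based variant would instead wrap a plain get and put between lock and unlock, which explains the fixed count of 4 hops mentioned above.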
3.2.6. Known issues related to Docker and/or Spring Boot
-
#186 HazelcastEntryProcessor class not found - check file permissions inside your image.
-
#182 HazelcastSerializationException with Hazelcast 4.2 - properly set up the classloader for the Hazelcast client configuration.
3.3. Apache Ignite integration
Before using the bucket4j-ignite module, please read the bucket4j-jcache documentation, because bucket4j-ignite is just a follow-up of bucket4j-jcache.
Bucket4j supports Ignite Thin-Client as well as regular deployment scenarios.
Question: Bucket4j has supported JCache since version 1.2. Why was it necessary to introduce direct support for Apache Ignite?
Answer: Because the JCache API (JSR 107) does not specify an asynchronous API, developing the dedicated module bucket4j-ignite was the only way to provide asynchrony for users who use Bucket4j and Apache Ignite together.
Question: Should I migrate from bucket4j-jcache to bucket4j-ignite if I do not need an asynchronous API?
Answer: No, you should not migrate to bucket4j-ignite in this case.
3.3.1. Dependencies
To use the bucket4j-ignite extension you need to add the following dependency:
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-ignite</artifactId>
<version>8.0.1</version>
</dependency>
Note
|
See the Java compatibility matrix if you need a build that is compatible with Java 8 |
3.3.2. Example of Bucket instantiation via IgniteProxyManager
org.apache.ignite.IgniteCache<K, byte[]> cache = ...;
private static final IgniteProxyManager<K> proxyManager = new IgniteProxyManager<>(cache);
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build();
Bucket bucket = proxyManager.builder().build(key, configuration);
Important
|
Pay attention that IgniteProxyManager requires all nodes in the cluster to contain bucket4j Jars in classpath. |
3.3.3. Example of Bucket instantiation via Thin Client
org.apache.ignite.client.ClientCache<K, byte[]> cache = ...;
org.apache.ignite.client.ClientCompute clientCompute = ...;
private static final IgniteThinClientProxyManager<K> proxyManager = new IgniteThinClientProxyManager<>(cache, clientCompute);
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build();
Bucket bucket = proxyManager.builder().build(key, configuration);
Important
|
Pay attention that IgniteThinClientProxyManager requires all nodes in the cluster to contain bucket4j Jars in classpath. |
3.3.4. Example of Bucket instantiation via Thin Client and IgniteThinClientCasBasedProxyManager
org.apache.ignite.client.ClientCache<K, byte[]> cache = ...;
private static final IgniteThinClientCasBasedProxyManager<K> proxyManager = new IgniteThinClientCasBasedProxyManager<>(cache);
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build();
Bucket bucket = proxyManager.builder().build(key, configuration);
Important
|
IgniteThinClientCasBasedProxyManager does not require all nodes in the cluster to contain bucket4j Jars in classpath, but it operates with more latency, so choose it over IgniteThinClientProxyManager if and only if you have no control over cluster classpath. |
3.4. Infinispan integration
3.4.1. Dependencies
To use the bucket4j-infinispan extension with Infinispan 9.x, 10.x you need to add the following dependency:
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-infinispan</artifactId>
<version>8.0.1</version>
</dependency>
If you are using a legacy version of Infinispan 8.x then you need to add the following dependency:
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-infinispan-8</artifactId>
<version>8.0.1</version>
</dependency>
Note
|
See the Java compatibility matrix if you need a build that is compatible with Java 8 |
3.4.2. General compatibility matrix principles:
-
Bucket4j authors do not continuously monitor new Infinispan releases. So there can be a case when no version of Bucket4j is compatible with a newly released Infinispan. Just log an issue in the bug tracker in this case; adding support for a new version of Infinispan is usually an easy exercise.
-
Integrations with legacy versions of Infinispan are not removed without a clear reason. Hence you are safe even if you work in a big enterprise company that does not update its infrastructure frequently, because you still get new Bucket4j features even for legacy Infinispan releases.
3.4.3. Special notes for Infinispan 10.0+
As mentioned in the Infinispan Marshalling documentation, since release 10.0.0 Infinispan does not allow deserialization of custom payloads into Java classes. If you do not configure serialization (as described below), you will get an error like this on any attempt to use Bucket4j with a brand new Infinispan release:
Jan 02, 2020 4:57:56 PM org.infinispan.marshall.persistence.impl.PersistenceMarshallerImpl objectToBuffer
WARN: ISPN000559: Cannot marshall 'class io.github.bucket4j.grid.infinispan.InfinispanProcessor'
java.lang.IllegalArgumentException: No marshaller registered for Java type io.github.bucket4j.grid.infinispan.SerializableFunctionAdapter
at org.infinispan.protostream.impl.SerializationContextImpl.getMarshallerDelegate(SerializationContextImpl.java:279)
at org.infinispan.protostream.WrappedMessage.writeMessage(WrappedMessage.java:240)
at org.infinispan.protostream.ProtobufUtil.toWrappedStream(ProtobufUtil.java:196)
There are three options to solve this problem:
* Configure the JBoss marshaller instead of the default ProtoStream marshaller, as described there.
* Configure the Java Serialization Marshaller instead of the default ProtoStream marshaller, as described there.
Do not forget to add the io.github.bucket4j.* regexp to the whitelist if you choose this way.
* The last (recommended) way is to register the Bucket4j serialization context initializer in the serialization configuration.
You can do it either programmatically or declaratively:
import io.github.bucket4j.grid.infinispan.serialization.Bucket4jProtobufContextInitializer;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
...
GlobalConfigurationBuilder builder = new GlobalConfigurationBuilder();
builder.serialization().addContextInitializer(new Bucket4jProtobufContextInitializer());
<serialization>
<context-initializer class="io.github.bucket4j.grid.infinispan.serialization.Bucket4jProtobufContextInitializer"/>
</serialization>
And that is all. Registering Bucket4jProtobufContextInitializer in either way is enough to make Bucket4j compatible with the ProtoStream marshaller. You do not have to care about *.proto files, annotations, whitelists, etc.; all necessary Protobuf configs are generated by Bucket4jProtobufContextInitializer and registered on the fly.
3.4.4. Example of Bucket instantiation
org.infinispan.functional.FunctionalMap.ReadWriteMap<K, byte[]> map = ...;
private static final InfinispanProxyManager<K> proxyManager = new InfinispanProxyManager(map);
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build();
Bucket bucket = proxyManager.builder().build(key, configuration);
3.5. Oracle Coherence integration
3.5.1. Dependencies
To use the bucket4j-coherence extension you need to add the following dependency:
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-coherence</artifactId>
<version>8.0.1</version>
</dependency>
Note
|
See the Java compatibility matrix if you need a build that is compatible with Java 8. |
3.5.2. Example of Bucket instantiation
com.tangosol.net.NamedCache<K, byte[]> cache = ...;
private static final CoherenceProxyManager<K> proxyManager = new CoherenceProxyManager<>(cache);
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build();
Bucket bucket = proxyManager.builder().build(key, configuration);
3.5.3. Configuring POF serialization for Bucket4j library classes
If you configure nothing, then by default Java serialization will be used for serializing Bucket4j library classes. Java serialization can be rather slow and should be avoided in general.
Bucket4j provides custom POF serializers for all library classes that could be transferred over the network.
To let Coherence know about the POF serializers, you should register the following serializer in the POF configuration file:
io.github.bucket4j.grid.coherence.pof.CoherenceEntryProcessorPofSerializer for class io.github.bucket4j.grid.coherence.CoherenceProcessor
<pof-config xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config coherence-pof-config.xsd">
<user-type-list>
<!-- Include default Coherence types -->
<include>coherence-pof-config.xml</include>
<!-- Define serializers for Bucket4j classes -->
<user-type>
<type-id>1001</type-id>
<class-name>io.github.bucket4j.grid.coherence.CoherenceProcessor</class-name>
<serializer>
<class-name>io.github.bucket4j.grid.coherence.pof.CoherenceEntryProcessorPofSerializer</class-name>
</serializer>
</user-type>
</user-type-list>
</pof-config>
Double-check with the official Oracle Coherence documentation in case of any questions related to Portable Object Format.
3.6. Bucket4j-Redis integration
Bucket4j provides integration with four Redis libraries:
Library | Async supported | Proxy manager class |
---|---|---|
Redisson | Yes | RedissonBasedProxyManager |
Lettuce | Yes | LettuceBasedProxyManager |
Jedis | No | JedisBasedProxyManager |
Spring Data Redis | No | SpringDataRedisBasedProxyManager |
Important
|
For all libraries mentioned above, concurrent access to Redis is handled with the Compare&Swap pattern; this may be improved in the future by switching to Lua stored procedures. |
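The Compare&Swap pattern mentioned above can be illustrated with a minimal, stand-alone sketch. It is not the code Bucket4j runs against Redis: the class name, the in-memory ConcurrentMap standing in for the remote store, and the plain token counter are all assumptions made for illustration. The idea it shows is only the retry loop: read the current state, compute the new state locally, and write it back only if nobody else changed it in the meantime.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical illustration of a compare-and-swap retry loop; not Bucket4j code. */
class CasConsumeSketch {

    /**
     * Tries to take {@code tokens} from the counter stored under {@code key}.
     * The map plays the role of the remote store (an assumption of this sketch).
     */
    static boolean tryConsume(ConcurrentMap<String, Long> store, String key,
                              long initialTokens, long tokens) {
        while (true) {
            Long current = store.get(key);
            if (current == null) {
                // No state yet: try to create it, but only if it is still absent.
                long remaining = initialTokens - tokens;
                if (remaining < 0) {
                    return false;
                }
                if (store.putIfAbsent(key, remaining) == null) {
                    return true;
                }
                continue; // another writer created the state first, so retry
            }
            long remaining = current - tokens;
            if (remaining < 0) {
                return false; // not enough tokens, nothing is written
            }
            // Swap succeeds only if the stored value is still the one we read.
            if (store.replace(key, current, remaining)) {
                return true;
            }
            // The value changed between read and write: loop and try again.
        }
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Long> store = new ConcurrentHashMap<>();
        System.out.println(tryConsume(store, "user-1", 3, 2));
        System.out.println(tryConsume(store, "user-1", 3, 2));
    }
}
```

Under contention a writer may loop several times before its swap succeeds, which is the cost the note above hopes to reduce with server-side procedures.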
3.6.1. Dependencies
To use the bucket4j-redis extension you need to add the following dependency:
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-redis</artifactId>
<version>8.0.1</version>
</dependency>
Note
|
See the Java compatibility matrix if you need a build that is compatible with Java 8. |
3.6.2. Example of Bucket instantiation via RedissonBasedProxyManager
org.redisson.command.CommandExecutor commandExecutor = ...;
RedissonBasedProxyManager proxyManager = RedissonBasedProxyManager.builderFor(commandExecutor)
.withExpirationStrategy(ExpirationAfterWriteStrategy.basedOnTimeForRefillingBucketUpToMax(Duration.ofSeconds(10)))
.build();
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build();
Bucket bucket = proxyManager.builder().build(key, configuration);
3.6.3. Example of Bucket instantiation via LettuceBasedProxyManager
io.lettuce.core.RedisClient redisClient = ...;
LettuceBasedProxyManager proxyManager = LettuceBasedProxyManager.builderFor(redisClient)
.withExpirationStrategy(ExpirationAfterWriteStrategy.basedOnTimeForRefillingBucketUpToMax(Duration.ofSeconds(10)))
.build();
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build();
Bucket bucket = proxyManager.builder().build(key, configuration);
3.6.4. Example of Bucket instantiation via JedisBasedProxyManager
redis.clients.jedis.JedisPool jedisPool = ...;
JedisBasedProxyManager proxyManager = JedisBasedProxyManager.builderFor(jedisPool)
.withExpirationStrategy(ExpirationAfterWriteStrategy.basedOnTimeForRefillingBucketUpToMax(Duration.ofSeconds(10)))
.build();
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build();
Bucket bucket = proxyManager.builder().build(key, configuration);
3.6.5. Example of Bucket instantiation via SpringDataRedisBasedProxyManager
org.springframework.data.redis.connection.RedisCommands redisCommands = ...;
SpringDataRedisBasedProxyManager proxyManager = SpringDataRedisBasedProxyManager.builderFor(redisCommands)
.withExpirationStrategy(ExpirationAfterWriteStrategy.basedOnTimeForRefillingBucketUpToMax(Duration.ofSeconds(10)))
.build();
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build();
Bucket bucket = proxyManager.builder().build(key, configuration);
3.7. JDBC integrations
General principles for using each JDBC integration:
-
Bucket4j does not create the table that stores buckets; you must create it yourself.
-
You should create a trigger or a scheduled job that clears the bucket storage table, because a DBMS is not an in-memory database and does not provide TTL-based expiration.
-
The table must include the following required columns: a BIGINT column as the PRIMARY KEY and a binary column (BYTEA in PostgreSQL, BLOB in MySQL) for the state. By default, Bucket4j works with the following structure:
-- PostgreSQL
CREATE TABLE IF NOT EXISTS buckets(id BIGINT PRIMARY KEY, state BYTEA);
-- MySQL
CREATE TABLE IF NOT EXISTS buckets(id BIGINT PRIMARY KEY, state BLOB);
3.7.1. Configuring custom settings of SQLProxyManager
-
Each proxy manager takes an SQLProxyConfiguration to customize how it works with the database.
-
To build it, use SQLProxyConfigurationBuilder, which provides the following parameters:
/**
* @param clientSideConfig {@link ClientSideConfig} client-side configuration for proxy-manager.
* By default, under the hood uses {@link ClientSideConfig#getDefault}
* @return {@link SQLProxyConfigurationBuilder}
*/
public SQLProxyConfigurationBuilder addClientSideConfig(ClientSideConfig clientSideConfig) {
this.clientSideConfig = clientSideConfig;
return this;
}
/**
* @param tableSettings {@link BucketTableSettings} define a configuration of the table to use as a Buckets store.
* By default, under the hood uses {@link BucketTableSettings#getDefault}
* @return {@link SQLProxyConfigurationBuilder}
*/
public SQLProxyConfigurationBuilder addTableSettings(BucketTableSettings tableSettings) {
this.tableSettings = tableSettings;
return this;
}
3.7.2. Overriding table configuration
You can override the table and column names. To do that, pass a BucketTableSettings instance to the SQLProxyConfigurationBuilder of your JDBC implementation.
-
SQLProxyConfigurationBuilder takes BucketTableSettings, the class that defines the configuration of the table used as the bucket store. By default, BucketTableSettings.getDefault() is used.
Parameters:
tableName - name of the table to use as the bucket store
idName - name of the id column (PRIMARY KEY, BIGINT)
stateName - name of the state column (BYTEA)
By default, "buckets" is used as tableName, "id" as idName, and "state" as stateName.
addTableSettings takes BucketTableSettings - see Overriding table configuration.
3.7.3. PostgreSQL integration
Dependencies
To use the Bucket4j extension for PostgreSQL you need to add the following dependency:
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-postgresql</artifactId>
<version>8.0.1</version>
</dependency>
Note
|
See the Java compatibility matrix if you need a build that is compatible with Java 8. |
Example of Bucket instantiation
Long key = 1L;
PostgreSQLadvisoryLockBasedProxyManager proxyManager = new PostgreSQLadvisoryLockBasedProxyManager(new SQLProxyConfiguration(dataSource));
BucketConfiguration bucketConfiguration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(10, Duration.ofSeconds(1)))
.build();
BucketProxy bucket = proxyManager.builder().build(key, bucketConfiguration);
-
An example of using SQLProxyConfigurationBuilder to customize the settings of PostgreSQLadvisoryLockBasedProxyManager:
SQLProxyConfiguration configuration = SQLProxyConfigurationBuilder.builder()
.withClientSideConfig(ClientSideConfig.getDefault().withClientClock(TimeMeter.SYSTEM_MILLISECONDS))
.withTableSettings(BucketTableSettings.customSettings("tableName", "idName", "stateName"))
.build(dataSource);
PostgreSQLadvisoryLockBasedProxyManager proxyManager = new PostgreSQLadvisoryLockBasedProxyManager(configuration);
Strategies of transaction
-
Bucket4j lets you choose which transaction strategy to work with:
PostgreSQLadvisoryLockBasedProxyManager
- Based on pg_advisory_xact_lock, which locks an application-defined resource that can be identified either by a single 64-bit key value or by two 32-bit key values (note that these two key spaces do not overlap).
If another session already holds a lock on the same resource identifier, this function will wait until the resource becomes available.
The lock is exclusive.
Multiple lock requests stack so that if the same resource is locked three times it must then be unlocked three times to be released for other sessions use.
The lock is automatically released at the end of the current transaction and cannot be released explicitly.
PostgreSQLSelectForUpdateBasedProxyManager
- Based on SELECT FOR UPDATE, which locks the rows retrieved by the SELECT statement as though for update.
This prevents them from being modified or deleted by other transactions until the current transaction ends.
That is, other transactions that attempt UPDATE, DELETE, or SELECT FOR UPDATE of these rows will be blocked until the current transaction ends.
Also, if an UPDATE, DELETE, or SELECT FOR UPDATE from another transaction has already locked a selected row or rows, SELECT FOR UPDATE will wait for the other transaction to complete, and will then lock and return the updated row (or no row, if the row was deleted).
Within a SERIALIZABLE transaction, however, an error will be thrown if a row to be locked has changed since the transaction started.
3.7.4. MySQL integration
Dependencies
To use the Bucket4j extension for MySQL you need to add the following dependency:
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-mysql</artifactId>
<version>8.0.1</version>
</dependency>
Example of Bucket instantiation
Long key = 1L;
MySQLSelectForUpdateBasedProxyManager proxyManager = new MySQLSelectForUpdateBasedProxyManager(new SQLProxyConfiguration(dataSource));
BucketConfiguration bucketConfiguration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(10, Duration.ofSeconds(1)))
.build();
BucketProxy bucket = proxyManager.builder().build(key, bucketConfiguration);
3.8. Asynchronous API
Since version 3.0, Bucket4j provides asynchronous analogs for the majority of API methods.
The async view of a proxyManager is available through the asAsync() method:
ProxyManager proxyManager = ...;
AsyncProxyManager asyncProxyManager = proxyManager.asAsync();
BucketConfiguration configuration = ...;
AsyncBucketProxy asyncBucket = asyncProxyManager.builder().build(key, configuration);
Each method of the Bucket class has an equivalent with the same semantics in the asynchronous AsyncBucketProxy class.
3.8.1. Example - limiting the rate of access to the asynchronous servlet
Imagine that you develop an SMS service, which allows sending SMS via an HTTP interface. You want your architecture to be protected from overloading, clustered, and fully asynchronous.
Overloading protection requirement:
To prevent fraud and service overloading, you want to introduce the following limit for any outbound phone number: the bucket size is 20 SMS (which cannot be exceeded at any given time), with a "refill rate" of 10 SMS per minute that continually adds tokens to the bucket. In other words, if a client sends 10 SMS per minute, it will never be throttled; moreover, the client has an overdraft of 20 SMS that can be used if the average is slightly higher than 10 SMS/minute over a short period.
Solution: let’s use bucket4j for this.
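To make the numbers in this requirement concrete, here is a minimal stand-alone model of such a limit in plain Java. It is a sketch under stated assumptions, not Bucket4j's implementation: the class name and the explicit nowNanos parameter (used instead of a real clock so the behaviour is reproducible) are hypothetical. It only shows how a capacity of 20 and a refill of 10 tokens per minute interact when computed in integer arithmetic.

```java
import java.time.Duration;

/** Hypothetical stand-alone model of the SMS limit; not Bucket4j code. */
class SmsLimitSketch {
    private final long capacity;
    private final long refillTokens;
    private final long refillPeriodNanos;
    private long availableTokens;
    private long lastRefillNanos;

    SmsLimitSketch(long capacity, long refillTokens, Duration refillPeriod, long nowNanos) {
        this.capacity = capacity;
        this.refillTokens = refillTokens;
        this.refillPeriodNanos = refillPeriod.toNanos();
        this.availableTokens = capacity; // the bucket starts full
        this.lastRefillNanos = nowNanos;
    }

    /** Takes the tokens if enough are available at the given moment. */
    synchronized boolean tryConsume(long tokens, long nowNanos) {
        refill(nowNanos);
        if (tokens > availableTokens) {
            return false;
        }
        availableTokens -= tokens;
        return true;
    }

    private void refill(long nowNanos) {
        long elapsedNanos = nowNanos - lastRefillNanos;
        // Whole tokens earned since the last refill; integer arithmetic only.
        // (Overflow for very long idle periods is ignored in this sketch.)
        long newTokens = elapsedNanos * refillTokens / refillPeriodNanos;
        if (newTokens <= 0) {
            return;
        }
        // Advance the refill timestamp only by the time that produced whole tokens,
        // so the fractional remainder is kept for the next call.
        lastRefillNanos += newTokens * refillPeriodNanos / refillTokens;
        availableTokens = Math.min(capacity, availableTokens + newTokens);
    }

    public static void main(String[] args) {
        SmsLimitSketch limit = new SmsLimitSketch(20, 10, Duration.ofMinutes(1), 0L);
        int accepted = 0;
        for (int i = 0; i < 25; i++) {
            if (limit.tryConsume(1, 0L)) {
                accepted++;
            }
        }
        System.out.println("accepted in the initial burst: " + accepted);
        System.out.println("accepted 6 seconds later: "
                + limit.tryConsume(1, Duration.ofSeconds(6).toNanos()));
    }
}
```

With these parameters one token is earned every 6 seconds, so a client that drains the initial 20 tokens at once has to wait 6 seconds for each further SMS.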
Clustering requirement:
You want to avoid a single point of failure: if one server crashes, the information about consumed tokens should not be lost, so it would be better to use a distributed computation platform for storing the buckets.
Solution: let’s use JBoss Infinispan and the bucket4j-infinispan extension for this.
Hazelcast and Apache Ignite would also be good choices; Infinispan is selected just as an example.
Asynchronous processing requirement: for maximum scalability, you also want the architecture to be fully non-blocking, which means that both SMS sending and limit checking should be asynchronous. Solution: let’s use the asynchronous features provided by bucket4j and the Servlet API.
Mockup of service based on top of Servlet API and bucket4j-infinispan:
public class SmsServlet extends javax.servlet.http.HttpServlet {
private SmsSender smsSender;
private AsyncProxyManager<String> buckets;
private Supplier<BucketConfiguration> configuration;
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
ServletContext ctx = config.getServletContext();
smsSender = (SmsSender) ctx.getAttribute("sms-sender");
FunctionalMapImpl<String, byte[]> bucketMap = (FunctionalMapImpl<String, byte[]>) ctx.getAttribute("bucket-map");
this.buckets = new InfinispanProxyManager(bucketMap).asAsync();
this.configuration = () -> {
long overdraft = 20;
Refill refill = Refill.greedy(10, Duration.ofMinutes(1));
Bandwidth limit = Bandwidth.classic(overdraft, refill);
return BucketConfiguration.builder()
.addLimit(limit)
.build();
};
}
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
String fromNumber = req.getParameter("from");
String toNumber = req.getParameter("to");
String text = req.getParameter("text");
// resolve the configuration for this key and obtain the asynchronous bucket
AsyncBucketProxy bucket = buckets.builder().build(fromNumber, configuration.get());
CompletableFuture<ConsumptionProbe> limitCheckingFuture = bucket.tryConsumeAndReturnRemaining(1);
final AsyncContext asyncContext = req.startAsync();
limitCheckingFuture.thenCompose(probe -> {
if (!probe.isConsumed()) {
Result throttledResult = Result.throttled(probe);
return CompletableFuture.completedFuture(throttledResult);
} else {
CompletableFuture<Result> sendingFuture = smsSender.sendAsync(fromNumber, toNumber, text);
return sendingFuture;
}
}).whenComplete((result, exception) -> {
HttpServletResponse asyncResponse = (HttpServletResponse) asyncContext.getResponse();
try {
asyncResponse.setContentType("text/plain");
if (exception != null || result.isFailed()) {
asyncResponse.setStatus(500);
asyncResponse.getWriter().println("Internal Error");
} else if (result.isThrottled()) {
asyncResponse.setStatus(429);
asyncResponse.setHeader("X-Rate-Limit-Retry-After-Seconds", "" + result.getRetryAfter());
asyncResponse.getWriter().append("Too many requests");
} else {
asyncResponse.setStatus(200);
asyncResponse.getWriter().append("Success");
}
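} catch (IOException e) {
// the response can no longer be written; log the failure instead of losing it
log("Failed to write the response", e);
} finally {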
asyncContext.complete();
}
});
}
}
3.9. Framework to implement custom work with your database
The Bucket4j library allows you to work with any database. If your database is not among the distributed implementations (currently Bucket4j supports Redis, Hazelcast, Apache Ignite, Infinispan, Oracle Coherence, DynamoDB, PostgreSQL, and MySQL), you can implement your own distributed storage. All you need to do is extend io.github.bucket4j.distributed.proxy.generic.select_for_update.AbstractLockBasedProxyManager or AbstractSelectForUpdateBasedProxyManager<T>, override 3 methods, and create an implementation of io.github.bucket4j.distributed.proxy.generic.select_for_update.LockBasedTransaction.
The steps are as follows.
First, create a custom proxy manager that extends AbstractLockBasedProxyManager<T> or AbstractSelectForUpdateBasedProxyManager<T> (the generic parameter is the type of the table key). To decide which class to extend, you need to understand the main idea of each:
AbstractLockBasedProxyManager<T>
- Used for implementations based on exclusive locks
AbstractSelectForUpdateBasedProxyManager<T>
- Used for implementations based on the Select For Update concept
Next, override the allocateTransaction method to define how transactions are allocated.
allocateTransaction only needs to return a class that implements LockBasedTransaction (for AbstractLockBasedProxyManager<T>) or SelectForUpdateBasedTransaction (for AbstractSelectForUpdateBasedProxyManager<T>); we will implement it later.
Also override removeProxy() to remove a bucket from the table that stores buckets.
Second, implement LockBasedTransaction or SelectForUpdateBasedTransaction to define how your database handles the transaction.
To do that, create a custom class that implements one of these interfaces:
LockBasedTransaction
/**
* Begins transaction if underlying storage requires transactions.
* There is strong guarantee that {@link #commit()} or {@link #rollback()} will be called if {@link #begin()} returns successfully.
*/
void begin();
/**
* Rollbacks transaction if underlying storage requires transactions
*/
void rollback();
/**
* Commits transaction if underlying storage requires transactions
*/
void commit();
/**
* Locks data by the key associated with this transaction and returns data that is associated with the key.
* There is strong guarantee that {@link #unlock()} will be called if {@link #lockAndGet()} returns successfully.
*
* @return the data by the key associated with this transaction, or null if data associated with the key does not exist
*/
byte[] lockAndGet();
/**
* Unlocks data by the key associated with this transaction.
*/
void unlock();
/**
* Creates the data by the key associated with this transaction.
*
* @param data bucket state to persists
*/
void create(byte[] data);
/**
* Updates the data by the key associated with this transaction.
*
* @param data bucket state to persists
*/
void update(byte[] data);
/**
* Frees resources associated with this transaction
*/
void release();
For an example, see the PostgreSQL or MySQL implementations, which are based on the Select For Update concept.
SelectForUpdateBasedTransaction
/**
* Begins transaction if underlying storage requires transactions.
* There is strong guarantee that {@link #commit()} or {@link #rollback()} will be called if {@link #begin()} returns successfully.
*/
void begin();
/**
* Rollbacks transaction if underlying storage requires transactions
*/
void rollback();
/**
* Commits transaction if underlying storage requires transactions
*/
void commit();
/**
* Locks data by the key associated with this transaction and returns data that is associated with the key.
*
* @return the data by the key associated with this transaction, or null if data associated with the key does not exist
*/
LockAndGetResult tryLockAndGet();
/**
* Creates empty data for the key associated with this transaction.
* This operation is required to be able to lock data in the scope of next transaction.
*
* @return true if data has been inserted
*/
boolean tryInsertEmptyData();
/**
* Updates the data by the key associated with this transaction.
*
* @param data bucket state to persists
*/
void update(byte[] data);
/**
* Frees resources associated with this transaction
*/
void release();
3.10. Production checklist especially in the context of distributed systems
Before using Bucket4j in a clustered scenario, you need to understand, agree with, and configure the following points:
When working within a distributed system, it is inevitable that requests may cross the border of the current JVM, leading to communication over the network. Because the network is unreliable, it is impossible to avoid failures. Thus you should embrace this reality and be ready to get unchecked exceptions when interacting with a distributed bucket. It is your responsibility to handle (or ignore) such exceptions:
-
You probably do not want to fail business transactions if the grid responsible for throttling goes down. If this is the case you can simply log the exception and continue your business transaction without throttling
-
If you wish to fail your business transaction when the grid responsible for throttling goes down, simply rethrow or don’t catch the exception
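The two choices above can be sketched with a small helper. This is an illustration only: the class and method names are hypothetical, and the BooleanSupplier stands in for whatever call you make on the distributed bucket (for example a lambda wrapping a tryConsume-style call). The only thing it demonstrates is where the unchecked exception is caught and how the "continue without throttling" and "fail" policies differ.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper showing the two failure policies; not part of Bucket4j. */
class ThrottlingFailurePolicy {

    /**
     * @param rateLimitCheck the call to the distributed bucket (assumed to throw
     *                       an unchecked exception when the grid is unreachable)
     * @param failOpen       true to let the business transaction continue
     *                       without throttling when the check itself fails
     */
    static boolean isAllowed(BooleanSupplier rateLimitCheck, boolean failOpen) {
        try {
            return rateLimitCheck.getAsBoolean();
        } catch (RuntimeException e) {
            // Always record the problem so that an outage of the grid is visible.
            System.err.println("Rate limiter is unavailable: " + e);
            if (failOpen) {
                return true; // continue the business transaction without throttling
            }
            throw e; // propagate the failure to the caller
        }
    }

    public static void main(String[] args) {
        BooleanSupplier brokenGrid = () -> {
            throw new IllegalStateException("grid is down");
        };
        System.out.println(isAllowed(() -> true, true));
        System.out.println(isAllowed(brokenGrid, true));
    }
}
```

Which policy is right depends on whether an unthrottled request or a rejected request is the more expensive mistake for your service.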
If the state of any bucket should survive the restart/crash of the grid node that holds its state, you need to configure backups yourself, in a way specific to the particular grid vendor. For example, see how to configure backups for Apache Ignite.
When dealing with multi-tenant scenarios like a bucket per user or a bucket per IP address, the number of buckets in the cache will continuously increase. This is because a new bucket will be created each time a new key is detected. To prevent exhausting the available memory of your cluster you need to configure the following aspects:
-
Expiration since last access - in order to allow the grid to remove the keys which haven’t been used in a long time. For example, see how to configure expiration policy for Apache Ignite.
-
Maximum cache size (in bytes) - it is preferable to lose bucket data than to lose the whole cluster due to an out-of-memory error.
There are no special settings for HA supported by Bucket4j because Bucket4j does nothing more than just invoking EntryProcessors on the cache. Instead, Bucket4j relies on you to configure the cache with proper parameters that control redundancy and high availability.
Years of experience working with distributed systems have taught the author that High Availability does not come for free. You need to test and verify that your system remains available. This cannot be provided by this or any other library. Your system will most certainly go down if you do not plan for that.
4. Advanced features
4.1. Listening for bucket events
4.1.1. What can be listened
-
When tokens are consumed from a bucket.
-
When consumption requests were rejected by the bucket.
-
When the thread was parked to wait for tokens refill as a result of interaction with BlockingBucket.
-
When the thread was interrupted during the wait for tokens to be refilled as a result of interaction with BlockingBucket.
-
When a delayed task was submitted to ScheduledExecutorService as a result of interaction with AsyncScheduledBucket.
4.1.2. Listener API - corner cases
Question: How many listeners are needed to create an application that uses many buckets?
Answer: it depends:
-
If you want to have aggregated statistics for all buckets then create a single listener per application and reuse this listener for all buckets.
-
If you want to measure statistics independently per bucket then use a listener per bucket model.
Question: Where are the listener’s methods invoked in case of distributed usage?
Answer: The listener is always invoked on the client side, which means that each client JVM will have its own independent statistics for the same bucket.
Question: Why does the bucket invoke the listener on the client side instead of the server side in a distributed scenario? What do I need to do if I need aggregated statistics across the whole cluster?
Answer: Because of a planned expansion to non-JVM back-ends such as Redis, MySQL, PostgreSQL. It is not possible to serialize and invoke a listener on these non-Java back-ends, so it was decided to invoke the listener on the client side, to avoid inconsistency between different back-ends in the future. You can do post-aggregation of monitoring statistics via features built into your monitoring database or via a mediator (like StatsD) between your application and the monitoring database.
4.1.3. How to attach a listener to a bucket?
The bucket can be decorated by the listener via the toListenable method.
BucketListener listener = new MyListener();
Bucket bucket = Bucket.builder()
.addLimit(Bandwidth.simple(100, Duration.ofMinutes(1)))
.build()
.toListenable(listener);
4.1.4. Example of integration with Dropwizard metrics-core
io.github.bucket4j.SimpleBucketListener is a simple implementation of the io.github.bucket4j.BucketListener interface that is available out of the box. Below is an example of exposing statistics via Dropwizard Metrics (for Micrometer it should be quite similar):
public static Bucket decorateBucketByStatListener(Bucket originalBucket, String name, MetricRegistry registry) {
SimpleBucketListener stat = new SimpleBucketListener();
registry.register(name + ".consumed", (Gauge<Long>) stat::getConsumed);
registry.register(name + ".rejected", (Gauge<Long>) stat::getRejected);
registry.register(name + ".parkedNanos", (Gauge<Long>) stat::getParkedNanos);
registry.register(name + ".interrupted", (Gauge<Long>) stat::getInterrupted);
registry.register(name + ".delayedNanos", (Gauge<Long>) stat::getDelayedNanos);
return originalBucket.toListenable(stat);
}
4.2. Verbose API
- Verbose API
-
is the API whose intent is to inject low-level diagnostic information into the results of any interaction with a bucket. Verbose API provides the same functionality as Regular API, with one exception: the result of any method is always decorated by the VerboseResult wrapper.
- VerboseResult
-
is the wrapper for an interaction result that provides a snapshot of the bucket and its configuration as they were at the moment of interaction with the bucket.
4.2.1. Verbose API entry-points
The way to get access to the Verbose API is the same for all types of buckets, just call the asVerbose() method:
// for io.github.bucket4j.Bucket
Bucket bucket = ...;
VerboseBucket verboseBucket = bucket.asVerbose();
// for io.github.bucket4j.distributed.BucketProxy
BucketProxy bucket = ...;
VerboseBucket verboseBucket = bucket.asVerbose();
// for io.github.bucket4j.distributed.AsyncBucketProxy
AsyncBucketProxy bucket = ...;
AsyncVerboseBucket verboseBucket = bucket.asVerbose();
Note
|
BlockingBucket and ScheduledBucket do not provide verbose analogs. VerboseResult makes no sense for these kinds of buckets because interactions with them can be followed by thread sleep or delayed execution, so VerboseResult can be stale and irrelevant by the moment control over execution is returned to your code. |
4.2.2. Principles of result decoration
-
A void return type is always decorated by VerboseResult<Void>
-
A primitive result type like long or boolean is always decorated by the corresponding boxed type, for example VerboseResult<Boolean>
-
A non-primitive result type is always decorated as is, for example VerboseResult<EstimationProbe>
4.2.3. Example of Verbose API usage
VerboseResult<ConsumptionProbe> verboseResult = bucket.asVerbose().tryConsumeAndReturnRemaining(numberOfTokens);
BucketConfiguration bucketConfiguration = verboseResult.getConfiguration();
long capacity = Arrays.stream(bucketConfiguration.getBandwidths())
.mapToLong(Bandwidth::getCapacity)
.max().getAsLong();
response.addHeader("RateLimit-Limit", "" + capacity);
VerboseResult.Diagnostics diagnostics = verboseResult.getDiagnostics();
response.addHeader("RateLimit-Remaining", "" + diagnostics.getAvailableTokens());
response.addHeader("RateLimit-Reset", "" + TimeUnit.NANOSECONDS.toSeconds(diagnostics.calculateFullRefillingTime()));
ConsumptionProbe probe = verboseResult.getValue();
if (probe.isConsumed()) {
// the limit is not exceeded
filterChain.doFilter(servletRequest, servletResponse);
} else {
// limit is exceeded
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
httpResponse.setStatus(429);
httpResponse.setContentType("text/plain");
httpResponse.getWriter().append("Too many requests");
}
4.3. On-the-fly configuration replacement
As previously mentioned in the definition of BucketConfiguration, it is an immutable object.
It is not possible to add, remove, or change the limits of an already created configuration. However, you can replace the configuration of the bucket by creating a new configuration instance and calling bucket.replaceConfiguration(newConfiguration, tokensInheritanceStrategy).
4.3.1. Why configuration replacement is not trivial?
-
The first problem of configuration replacement is deciding how to propagate available tokens from a bucket with the previous configuration to the bucket with the new configuration. If you don’t care about the previous bucket state, then use TokensInheritanceStrategy.RESET. But it becomes a tricky problem when we expect that previous consumption (that has not been compensated by refill yet) should take effect on the bucket with the new configuration. In this case, you need to choose between PROPORTIONALLY, AS_IS, and ADDITIVE.
-
There is another problem when you choose PROPORTIONALLY, AS_IS, or ADDITIVE and the bucket has more than one bandwidth. For example, how does the replaceConfiguration implementation bind bandwidths to each other in the following example?
Bucket bucket = Bucket.builder()
.addLimit(Bandwidth.simple(10, Duration.ofSeconds(1)))
.addLimit(Bandwidth.simple(10000, Duration.ofHours(1)))
.build();
...
BucketConfiguration newConfiguration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(5000, Duration.ofHours(1)))
.addLimit(Bandwidth.simple(100, Duration.ofSeconds(10)))
.build();
bucket.replaceConfiguration(newConfiguration, TokensInheritanceStrategy.AS_IS);
It is obvious that a simple strategy of copying tokens by bandwidth index will not work well in this case, because it highly depends on the order in which bandwidths were mentioned in the new and previous configurations.
4.3.2. Taking control over replacement process via bandwidth identifiers
Instead of guessing how bandwidths correspond to each other, Bucket4j lets you keep control of this process by assigning identifiers to bandwidths. When a configuration with multiple bandwidths is replaced, the replacement code can then copy available tokens by bandwidth ID. It is therefore better to rewrite the code above as follows:
Bucket bucket = Bucket.builder()
.addLimit(Bandwidth.simple(10, Duration.ofSeconds(1)).withId("technical-limit"))
.addLimit(Bandwidth.simple(10000, Duration.ofHours(1)).withId("business-limit"))
.build();
...
BucketConfiguration newConfiguration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(5000, Duration.ofHours(1)).withId("business-limit"))
.addLimit(Bandwidth.simple(100, Duration.ofSeconds(10)).withId("technical-limit"))
.build();
bucket.replaceConfiguration(newConfiguration, TokensInheritanceStrategy.PROPORTIONALLY);
-
By default, a bandwidth has a null identifier.
-
A null identifier is treated as equal to another null identifier if and only if there is exactly one bandwidth with a null identifier.
-
If an identifier is specified for a bandwidth, it must be unique within the bucket. A bucket does not allow several bandwidths with the same ID.
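The difference between binding by index and binding by ID can be sketched with a small self-contained model. This is not Bucket4j code: the class BandwidthBindingSketch, its methods, and the sample token counts (3 and 7000) are hypothetical, and the sketch assumes the AS_IS capping rule and that a limit with no counterpart in the old configuration starts full.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model, not part of Bucket4j: shows why order-based binding is fragile.
final class BandwidthBindingSketch {

    // Binds bandwidths by position; assumes both arrays have the same length.
    // The result silently changes if the limits are declared in a different order.
    static long[] inheritByIndex(long[] oldAvailable, long[] newCapacity) {
        long[] result = new long[newCapacity.length];
        for (int i = 0; i < newCapacity.length; i++) {
            result[i] = Math.min(oldAvailable[i], newCapacity[i]);
        }
        return result;
    }

    // Binds bandwidths by identifier; declaration order is irrelevant.
    static Map<String, Long> inheritById(Map<String, Long> oldAvailable,
                                         Map<String, Long> newCapacity) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> limit : newCapacity.entrySet()) {
            // Assumption of this sketch: an unmatched limit starts at full capacity.
            long inherited = oldAvailable.getOrDefault(limit.getKey(), limit.getValue());
            result.put(limit.getKey(), Math.min(inherited, limit.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Old order: technical-limit (3 tokens left), business-limit (7000 tokens left).
        long[] oldByIndex = {3L, 7_000L};
        // New order: business-limit (capacity 5000), technical-limit (capacity 100).
        long[] newByIndex = {5_000L, 100L};
        System.out.println(Arrays.toString(inheritByIndex(oldByIndex, newByIndex)));

        Map<String, Long> oldById = new LinkedHashMap<>();
        oldById.put("technical-limit", 3L);
        oldById.put("business-limit", 7_000L);
        Map<String, Long> newById = new LinkedHashMap<>();
        newById.put("business-limit", 5_000L);
        newById.put("technical-limit", 100L);
        System.out.println(inheritById(oldById, newById));
    }
}
```

With index binding, the nearly exhausted technical limit hands its 3 tokens to the business limit, while the technical limit is refilled to 100. With ID binding, each limit inherits from its own predecessor regardless of declaration order.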
4.3.3. TokensInheritanceStrategy explanation
TokensInheritanceStrategy specifies the rules for inheriting available tokens during the configuration replacement process.
- RESET
-
Use this mode when you simply want to forget the previous bucket state. RESET erases all previous state. Using this strategy is equivalent to removing the bucket and creating it again with the new configuration.
- PROPORTIONALLY
-
Copies available tokens in proportion to the bandwidth capacity, using the following formula: newAvailableTokens = availableTokensBeforeReplacement * (newBandwidthCapacity / capacityBeforeReplacement)
PROPORTIONALLY strategy examples:
-
Example 1: imagine a bandwidth that was created by
Bandwidth.classic(100, Refill.greedy(10, Duration.ofMinutes(1)))
At the moment of configuration replacement, there were 40 available tokens. After replacing this bandwidth with
Bandwidth.classic(200, Refill.greedy(10, Duration.ofMinutes(1)))
the 40 available tokens are multiplied by 2 (200/100), so after replacement there are 80 available tokens.
-
Example 2: imagine a bandwidth that was created by
Bandwidth.classic(100, Refill.greedy(10, Duration.ofMinutes(1)))
At the moment of configuration replacement, there were 40 available tokens. After replacing this bandwidth with
Bandwidth.classic(20, Refill.greedy(10, Duration.ofMinutes(1)))
the 40 available tokens are multiplied by 0.2 (20/100), so after replacement there are 8 available tokens.
-
- AS_IS
-
Copies available tokens as is, with one exception: if the number of available tokens is greater than the new capacity, it is reduced to the new capacity.
AS_IS strategy examples:
-
Example 1: imagine a bandwidth that was created by
Bandwidth.classic(100, Refill.greedy(10, Duration.ofMinutes(1)))
At the moment of configuration replacement, there were 40 available tokens. After replacing this bandwidth with
Bandwidth.classic(200, Refill.greedy(10, Duration.ofMinutes(1)))
the 40 available tokens are simply copied, so after replacement there are 40 available tokens.
-
Example 2: imagine a bandwidth that was created by
Bandwidth.classic(100, Refill.greedy(10, Duration.ofMinutes(1)))
At the moment of configuration replacement, there were 40 available tokens. After replacing this bandwidth with
Bandwidth.classic(20, Refill.greedy(10, Duration.ofMinutes(1)))
the 40 available tokens cannot be copied as is because they exceed the new capacity, so the available tokens are reduced to 20.
-
- ADDITIVE
-
Copies available tokens as is, with one exception: if the new bandwidth capacity is greater than the old capacity, the available tokens are increased by the difference between the new and the old capacity.
The formula is as follows:
newAvailableTokens = Math.min(availableTokensBeforeReplacement, newBandwidthCapacity) + Math.max(0, newBandwidthCapacity - capacityBeforeReplacement)
ADDITIVE strategy examples:
-
Example 1: imagine a bandwidth that was created by
Bandwidth.classic(100, Refill.greedy(10, Duration.ofMinutes(1)))
At the moment of configuration replacement, there were 40 available tokens. After replacing this bandwidth with
Bandwidth.classic(200, Refill.greedy(10, Duration.ofMinutes(1)))
the 40 available tokens are copied and increased by the difference between the new and the old capacity (100), so after replacement there are 140 available tokens.
-
Example 2: imagine a bandwidth that was created by
Bandwidth.classic(100, Refill.greedy(10, Duration.ofMinutes(1)))
At the moment of configuration replacement, there were 40 available tokens. After replacing this bandwidth with
Bandwidth.classic(20, Refill.greedy(10, Duration.ofMinutes(1)))
nothing is added because the new capacity is smaller than the old one, and the 40 available tokens are capped at the new capacity, so after replacement there are 20 available tokens.
-
Example 3: imagine a bandwidth that was created by
Bandwidth.classic(100, Refill.greedy(10, Duration.ofMinutes(1)))
At the moment of configuration replacement, there were 10 available tokens. After replacing this bandwidth with
Bandwidth.classic(20, Refill.greedy(10, Duration.ofMinutes(1)))
nothing is added because the new capacity is smaller than the old one, and the 10 available tokens fit within the new capacity, so after replacement there are 10 available tokens.
-
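The arithmetic behind the strategies above can be checked with a minimal model for a single bandwidth. This is a sketch, not Bucket4j's implementation: the class TokensInheritanceSketch and its inherit method are hypothetical, the RESET branch assumes that a fresh bucket starts full, and the PROPORTIONALLY branch simply drops any remainder of the integer division, which the library may handle differently.

```java
// Hypothetical model of token inheritance for one bandwidth; not Bucket4j code.
final class TokensInheritanceSketch {

    enum Strategy { RESET, PROPORTIONALLY, AS_IS, ADDITIVE }

    static long inherit(Strategy strategy, long availableBefore,
                        long capacityBefore, long newCapacity) {
        switch (strategy) {
            case RESET:
                // Previous state is forgotten; assumption: a new bucket starts full.
                return newCapacity;
            case PROPORTIONALLY:
                // Multiply before dividing so the ratio is not truncated early.
                // Overflow for very large values is ignored in this sketch.
                return availableBefore * newCapacity / capacityBefore;
            case AS_IS:
                // Copy as is, but never exceed the new capacity.
                return Math.min(availableBefore, newCapacity);
            case ADDITIVE:
                // Copy as is (capped), then add the capacity growth, if any.
                return Math.min(availableBefore, newCapacity)
                        + Math.max(0L, newCapacity - capacityBefore);
            default:
                throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        // Reproduces the documented examples: 40 tokens available, capacity 100 -> 200.
        for (Strategy strategy : Strategy.values()) {
            System.out.println(strategy + ": " + inherit(strategy, 40, 100, 200));
        }
    }
}
```

Feeding the documented examples into the model gives the same numbers as the text: 80 and 8 for PROPORTIONALLY, 40 and 20 for AS_IS, and 140, 20, and 10 for ADDITIVE.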