1. About Bucket4j
1.1. What is Bucket4j
Bucket4j is Java rate-limiting library is mainly based on token-bucket algorithm, which are by de-facto standard for rate limiting in the IT industry.
Important
|
Bucket4j is more than direct implementation of token-bucket
Its math model provides several useful extensions that are not mentioned in the classic token-bucket interpretations, such as multiple limits per bucket or overdraft. These math extensions will be detailed described later.
|
You can read more about token bucket by following links:
-
Token bucket - wikipedia page describes the token-bucket algorithm in classical form.
-
Non-formal overview of token-bucket algorithm - the brief overview of token-bucket algorithm.
1.2. Bucket4j basic features
-
Absolutely non-compromise precision - Bucket4j does not operate with floats or doubles, all calculation are performed in the integer arithmetic, this feature protects end users from calculation errors involved by rounding.
-
Effective implementation in terms of concurrency:
-
Bucket4j is good scalable for multi-threading case it by defaults uses lock-free implementation.
-
In same time, library provides different concurrency strategies that can be chosen when default lock-free strategy is not desired.
-
-
Effective API in terms of garbage collector footprint: Bucket4j API tries to use primitive types as much as it is possible in order to avoid boxing and other types of floating garbage.
-
Pluggable listener API that allows to implement monitoring and logging.
-
Rich diagnostic API that allows to investigate internal state.
-
Rich configuration management - configuration of the bucket can be changed on fly
1.3. Bucket4j distributed features
In additional to basic features described above, Bucket4j
provides ability to implement rate-limiting in cluster of JVMs:
-
Bucket4j out of the box supports any GRID solution which compatible with JCache API (JSR 107) specification.
-
Bucket4j provides the framework that allows to quickly build integration with your own persistent technology like RDMS or a key-value storage.
-
For clustered usage scenarios Bucket4j supports asynchronous API that extremely matters when going to distribute world, because asynchronous API allows avoiding blocking your application threads each time when you need to execute Network request.
2. Basic functionality
2.1. Quick start examples
2.1.1. How to dependency to Bucket4j
The Bucket4j is distributed through Maven Central. You need to add dependency to your project as described bellow in order to be able to compile and run examples
<dependency>
<groupId>com.github.vladimir-bukhtoyarov</groupId>
<artifactId>bucket4j-core</artifactId>
<version>7.0.0</version>
</dependency>
implementation 'com.github.vladimir-bukhtoyarov:bucket4j-core:7.0.0'
2.1.2. Create your first Bucket, limiting the rate of heavy work
Imagine that you have a thread-pool executor and you want to know what your threads are doing in the moment when thread-pool throws RejectedExecutionException. Printing stacktraces of all threads in the JVM will be the best way to know where are all threads have stuck and why thread-pool is overflown. But acquiring stacktraces is very cost operation by itself, and you want to do it not often than 1 time per 10 minutes:
// define the limit 1 time per 10 minute
Bandwidth limit = Bandwidth.simple(1, Duration.ofMinutes(10));
// construct the bucket
Bucket bucket = Bucket.builder().addLimit(limit).build();
...
try {
executor.execute(anyRunnable);
} catch (RejectedExecutionException e) {
// print stacktraces only if limit is not exceeded
if (bucket.tryConsume(1)) {
ThreadInfo[] stackTraces = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
StacktraceUtils.print(stackTraces);
}
throw e;
}
2.1.3. Using bucket in blocking mode
Suppose you need to have the fresh exchange rate between dollars and euros. To get the rate you continuously poll the third-party provider, and by contract with provider you should poll not often than 100 times per 1 minute, else provider will block your ip:
// define the limit 100 times per 1 minute
Bandwidth limit = Bandwidth.simple(100, Duration.ofMinutes(1));
// construct the bucket
Bucket bucket = Bucket.builder().addLimit(limit).build();
...
volatile double exchangeRate;
...
// do polling in infinite loop
while (true) {
// Consume a token from the token bucket.
// If a token is not available this method will block until the refill adds one to the bucket.
bucket.asBlocking().consume(1);
exchangeRate = pollExchangeRate();
}
2.1.4. Limiting the rate of access to REST API
Imagine that you develop yet another social network and you want to provide REST API for third-party developers. To protect your system from overloading you want to introduce following limitation:
The bucket size is 50 calls (which cannot be exceeded at any given time), with a "refill rate" of 10 calls per second that continually increases tokens in the bucket. In other words. if client app averages 10 calls per second, it will never be throttled, and moreover client have overdraft equals to 50 calls which can be used if average is little bit higher that 10 call/sec on short time period.
Constructing the bucket to satisfy the requirements above is little bit more complicated that for previous examples, because we have deal with overdraft, but it is not rocket science:
import io.github.bucket4j.Bucket4j;
public class ThrottlingFilter implements javax.servlet.Filter {
private Bucket createNewBucket() {
long overdraft = 50;
Refill refill = Refill.greedy(10, Duration.ofSeconds(1));
Bandwidth limit = Bandwidth.classic(overdraft, refill);
return Bucket.builder().addLimit(limit).build();
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
HttpSession session = httpRequest.getSession(true);
String appKey = SecurityUtils.getThirdPartyAppKey();
Bucket bucket = (Bucket) session.getAttribute("throttler-" + appKey);
if (bucket == null) {
bucket = createNewBucket();
session.setAttribute("throttler-" + appKey, bucket);
}
// tryConsume returns false immediately if no tokens available with the bucket
if (bucket.tryConsume(1)) {
// the limit is not exceeded
filterChain.doFilter(servletRequest, servletResponse);
} else {
// limit is exceeded
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
httpResponse.setContentType("text/plain");
httpResponse.setStatus(429);
httpResponse.getWriter().append("Too many requests");
}
}
}
If you want provide more information to end user about the state of bucket, then last fragment of code above can be rewritten in following way:
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
ConsumptionProbe probe = bucket.tryConsumeAndReturnRemaining(1);
if (probe.isConsumed()) {
// the limit is not exceeded
httpResponse.setHeader("X-Rate-Limit-Remaining", "" + probe.getRemainingTokens());
filterChain.doFilter(servletRequest, servletResponse);
} else {
// limit is exceeded
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
httpResponse.setStatus(429);
httpResponse.setHeader("X-Rate-Limit-Retry-After-Seconds", "" + TimeUnit.NANOSECONDS.toSeconds(probe.getNanosToWaitForRefill()));
httpResponse.setContentType("text/plain");
httpResponse.getWriter().append("Too many requests");
}
2.1.5. Example of multiple bandwidth
Imagine that you are developing load testing tool, in order to be ensure that testable system is able to dispatch 1000 requests per 1 minute. But you do not want to randomly kill the testable system by generation all 1000 events in one second instead of 1 minute. To solve problem you can construct following bucket:
static final long MAX_WAIT_NANOS = TimeUnit.HOURS.toNanos(1);
// ...
Bucket bucket = Bucket.builder()
// allows 1000 tokens per 1 minute
.addLimit(Bandwidth.simple(1000, Duration.ofMinutes(1)))
// but not often then 50 tokens per 1 second
.addLimit(Bandwidth.simple(50, Duration.ofSeconds(1)))
.build();
// ...
while (true) {
// Consume a token from the token bucket. If a token is not available this method will block until the refill adds one to the bucket.
if (bucket.tryConsume(1, MAX_WAIT_NANOS, BlockingStrategy.PARKING)) {
workloadExecutor.execute(new LoadTask());
};
}
2.1.6. Specifying initial amount of tokens
By default initial size of bucket equals to capacity. But sometimes, you may want to have lesser initial size, for example for case of cold start in order to prevent denial of service:
int initialTokens = 42;
Bandwidth limit = Bandwidth
.simple(1000, Duration.ofHours(1))
.withInitialTokens(initialTokens);
Bucket bucket = Bucket.builder()
.addLimit(limit)
.build();
2.1.7. Turning-off the refill greediness
When bandwidth created via Bandwidth#simple
method it does refill in greedy manner, because bandwidth tries to add the tokens to bucket as soon as possible.
For example bandwidth with refill "10 tokens per 1 second" will add 1 token per each 100 millisecond,
in other words refill will not wait 1 second to regenerate whole bunch of 10 tokens.
If greediness is undesired then you should explicitly choose non-greedy refill. For example the bandwidth bellow will refill 10 tokens per 1 second instead of 1 token per 100 milliseconds:
// When refill created via "intervally" factory method then greediness is turned-off.
Refill refill = Refill.intervally(10, Duration.ofSeconds(1));
Bandwidth bandwidth = Bandwidth.classic(600, refill);
Also it is possible to specify the time when first refill should happen. This option can be used to configure clear interval boundary i.e. start of second, minute, hour, day.
// imagine that wall clock is 16:20, and we need to schedule the first refill to 17:00
Instant firstRefillTime = ZonedDateTime.now()
.truncatedTo(ChronoUnit.HOURS)
.plus(1, ChronoUnit.HOURS)
.toInstant();
// see detailed explanation for useAdaptiveInitialTokens in the javadocs for 'intervallyAligned' method
boolean useAdaptiveInitialTokens = false;
Bandwidth.classic(400, Refill.intervallyAligned(400, Duration.ofHours(1), firstRefillTime, useAdaptiveInitialTokens));
2.1.8. Returning tokens back to bucket
The compensating transaction is one of obvious use case when you want to return tokens back to bucket:
Bucket wallet;
...
if (wallet.tryConsume(50)) { // get 50 cents from wallet
try {
buyCocaCola();
} catch(NoCocaColaException e) {
// return money to wallet
wallet.addTokens(50);
}
}
2.1.9. Customizing time measurement - choosing nanotime time resolution
By default Bucket4j uses millisecond time resolution, it is preferred time measurement strategy. But rarely(for example benchmarking) you wish the nanosecond precision:
Bucket.builder().withNanosecondPrecision()
Be very careful to choose this time measurement strategy, because System.nanoTime()
produces inaccurate results,
use this strategy only if period of bandwidth is too small that millisecond resolution will be undesired.
2.1.10. Customizing time measurement - Specify custom time measurement strategy
You can specify your custom time meter, if existing miliseconds or nanotime time meters is not enough for your purposes. Imagine that you have a clock, which synchronizes its time with other machines in current cluster, if you want to use time provided by this clock instead of time provided by JVM then you can write something like this:
public class ClusteredTimeMeter implements TimeMeter {
@Override
public long currentTimeNanos() {
return ClusteredClock.currentTimeMillis() * 1_000_000;
}
}
Bandwidth limit = Bandwidth.simple(100, Duration.ofMinutes(1));
Bucket bucket = Bucket.builder()
.withCustomTimePrecision(new ClusteredTimeMeter())
.addLimit(limit)
.build();
2.2. Concepts
2.2.1. Bucket
Bucket
is rate-limiter that is implemented on the top of ideas of well-known Token Bucket algorithm.
In the Bucket4j library code the Bucket
is represented by interface io.github.bucket4j.Bucket.
-
BucketConfiguration specifies an immutable collection of limitation rules that is used by bucket during its work.
-
BucketState the place where bucket stores mutable state like amount of current available tokens.
Bucket can be constructed via special builder API BucketBuilder that is available by factory method:
Bucket bucket = Bucket.builder()
.addLimit(...)
.build();
2.2.2. BucketConfiguration
BucketConfiguration
can be described as collection of limits that are used by Bucket during its job. Configuration
In the Bucket4j library code the BucketConfiguration
is represented by class io.github.bucket4j.BucketConfiguration. Configuration is immutable, there is no way to add or remove a limit to already created configuration. However, you can replace configuration of bucket via creating new configuration instance and calling bucket.replaceConfiguration(newConfiguration)
.
Usually you should not create BucketConfiguration directly(excepting the case with configuration replacement) because BucketBuilder does for you behind the scene, for rare cases when you need to create configuration directly you have to use ConfigurationBuilder
that is available by factory method:
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(...)
.build()
Important
|
Mostly users configure single limit per configuration, but it is strongly recommended analyzing whether short-timed bursts problem can affect your application and if so then thinking about to adding more limits. |
2.2.3. Limitation/Bandwidth
Limitations that are used by bucket can be denoted in terms of bandwidths. Bandwidth is denoted by following terms:
- Capacity
-
Capacity is the term that directly inherited from classic interpretation of token-bucket algorithm, this specifies how many tokens your bucket has.
- Refill
-
Refill specifies how fast tokens can be refilled after it were consumed from bucket.
- Initial tokens
-
Bucket4j extend token-bucket algorithm by allowing to specify initial amount of tokens for each bandwidth. By default, initial amount of tokens equals to capacity, and can be changed by
withInitialTokens
method:Bandwidth bandwidth = Bandwidth.simple(42, Duration.ofMinutes(1)) .withInitialTokens(13);
- ID
-
Identifier is the optional attribute that is null by default. You may prefer to assign identifiers for bandwidths if you use on the fly configuration replacement and your buckets have more than one bandwidth per bucket, otherwise it is better to avoid using identifiers in order to preserve memory. Identifier for a bandwidth can be specified by
withId
method:BucketConfiguration configuration = BucketConfiguration.builder() .addLimit(Bandwidth.simple(1000, Duration.ofMinutes(1)).withId("business-limit")) .addLimit(Bandwidth.simple(100, Duration.ofSeconds(1)).withId("burst-protection")) .build();
NoteIdentifiers are critical for on the fly configuration replacement functionality because during replacement it needs to make decision about how correctly propagate information about already consumed tokens from state before config replacement to state after replacement. This is not trivial task especially when amount of limits is changing.
2.2.4. Refill
Specifies the speed of tokens regeneration.
- Greedy
-
This type of refill regenerates tokens in greedy manner, it tries to add the tokens to bucket as soon as possible. For example refill "10 tokens per 1 second" adds 1 token per each 100 millisecond, in other words refill will not wait 1 second to regenerate whole bunch of 10 tokens. The three refills bellow do refill of tokens with same speed:
Refill.greedy(600, Duration.ofMinutes(1)); Refill.greedy(10, Duration.ofSeconds(1)); Refill.greedy(1, Duration.ofMillis(100));
Greedy
is default type of refill that is used when you createsimple
bandwidth// the two lines of code bellow are fully equivalent Bandwidth.simple(100, Duration.ofMinutes(1)) Bandwidth.classic(100, Refill.greedy(100, Duration.ofMinutes(1)))
- Intervally
-
This type of refill regenerates tokens in intervally manner. "Intervally" in opposite to "greedy" will wait until whole period will be elapsed before regenerate the whole amount of tokens.
Example:// generates 100 tokens each minute Refill.greedy(100, Duration.ofMinutes(1));
- IntervallyAligned
-
This type of refill regenerates that does refill of tokens in intervally manner. Intervally" in opposite to "greedy" will wait until whole period will be elapsed before regenerate the whole amount of tokens. In additional to Intervally it is possible to specify the time when first refill should happen. This type can be used to configure clear interval boundary i.e. start of second, minute, hour, day. To get more details reed javadocs for
Refill#intervallyAligned
method.Example:// imagine that wall clock is 16:20, the first refill will happen at 17:00 // first refill will happen in the beginning of next hour Instant firstRefillTime = ZonedDateTime.now() .truncatedTo(ChronoUnit.HOURS) .plus(1, ChronoUnit.HOURS) .toInstant(); Bandwidth.classic(400, Refill.intervallyAligned(400, Duration.ofHours(1), firstRefillTime, true));
2.2.5. BucketState
BucketState is the place where bucket stores own mutable state like:
-
Amount of current available tokens.
-
Timestamp when the last refill was happen.
BucketState
is represented by interface io.github.bucket4j.BucketState. Usually you never interact with this interface, excepting the cases when you want to get access to low-level diagnostic API that is described in
2.2.6. BucketBuilder
It was explicitly decided by library authors to not provide for end users to construct a library entity via direct constructors.
-
To be able in the future to change internal implementations without breaking backward compatibility.
-
In order to provide
Fluent Builder API
that in our minds is good modern library design pattern.
LocalBucketBuilder
is a fluent builder that is specialized to construct the local buckets, where local bucket is the bucket that holds internal state just in memory and does not provide clustering functionality. Bellow an example of LocalBucketBuilder usage:
Bucket bucket = Bucket.builder()
.addLimit(Bandwidth.simple())
.withNanosecondPrecision()
.withSynchronizationStrategy(SynchronizationStrategy.LOCK_FREE)
.build()
2.3. Technical limitations
In order to provide the best precision, Bucket4j uses integer arithmetic as much as possible, so any internal calculation is limited by bound Long.MAX_VALUE
. Library introduces several limits that described further, in order to be sure that calculations will never exceed the bound.
2.3.1. Maximum refill rate
Maximum refill rate is limited by 1 token/ 1 nanosecond
. Following examples of API usage will raise exceptions
Bandwidth.simple(2, Duration.ofNanos(1));
Bandwidth.simple(1001, Duration.ofNanos(1000));
Bandwidth.simple(1_000_001, Duration.ofMillis(1));
2.3.2. Limitation for refill period
Bucket4j works with time intervals as the 64-bit number of nanoseconds. So maximum refill period that is possible will be:
Duration.ofNanos(Long.MAX_VALUE);
Any attempt to specify period longer that limit above will fail with exception. For example the code bellow will failed
Bandwidth.simple(42, Duration.ofMinutes(153722867280912930));
Exception in thread "main" java.lang.ArithmeticException: long overflow
at java.lang.Math.multiplyExact(Math.java:892)
at java.time.Duration.toNanos(Duration.java:1186)
at io.github.bucket4j.Refill.<init>(Refill.java:48)
at io.github.bucket4j.Refill.greedy(Refill.java:100)
at io.github.bucket4j.Bandwidth.simple(Bandwidth.java:102)
2.4. Basic API Reference
2.4.1. io.github.bucket4j.Bucket
tryConsume
/**
* Tries to consume a specified number of tokens from this bucket.
*
* @param numTokens The number of tokens to consume from the bucket, must be a positive number.
*
* @return {@code true} if the tokens were consumed, {@code false} otherwise.
*/
boolean tryConsume(long numTokens);
consumeIgnoringRateLimits
/**
* Consumes {@code tokens} from bucket ignoring all limits.
* In result of this operation amount of tokens in the bucket could became negative.
*
* There are two possible reasons to use this method:
* <ul>
* <li>An operation with high priority should be executed independently of rate limits, but it should take effect to subsequent operation with bucket.</li>
* <li>You want to apply custom blocking strategy instead of default which applied on {@code asScheduler().consume(tokens)} </li>
* </ul>
*
* @param tokens amount of tokens that should be consumed from bucket.
*
* @return
* the amount of rate limit violation in nanoseconds calculated in following way:
* <ul>
* <li><tt>zero</tt> if rate limit was not violated. For example bucket had 5 tokens before invocation of {@code consumeIgnoringRateLimits(2)},
* after invocation there are 3 tokens remain in the bucket, since limits were not violated <tt>zero</tt> returned as result.</li>
* <li>Positive value which describes the amount of rate limit violation in nanoseconds.
* For example bucket with limit 10 tokens per 1 second, currently has the 2 tokens available, last refill happen 100 milliseconds ago, and {@code consumeIgnoringRateLimits(6)} called.
* <tt>300_000_000</tt> will be returned as result and available tokens in the bucket will became <tt>-3</tt>, and any variation of {@code tryConsume...} will not be successful for 400 milliseconds(time required to refill amount of available tokens until 1).
* </li>
* </ul>
*/
long consumeIgnoringRateLimits(long tokens);
tryConsumeAndReturnRemaining
/**
* Tries to consume a specified number of tokens from this bucket.
*
* @param numTokens The number of tokens to consume from the bucket, must be a positive number.
*
* @return {@link ConsumptionProbe} which describes both result of consumption and tokens remaining in the bucket after consumption.
*/
ConsumptionProbe tryConsumeAndReturnRemaining(long numTokens);
estimateAbilityToConsume
/**
* Estimates ability to consume a specified number of tokens.
*
* @param numTokens The number of tokens to consume, must be a positive number.
*
* @return {@link EstimationProbe} which describes the ability to consume.
*/
EstimationProbe estimateAbilityToConsume(long numTokens);
tryConsumeAsMuchAsPossible
/**
* Tries to consume as much tokens from this bucket as available at the moment of invocation.
*
* @return number of tokens which has been consumed, or zero if was consumed nothing.
*/
long tryConsumeAsMuchAsPossible();
/**
* Tries to consume as much tokens from bucket as available in the bucket at the moment of invocation,
* but tokens which should be consumed is limited by {@code limit}.
*
* @param limit maximum number of tokens to consume, should be positive.
*
* @return number of tokens which has been consumed, or zero if was consumed nothing.
*/
long tryConsumeAsMuchAsPossible(long limit);
addTokens
/**
* Add <tt>tokensToAdd</tt> to bucket.
* Resulted count of tokens are calculated by following formula:
* <pre>newTokens = Math.min(capacity, currentTokens + tokensToAdd)</pre>
* in other words resulted number of tokens never exceeds capacity independent of <tt>tokensToAdd</tt>.
*
* <h3>Example of usage</h3>
* The "compensating transaction" is one of obvious use case, when any piece of code consumed tokens from bucket, tried to do something and failed, the "addTokens" will be helpful to return tokens back to bucket:
* <pre>{@code
* Bucket wallet;
* ...
* if(wallet.tryConsume(50)) {// get 50 cents from wallet
* try {
* buyCocaCola();
* } catch(NoCocaColaException e) {
* // return money to wallet
* wallet.addTokens(50);
* }
* };
* }</pre>
*
* @param tokensToAdd number of tokens to add
*/
void addTokens(long tokensToAdd);
forceAddTokens
/**
* Add <tt>tokensToAdd</tt> to bucket. In opposite to {@link #addTokens(long)} usage of this method can lead to overflow bucket capacity.
*
* <h3>Example of usage</h3>
* The "compensating transaction" is one of obvious use case, when any piece of code consumed tokens from bucket, tried to do something and failed, the "addTokens" will be helpful to return tokens back to bucket:
* <pre>{@code
* Bucket wallet;
* ...
* if(wallet.tryConsume(50)) {// get 50 cents from wallet
* try {
* buyCocaCola();
* } catch(NoCocaColaException e) {
* // return money to wallet
* wallet.addTokens(50);
* }
* };
* }</pre>
*
* @param tokensToAdd number of tokens to add
*/
void forceAddTokens(long tokensToAdd);
getAvailableTokens
/**
* Returns amount of available tokens in this bucket.
* <p>
* Typically you should avoid using of this method for, because available tokens can be changed by concurrent transactions for case of multithreaded/multi-process environment.
*
* @return amount of available tokens
*/
long getAvailableTokens();
builder
/**
* Creates the new builder of in-memory buckets.
*
* @return new instance of {@link LocalBucketBuilder}
*/
static LocalBucketBuilder builder() {
return new LocalBucketBuilder();
}
replaceConfiguration
/**
* Replaces configuration of this bucket.
*
* <p>
* The first hard problem of configuration replacement is making decision how to propagate available tokens from bucket with previous configuration to bucket with new configuration.
* If you don't care about previous bucket state then use {@link TokensInheritanceStrategy#RESET}.
* But it becomes to a tricky problem when we expect that previous consumption(that has not been compensated by refill yet) should take effect to the bucket with new configuration.
* In this case you need to make a choice between {@link TokensInheritanceStrategy#PROPORTIONALLY} and {@link TokensInheritanceStrategy#AS_IS}, read documentation about both with strong attention.
*
* <p> There is another problem when you are choosing {@link TokensInheritanceStrategy#PROPORTIONALLY} and {@link TokensInheritanceStrategy#AS_IS} and bucket has more then one bandwidth.
* For example how does replaceConfiguration implementation should bind bandwidths to each other in the following example?
* <pre>
* <code>
* Bucket bucket = Bucket.builder()
* .addLimit(Bandwidth.simple(10, Duration.ofSeconds(1)))
* .addLimit(Bandwidth.simple(10000, Duration.ofHours(1)))
* .build();
* ...
* BucketConfiguration newConfiguration = BucketConfiguration.builder()
* .addLimit(Bandwidth.simple(5000, Duration.ofHours(1)))
* .addLimit(Bandwidth.simple(100, Duration.ofSeconds(10)))
* .build();
* bucket.replaceConfiguration(newConfiguration, TokensInheritanceStrategy.AS_IS);
* </code>
* </pre>
* It is obviously that simple strategy - copying tokens by bandwidth index will not work well in this case, because of it highly depends from order.
* Instead of inventing the backward maggic Bucket4j provides to you ability to deap controll of this process by specifying identifiers for bandwidth,
* so in case of multiple bandwidth configuratoin replacement code can copy available tokens by bandwidth ID. So it is better to rewrite code above as following:
* <pre>
* <code>
* Bucket bucket = Bucket.builder()
* .addLimit(Bandwidth.simple(10, Duration.ofSeconds(1)).withId("technical-limit"))
* .addLimit(Bandwidth.simple(10000, Duration.ofHours(1)).withId("business-limit"))
* .build();
* ...
* BucketConfiguration newConfiguration = BucketConfiguration.builder()
* .addLimit(Bandwidth.simple(5000, Duration.ofHours(1)).withId("business-limit"))
* .addLimit(Bandwidth.simple(100, Duration.ofSeconds(10)).withId("technical-limit"))
* .build();
* bucket.replaceConfiguration(newConfiguration, TokensInheritanceStrategy.AS_IS);
* </code>
* </pre>
*
*
* <p>
* There are following rules for bandwidth identifiers:
* <ul>
* <li>
* By default bandwidth has <b>null</b> identifier.
* </li>
* <li>
* null value of identifier equals to another null value if and only if there is only one bandwidth with null identifier.
* </li>
* <li>
* If identifier for bandwidth is specified then it must has unique in the bucket. Bucket does not allow to create several bandwidth with same ID.
* </li>
* <li>
* {@link TokensInheritanceStrategy#RESET} strategy will be applied for tokens migration during config replacement for bandwidth which has no bound bandwidth with same ID in previous configuration,
* idependently of strategy that was requested.
* </li>
* </ul>
*
* @param newConfiguration the new configuration
* @param tokensInheritanceStrategy specifies the rules for inheritance of available tokens
*/
void replaceConfiguration(BucketConfiguration newConfiguration, TokensInheritanceStrategy tokensInheritanceStrategy);
See configuration replacement section for more details.
asBlocking
/**
* Returns the blocking API for this bucket, that provides operations which are able to block caller thread in case of lack of tokens.
*
* @return the blocking API for this bucket.
*
* @see BlockingBucket
*/
BlockingBucket asBlocking();
See BlockingBucket section for more details.
asScheduler
/**
* Returns the scheduling API for this bucket, that provides operations which can delay user operation via {@link java.util.concurrent.ScheduledExecutorService} in case of lack of tokens.
*
* @return the scheduling API for this bucket.
*
* @see SchedulingBucket
*/
SchedulingBucket asScheduler();
See SchedulingBucket section for more details.
asVerbose
/**
* Returns the verbose API for this bucket.
*
* @return the verbose API for this bucket.
*/
VerboseBucket asVerbose();
See Verbose API section for more details.
toListenable
/**
* Returns new copy of this bucket instance decorated by {@code listener}.
* The created bucket will share same tokens with source bucket and vice versa.
*
* See javadocs for {@link BucketListener} in order to understand semantic of listener.
*
* @param listener the listener of bucket events.
*
* @return new bucket instance decorated by {@code listener}
*/
Bucket toListenable(BucketListener listener);
See Listening bucket events section for more details.
2.4.2. io.github.bucket4j.BlockingBucket
tryConsume
/**
* Tries to consume a specified number of tokens from the bucket.
*
* <p>
* The algorithm is following:
* <ul>
* <li>If bucket has enough tokens, then tokens consumed and <tt>true</tt> returned immediately.</li>
* <li>If bucket has no enough tokens,
* and required amount of tokens can not be refilled,
* even after waiting of <code>maxWaitTimeNanos</code> nanoseconds,
* then consumes nothing and returns <tt>false</tt> immediately.
* </li>
* <li>
* If bucket has no enough tokens,
* but deficit can be closed in period of time less then <code>maxWaitTimeNanos</code> nanoseconds,
* then tokens consumed(reserved in fair manner) from bucket and current thread blocked for a time required to close deficit,
* after unblocking method returns <tt>true</tt>.
*
* <p>
* <strong>Note:</strong> If InterruptedException happen when thread was blocked
* then tokens will be not returned back to bucket,
* but you can use {@link Bucket#addTokens(long)} to returned tokens back.
* </li>
* </ul>
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWaitTimeNanos limit of time(in nanoseconds) which thread can wait.
* @param blockingStrategy specifies the way to block current thread to amount of time required to refill missed number of tokens in the bucket
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @throws InterruptedException in case of current thread has been interrupted during the waiting
*/
boolean tryConsume(long numTokens, long maxWaitTimeNanos, BlockingStrategy blockingStrategy) throws InterruptedException;
/**
* This is just overloaded equivalent of {@link #tryConsume(long, long, BlockingStrategy)}
*
* @see #tryConsume(long, long, BlockingStrategy)
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWait limit of time which thread can wait.
* @param blockingStrategy specifies the way to block current thread to amount of time required to refill missed number of tokens in the bucket
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @throws InterruptedException in case of current thread has been interrupted during the waiting
*/
default boolean tryConsume(long numTokens, Duration maxWait, BlockingStrategy blockingStrategy) throws InterruptedException {
return tryConsume(numTokens, maxWait.toNanos(), blockingStrategy);
}
/**
* This is just overloaded equivalent of {@link #tryConsume(long, long, BlockingStrategy)}
*
* @see #tryConsume(long, long, BlockingStrategy)
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWaitTimeNanos limit of time(in nanoseconds) which thread can wait.
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @throws InterruptedException in case of current thread has been interrupted during the waiting
*/
default boolean tryConsume(long numTokens, long maxWaitTimeNanos) throws InterruptedException {
return tryConsume(numTokens, maxWaitTimeNanos, BlockingStrategy.PARKING);
}
/**
* This is just overloaded equivalent of {@link #tryConsume(long, long, BlockingStrategy)}
*
* @see #tryConsume(long, long, BlockingStrategy)
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWait limit of time which thread can wait.
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @throws InterruptedException in case of current thread has been interrupted during the waiting
*/
default boolean tryConsume(long numTokens, Duration maxWait) throws InterruptedException {
return tryConsume(numTokens, maxWait.toNanos(), BlockingStrategy.PARKING);
}
tryConsumeUninterruptibly
/**
* Has same semantic with {@link #tryConsume(long, long, BlockingStrategy)} but ignores interrupts(just restores interruption flag on exit).
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWaitTimeNanos limit of time(in nanoseconds) which thread can wait.
* @param blockingStrategy specifies the way to block current thread to amount of time required to refill missed number of tokens in the bucket
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @see #tryConsume(long, long, BlockingStrategy)
*/
boolean tryConsumeUninterruptibly(long numTokens, long maxWaitTimeNanos, UninterruptibleBlockingStrategy blockingStrategy);
/**
* This is just overloaded equivalent of {@link #tryConsumeUninterruptibly(long, long, UninterruptibleBlockingStrategy)}
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWait limit of time which thread can wait.
* @param blockingStrategy specifies the way to block current thread to amount of time required to refill missed number of tokens in the bucket
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @see #tryConsumeUninterruptibly(long, long, UninterruptibleBlockingStrategy)
*/
default boolean tryConsumeUninterruptibly(long numTokens, Duration maxWait, UninterruptibleBlockingStrategy blockingStrategy) {
return tryConsumeUninterruptibly(numTokens, maxWait.toNanos(), blockingStrategy);
}
/**
* This is just overloaded equivalent of {@link #tryConsumeUninterruptibly(long, long, UninterruptibleBlockingStrategy)}
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWaitTimeNanos limit of time(in nanoseconds) which thread can wait.
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @see #tryConsumeUninterruptibly(long, long, UninterruptibleBlockingStrategy)
*/
default boolean tryConsumeUninterruptibly(long numTokens, long maxWaitTimeNanos) {
return tryConsumeUninterruptibly(numTokens, maxWaitTimeNanos, UninterruptibleBlockingStrategy.PARKING);
}
/**
* This is just overloaded equivalent of {@link #tryConsumeUninterruptibly(long, long, UninterruptibleBlockingStrategy)}
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWait limit of time which thread can wait.
*
* @return true if {@code numTokens} has been consumed or false when {@code numTokens} has not been consumed
*
* @see #tryConsumeUninterruptibly(long, long, UninterruptibleBlockingStrategy)
*/
default boolean tryConsumeUninterruptibly(long numTokens, Duration maxWait) {
return tryConsumeUninterruptibly(numTokens, maxWait.toNanos(), UninterruptibleBlockingStrategy.PARKING);
}
consume
/**
* Consumes a specified number of tokens from the bucket.
*
* <p>
* The algorithm is following:
* <ul>
* <li>If bucket has enough tokens, then tokens consumed and method returns immediately.</li>
* <li>
* If bucket has no enough tokens, then required amount of tokens will be reserved for future consumption
* and current thread will be blocked for a time required to close deficit.
* </li>
* <li>
* <strong>Note:</strong> If InterruptedException happen when thread was blocked
* then tokens will be not returned back to bucket,
* but you can use {@link Bucket#addTokens(long)} to returned tokens back.
* </li>
* </ul>
*
* @param numTokens The number of tokens to consume from the bucket.
* @param blockingStrategy specifies the way to block current thread to amount of time required to refill missed number of tokens in the bucket
*
*
* @throws InterruptedException in case of current thread has been interrupted during the waiting
*/
void consume(long numTokens, BlockingStrategy blockingStrategy) throws InterruptedException;
/**
* This is just overloaded equivalent of {@link #consume(long, BlockingStrategy)}
*
* @param numTokens The number of tokens to consume from the bucket.
*
* @throws InterruptedException in case of current thread has been interrupted during the waiting
*
* @see #consume(long, BlockingStrategy)
*/
default void consume(long numTokens) throws InterruptedException {
consume(numTokens, BlockingStrategy.PARKING);
}
consumeUninterruptibly
/**
* Has same semantic with {@link #consume(long, BlockingStrategy)} but ignores interrupts(just restores interruption flag on exit).
*
* @param numTokens The number of tokens to consume from the bucket.
* @param blockingStrategy specifies the way to block current thread to amount of time required to refill missed number of tokens in the bucket
*
* @see #consume(long, BlockingStrategy)
*/
void consumeUninterruptibly(long numTokens, UninterruptibleBlockingStrategy blockingStrategy);
/**
* This is just overloaded equivalent of {@link #consumeUninterruptibly(long, UninterruptibleBlockingStrategy)}
*
* @param numTokens The number of tokens to consume from the bucket.
*
* @see #consumeUninterruptibly(long, UninterruptibleBlockingStrategy)
*/
default void consumeUninterruptibly(long numTokens) {
consumeUninterruptibly(numTokens, UninterruptibleBlockingStrategy.PARKING);
}
2.4.3. io.github.bucket4j.SchedulingBucket
tryConsume
/**
* Tries to consume the specified number of tokens from the bucket.
*
* <p>
* <strong>The algorithm for all type of buckets is following:</strong>
* <ul>
* <li>Implementation issues asynchronous request to back-end behind the bucket(for local bucket it is just a synchronous call) in way which specific for each particular back-end.</li>
* <li>Then uncompleted future returned to the caller.</li>
* <li>If back-end provides signal(through callback) that asynchronous request failed, then future completed exceptionally.</li>
* <li>When back-end provides signal(through callback) that request is done(for local bucket response got immediately), then following post-processing rules will be applied:
* <ul>
* <li>
* If tokens were consumed then future immediately completed by <tt>true</tt>.
* </li>
* <li>
* If tokens were not consumed because were not enough tokens in the bucket and <tt>maxWaitNanos</tt> nanoseconds is not enough time to refill deficit,
* then future immediately completed by <tt>false</tt>.
* </li>
* <li>
* If tokens were reserved(effectively consumed) then <tt>task</tt> to delayed completion will be scheduled to the <tt>scheduler</tt> via {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)},
* when delay equals to time required to refill the deficit of tokens. After scheduler executes task the future completed by <tt>true</tt>.
* </li>
* </ul>
* </li>
* </ul>
* It is strongly not recommended to do any heavy work in thread which completes the future,
* because typically this will be a back-end thread which handles NIO selectors,
* blocking this thread will take negative performance effect to back-end throughput,
* so you always should resume control flow in another executor via methods like {@link CompletableFuture#thenApplyAsync(Function, Executor)}.
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWaitNanos limit of time(in nanoseconds) which thread can wait.
* @param scheduler used to delayed future completion
*/
CompletableFuture<Boolean> tryConsume(long numTokens, long maxWaitNanos, ScheduledExecutorService scheduler);
/**
* This is just overloaded equivalent of {@link #tryConsume(long, long, ScheduledExecutorService)}
*
* @param numTokens The number of tokens to consume from the bucket.
* @param maxWait limit of time which thread can wait.
* @param scheduler used to delayed future completion
*
* @see #tryConsume(long, long, ScheduledExecutorService)
*/
default CompletableFuture<Boolean> tryConsume(long numTokens, Duration maxWait, ScheduledExecutorService scheduler) {
return tryConsume(numTokens, maxWait.toNanos(), scheduler);
}
consume
/**
* Consumes the specified number of tokens from the bucket.
*
* <p>
* <strong>The algorithm for all type of buckets is following:</strong>
* <ul>
* <li>Implementation issues asynchronous request to back-end behind the bucket(for local bucket it is just a synchronous call) in way which specific for each particular back-end.</li>
* <li>Then uncompleted future returned to the caller.</li>
* <li>If back-end provides signal(through callback) that asynchronous request failed, then future completed exceptionally.</li>
* <li>When back-end provides signal(through callback) that request is done(for local bucket response got immediately), then following post-processing rules will be applied:
* <ul>
* <li>
* If tokens were consumed then future immediately completed.
* </li>
* <li>
* Else tokens reserved(effectively consumed) and <tt>task</tt> to delayed completion will be scheduled to the <tt>scheduler</tt> via {@link ScheduledExecutorService#schedule(Runnable, long, TimeUnit)},
* when delay equals to time required to refill the deficit of tokens. After scheduler executes task the future completed.
* </li>
* </ul>
* </li>
* </ul>
* It is strongly not recommended to do any heavy work in thread which completes the future,
* because typically this will be a back-end thread which handles NIO selectors,
* blocking this thread will take negative performance effect to back-end throughput,
* so you always should resume control flow in another executor via methods like {@link CompletableFuture#thenApplyAsync(Function, Executor)}.
*
* @param numTokens The number of tokens to consume from the bucket.
* @param scheduler used to delayed future completion
*
*/
CompletableFuture<Void> consume(long numTokens, ScheduledExecutorService scheduler);
2.5. Generic production checklist
The considerations described bellow are applicable to each solution based on the token-bucket or leaky-bucket algorithm. You need to understand, agree and configure following points:
2.5.1. Be wary of long periods
When you are planning to use any solution based on token-bucket for throttling incoming requests, you need to pay close attention to the throttling time window.
-
Given a bucket with a limit of 10000 tokens/ per 1 hour per user.
-
A malicious attacker may send 9999 request in very short period, for example within 10 seconds. This would correspond to 100 request per second which could seriously impact your system.
-
A skilled attacker could stop at 9999 request per hour, and repeat every hour, which would make this attack impossible to detect (because the limit would not be reached).
To protect from this kind attacks, you should specify multiple limits like bellow
Bucket bucket = Bucket.builder()
.addLimit(Bandwidth.simple(10000, Duration.ofSeconds(3_600))
.addLimit(Bandwidth.simple(20, Duration.ofSeconds(1)) // attacker is unable to achieve 1000RPS and crash service in short time
.build();
The number of limits specified per bucket does not impact the performance.
2.5.2. Be wary of short-timed bursts
Token bucket is an efficient algorithm with low and fixed memory footprint, independently of the incoming request-rate(it can be millions per second) the bucket consumes no more then 40 bytes(five longs). But an efficient memory footprint has its own cost - bandwidth limitation is only satisfied over a long period of time. In other words you cannot avoid short-timed bursts.
-
Given a bucket with a limit of 100 tokens/min. We start with a full bucket, i.e. with 100 tokens.
-
At
T1
100 requests are made and thus the bucket becomes empty. -
At
T1+1min
the bucket is full again because tokens fully regenerated and we can immediately consume 100 tokens. -
This means that between
T1
andT1+1min
we have consumed 200 tokens. Over a long period of time there will be no more than 100 requests per min, but as shown above it is possible to burst at twice the limit here at 100 tokens per min.
-
Do not use Bucket4j or any other solution implemented on top of token-bucket algorithms, because token-bucket is specially designed for network traffic management devices for which short-living traffic spike is a regular case, trying to avoid spike at all contradicts with the nature of token-bucket.
-
Since the value of burst always equals to capacity, try to reduce the capacity and speed of refill. For example if you have strong requirements
100tokens/60seconds
then configure bucket ascapacity=50tokens refill=50tokens/60seconds
. It worth to mention that this way leads to following drawbacks: — In one time you are not allowed to consume amount of tokens greater than capacity, according to example above - before capacity reducing you was able to consume 100 tokens in single request, after reducing you are able to consume 50 tokens in one request at max. — Reducing the speed of refill leads to underconsumptions on long term periods, it is obvious that with refill50tokens/60seconds
you will be able to consume 3050 tokens for 1 hour, instead of 6100(as was prior refill reducing). — As a summary of two drawbacks above, we can say that you will pay via underconsumption for eliminating the risk of overconsumption.
3. Distributed facilities
3.1. JCache integration
Bucket4j
supports any GRID solution which compatible with JCache API (JSR 107) specification.
Note
|
Do not forget to read Distributed usage checklist before using the Bucket4j over JCache cluster. |
To use JCache extension you also need to add following dependency:
<dependency>
<groupId>com.github.vladimir-bukhtoyarov</groupId>
<artifactId>bucket4j-jcache</artifactId>
<version>7.0.0</version>
</dependency>
JCache expects javax.cache.cache-api to be a provided dependency. Do not forget to add following dependency:
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>${jcache.version}</version>
</dependency>
3.1.1. Example 1 - limiting access to HTTP server by IP address
Imagine that you develop any Servlet based WEB application and want to limit access per IP basis. You want to use same limits for each IP - 30 requests per minute.
ServletFilter would be obvious place to check limits:
public class IpThrottlingFilter implements javax.servlet.Filter {
private static final BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(30, Duration.ofMinutes(1)))
.build();
// cache for storing token buckets, where IP is key.
@Inject
private javax.cache.Cache<String, byte[]> cache;
private ProxyManager<String> buckets;
@Override
public void init(FilterConfig filterConfig) throws ServletException {
// init bucket registry
buckets = new JCacheProxyManager<>(cache);
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
String ip = IpHelper.getIpFromRequest(httpRequest);
// acquire cheap proxy to bucket
Bucket bucket = proxyManager.builder().build(key, configuration);
// tryConsume returns false immediately if no tokens available with the bucket
if (bucket.tryConsume(1)) {
// the limit is not exceeded
filterChain.doFilter(servletRequest, servletResponse);
} else {
// limit is exceeded
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
httpResponse.setContentType("text/plain");
httpResponse.setStatus(429);
httpResponse.getWriter().append("Too many requests");
}
}
}
3.1.2. Example 2 - limiting access to service by contract agreements
Imagine that you provide paid language translation service via HTTP. Each user has unique agreement which differs from each other. Details of each agreement is stored in relational database, and takes significant time to fetch(for example 100ms). The example above will not work fine in this case, because time to create/fetch configuration of bucket from database will be 100 times slower than limit-checking itself. Bucket4j solves this problem via lazy configuration suppliers which are called if and only if bucket was not yet stored in grid, thus it is possible to implement solution that will read agreement from database once per each user.
public class IpThrottlingFilter implements javax.servlet.Filter {
// service to provide per user limits
@Inject
private LimitProvider limitProvider;
// cache for storing token buckets, where IP is key.
@Inject
private javax.cache.Cache<String, byte[]> cache;
private ProxyManager<String> buckets;
@Override
public void init(FilterConfig filterConfig) throws ServletException {
// init bucket registry
buckets = new JCacheProxyManager<>(cache);
}
@Override
public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
String userId = AutentificationHelper.getUserIdFromRequest(httpRequest);
// prepare configuration supplier which will be called(on first interaction with proxy) if bucket was not saved yet previously.
Supplier<BucketConfiguration> configurationLazySupplier = getConfigSupplierForUser(userId);
// acquire cheap proxy to bucket
Bucket bucket = proxyManager.builder().build(key, configurationLazySupplier);
// tryConsume returns false immediately if no tokens available with the bucket
if (bucket.tryConsume(1)) {
// the limit is not exceeded
filterChain.doFilter(servletRequest, servletResponse);
} else {
// limit is exceeded
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
httpResponse.setContentType("text/plain");
httpResponse.setStatus(429);
httpResponse.getWriter().append("Too many requests");
}
}
private Supplier<BucketConfiguration> getConfigSupplierForUser(String userId) {
return () -> {
long translationsPerDay = limitProvider.readPerDayLimitFromAgreementsDatabase(userId);
return BucketConfiguratiion.builder()
.addLimit(Bandwidth.simple(translationsPerDay, Duration.ofDays(1)))
.build();
};
}
}
3.1.3. Why JCache specification is not enough in modern stacks and since 3.0 were introduced the dedicated modules for Infinispan, Hazelcast, Coherence and Ignite?
Asynchronous processing is very important for high-throughput applications, but JCache specification does not specify asynchronous API, because two early attempts to bring this kind functionality at spec level 307, 312 were failed in absence of consensus.
Also, implementation the asynchronous support for any other JCache provider outside from the list above should be easy exercise, so feel free to return back the pull request addressed to cover your favorite JCache provider.
3.1.4. Verification of compatibility with particular JCache provider is your responsibility
Important
|
Keep in mind that there are many non-certified implementations of JCache specification on the market. Many of them want to increase their popularity by declaring support for the JCache API, but often only the API is supported and the semantic of JCache is totally ignored. Usage Bucket4j with this kind of libraries should be completely avoided. |
Bucket4j is only compatible with implementations which obey the JCache specification rules(especially related to EntryProcessor execution). Oracle Coherence, Apache Ignite, Hazelcast are good examples of safe implementations of JCache.
Important
|
Because it is impossible to test all possible JCache providers, you need to test your provider by yourself. |
Just run this code in order to be sure that your implementation of JCache provides good isolation for EntryProcessors
import javax.cache.Cache;
import javax.cache.processor.EntryProcessor;
import java.util.concurrent.CountDownLatch;
import java.io.Serializable;
public class CompatibilityTest {
final Cache<String, Integer> cache;
public CompatibilityTest(Cache<String, Integer> cache) {
this.cache = cache;
}
public void test() throws InterruptedException {
String key = "42";
int threads = 4;
int iterations = 1000;
cache.put(key, 0);
CountDownLatch latch = new CountDownLatch(threads);
for (int i = 0; i < threads; i++) {
new Thread(() -> {
try {
for (int j = 0; j < iterations; j++) {
EntryProcessor<String, Integer, Void> processor = (EntryProcessor<String, Integer, Void> & Serializable) (mutableEntry, objects) -> {
int value = mutableEntry.getValue();
mutableEntry.setValue(value + 1);
return null;
};
cache.invoke(key, processor);
}
} finally {
latch.countDown();
}
}).start();
}
latch.await();
int value = cache.get(key);
if (value == threads * iterations) {
System.out.println("Implementation which you use is compatible with Bucket4j");
} else {
String msg = "Implementation which you use is not compatible with Bucket4j";
msg += ", " + (threads * iterations - value) + " writes are missed";
throw new IllegalStateException(msg);
}
}
}
The check does 4000 increments of integer in parallel and verifies that no one update has been missed. If check passed then your JCache provider is compatible with Bucket4j, the throttling will work fine in distributed and concurrent environment. If check is not passed, then reach to the particular JCache provider team and consult why its implementation misses the writes.
3.2. Hazelcast integration
3.2.1. Dependencies
To use Bucket4j extension for Hazelcast with Hazelcast 4.x
you need to add following dependency:
<dependency>
<groupId>com.github.vladimir-bukhtoyarov</groupId>
<artifactId>bucket4j-hazelcast</artifactId>
<version>7.0.0</version>
</dependency>
If you are using legacy version of Hazelcast 3.x
then you need to add following dependency:
<dependency>
<groupId>com.github.vladimir-bukhtoyarov</groupId>
<artifactId>bucket4j-hazelcast-3</artifactId>
<version>7.0.0</version>
</dependency>
3.2.2. General compatibility matrix principles:
-
Bucket4j authors do not perform continues monitoring of new Hazelcast releases. So, there is can be case when there is no one version of Bucket4j which is compatible with newly released Hazelcast, just log issue to bug tracker in this case, adding support to new version of Hazelcast is usually easy exercise.
-
Integrations with legacy versions of Hazelcast are not removed without a clear reason. Hence You are in safety, even you are working in a big enterprise company that does not update its infrastructure frequently because You still get new Bucket4j’s features even for legacy Hazelcast’s releases.
3.2.3. Example of Bucket instantiation
IMap<K, byte[]> map = ...;
private static final HazelcastProxyManager<K> proxyManager = new HazelcastProxyManager(map);
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build(key, configuration);
Bucket bucket = proxyManager.builder().build(configuration);
3.2.4. Configuring Custom Serialization for Bucket4j library classes
If you configure nothing, then by default Java serialization will be used for serialization Bucket4j library classes. Java serialization can be rather slow and should be avoided in general.
Bucket4j
provides custom serializers for all library classes that could be transferred over network.
To let Hazelcast know about fast serializers you should register them programmatically in the serialization config:
import com.hazelcast.config.Config;
import com.hazelcast.config.SerializationConfig;
import com.hazelcast.config.SerializerConfig;
import io.github.bucket4j.grid.hazelcast.serialization.HazelcastSerializer;
...
Config config = ...
SerializationConfig serializationConfig = config.getSerializationConfig();
// the starting type ID number for Bucket4j classes.
// you free to choose any unused ID, but be aware that Bucket4j uses 2 types currently,
// and may use more types in the future, so leave enough empty space after baseTypeIdNumber
int baseTypeIdNumber = 10000;
HazelcastProxyManager.addCustomSerializers(serializationConfig, baseTypeIdNumber);
3.2.5. Known issues related with Docker and(or) SpringBoot
-
#186 HazelcastEntryProcessor class not found - check file permissions inside your image.
-
#182 HazelcastSerializationException with Hazelcast 4.2 - properly setup classloader for Hazelcast client configuration.
3.3. Apache Ignite integration
Before use bucket4j-ignite
module please read [bucket4j-jcache documentation](jcache-usage.md),
because bucket4j-ignite
is just a follow-up of bucket4j-jcache
.
Bucket4j supports Ignite Thin-Client as well as regular deployment scenario.
Question: Bucket4j already supports JCache since version 1.2
. Why it was needed to introduce direct support for Apache Ignite
?
Answer: Because JCache API (JSR 107) does not specify asynchronous API,
developing the dedicated module bucket4j-ignite
was the only way to provide asynchrony for users who use Bucket4j
and Apache Ignite
together.
Question: Should I migrate from bucket4j-jcache
to bucketj-ignite
If I do not need in asynchronous API?
Answer: No, you should not migrate to bucketj-ignite
in this case.
3.3.1. Dependencies
To use bucket4j-ignite
extension you need to add following dependency:
<dependency>
<groupId>com.github.vladimir-bukhtoyarov</groupId>
<artifactId>bucket4j-ignite</artifactId>
<version>7.0.0</version>
</dependency>
3.3.2. Example of Bucket instantiation via IgniteProxyManager
org.apache.ignite.IgniteCache<K, byte[]> cache = ...;
private static final IgniteProxyManager proxyManager = new IgniteProxyManager(cache);
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build();
Bucket bucket = proxyManager.builder().build(key, configuration);
Important
|
Pay attention that IgniteProxyManager requires from all nodes in the cluster to contain bucket4j Jars in classpath. |
3.3.3. Example of Bucket instantiation via Thin Client
org.apache.ignite.client.ClientCache<K, byte[]> cache = ...;
org.apache.ignite.client.ClientCompute clientCompute = ...;
private static final IgniteThinClientProxyManager<K> proxyManager = new IgniteThinClientProxyManager(cache, clientCompute)
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build();
Bucket bucket = proxyManager.builder().build(key, configuration);
Important
|
Pay attention that IgniteThinClientProxyManager requires from all nodes in the cluster to contain bucket4j Jars in classpath. |
3.3.4. Example of Bucket instantiation of via Thin Client and IgniteThinClientCasBasedProxyManager
org.apache.ignite.client.ClientCache<K, byte[]> cache = ...;
private static final IgniteThinClientCasBasedProxyManager<K> proxyManager = new IgniteThinClientCasBasedProxyManager(cache)
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build();
Bucket bucket = proxyManager.builder().build(key, configuration);
Important
|
IgniteThinClientCasBasedProxyManager does not require from all nodes in the cluster to contain bucket4j Jars in classpath, but it operates with more latency, so choose it over IgniteThinClientProxyManager if and only if you have no control over cluster classpath. |
3.4. Infinispan integration
3.4.1. Dependencies
To use bucket4j-infinispan
with Infinispan 9.x, 10.x
extension you need to add following dependency:
<dependency>
<groupId>com.github.vladimir-bukhtoyarov</groupId>
<artifactId>bucket4j-infinispan</artifactId>
<version>7.0.0</version>
</dependency>
If you are using legacy version of Infinispan 8.x
then you need to add following dependency:
<dependency>
<groupId>com.github.vladimir-bukhtoyarov</groupId>
<artifactId>bucket4j-infinispan-8</artifactId>
<version>7.0.0</version>
</dependency>
3.4.2. General compatibility matrix principles::
-
Bucket4j authors do not perform continues monitoring of new Infinispan releases. So, there is can be case when there is no one version of Bucket4j which is compatible with newly released Infinispan, just log issue to bug tracker in this case, adding support to new version of Infinispan is usually easy exercise.
-
Integrations with legacy versions of Infinispan are not removed without a clear reason. Hence, you are in safety, even you are working in a big enterprise company that does not update its infrastructure frequently because You still get new Bucket4j’s features even for legacy Infinispan’s releases.
3.4.3. Special notes for Infinispan 10.0+
As mentioned in the Infinispan Marshalling documentation, since release 10.0.0
Infinispan does not allow deserialization of custom payloads into Java classes. If you do not configure serialization(as described bellow), you will get the error like this on any attempt to use Bucket4j with brand new Infinispan release:
Jan 02, 2020 4:57:56 PM org.infinispan.marshall.persistence.impl.PersistenceMarshallerImpl objectToBuffer
WARN: ISPN000559: Cannot marshall 'class io.github.bucket4j.grid.infinispan.InfinispanProcessor'
java.lang.IllegalArgumentException: No marshaller registered for Java type io.github.bucket4j.grid.infinispan.SerializableFunctionAdapter
at org.infinispan.protostream.impl.SerializationContextImpl.getMarshallerDelegate(SerializationContextImpl.java:279)
at org.infinispan.protostream.WrappedMessage.writeMessage(WrappedMessage.java:240)
at org.infinispan.protostream.ProtobufUtil.toWrappedStream(ProtobufUtil.java:196)
There are three options to solve this problem:
* Configure Jboss marshalling instead of default ProtoStream marshaller as described there.
* Configure Java Serialization Marshaller instead of default ProtoStream marshaller, as described there.
Do not forget to add io.github.bucket4j.*
regexp to the whitelist if choosing this way.
* And last way(recommended) just register Bucket4j serialization context initializer
in the serialization configuration.
You can do it in both programmatically and declarative ways:
import io.github.bucket4j.grid.infinispan.serialization.Bucket4jProtobufContextInitializer;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
...
GlobalConfigurationBuilder builder = new GlobalConfigurationBuilder();
builder.serialization().addContextInitializer(new Bucket4jProtobufContextInitializer());
<serialization>
<context-initializer class="io.github.bucket4j.grid.infinispan.serialization.Bucket4jProtobufContextInitializer"/>
</serialization>
And that is all. Just registering Bucket4jProtobufContextInitializer
in any way is enough to make Bucket4j compatible with ProtoStream marshaller, you do not have to care about *.proto
files, annotations, whitelist etc, all neccessary Protobuffers configs generated by Bucket4jProtobufContextInitializer
and registerd on the fly.
3.4.4. Example of Bucket instantiation
org.infinispan.functional.FunctionalMap.ReadWriteMap<K, byte[]> map = ...;
private static final InfinispanProxyManager<K> proxyManager = new InfinispanProxyManager(map);
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build(key, configuration);
Bucket bucket = proxyManager.builder().build(configuration);
3.5. Oracle Coherence integration
3.5.1. Dependencies
To use bucket4j-coherence
extension you need to add following dependency:
<dependency>
<groupId>com.github.vladimir-bukhtoyarov</groupId>
<artifactId>bucket4j-coherence</artifactId>
<version>7.0.0</version>
</dependency>
3.5.2. Example of Bucket instantiation
com.tangosol.net.NamedCache<K, byte[]> cache = ...;
private static final CoherenceProxyManager<K> proxyManager = new CoherenceProxyManager(map);
...
BucketConfiguration configuration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(1_000, Duration.ofMinutes(1)))
.build(key, configuration);
Bucket bucket = proxyManager.builder().build(configuration);
3.5.3. Configuring POF serialization for Bucket4j library classes
If you configure nothing, then by default Java serialization will be used for serialization Bucket4j library classes. Java serialization can be rather slow and should be avoided in general.
Bucket4j
provides custom POF serializers for all library classes that could be transferred over network.
To let Coherence know about POF serializers you should register three serializers in the POF configuration config file:
io.github.bucket4j.grid.coherence.pof.CoherenceEntryProcessorPofSerializer
for class io.github.bucket4j.grid.coherence.CoherenceProcessor
<pof-config xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config coherence-pof-config.xsd">
<user-type-list>
<!-- Include default Coherence types -->
<include>coherence-pof-config.xml</include>
<!-- Define serializers for Bucket4j classes -->
<user-type>
<type-id>1001</type-id>
<class-name>io.github.bucket4j.grid.coherence.CoherenceProcessor</class-name>
<serializer>
<class-name>io.github.bucket4j.grid.coherence.pof.CoherenceEntryProcessorPofSerializer</class-name>
</serializer>
</user-type>
</user-type-list>
</pof-config>
Double check with official Oracle Coherence documentation in case of any questions related to Portable Object Format
.
3.6. Asynchronous API
Since version 3.0
Bucket4j provides asynchronous analogs for majority of API methods.
Async view of proxyManager is available through asAsync()
method:
ProxyManager proxyManager = ...;
AsyncProxyManager asyncProxyManager = proxyManager.asAsync();
BucketConfiguration configuration = ...;
AsyncBucketProxy asyncBucket = asyncProxyManager.builder().build(key, configuration);
Each method of class
has full equivalence with same semantic in synchronous version in the AsyncBucketProxy
class.Bucket
3.6.1. Example - limiting the rate of access to asynchronous servlet
Imagine that you develop SMS service, which allows send SMS via HTTP interface. You want from your architecture to be protected from overloading, clustered and fully asynchronous.
Overloading protection requirement:
To prevent fraud and service overloading you want to introduce following limit for any outbound phone number: The bucket size is 20 SMS (which cannot be exceeded at any given time), with a "refill rate" of 10 SMS per minute that continually increases tokens in the bucket. In other words, if client sends 10 SMS per minute, it will never be throttled, and moreover client have overdraft equals to 20 SMS which can be used if average is little bit higher that 10 SMS/minute on short time period. Solution: lets use bucket4j for this.
Clustering requirement:
You want to avoid the single point of failure, if one server crashed that information about consumed tokens should not be lost, thus it would be better to use any distributed computation platform for storing the buckets.
Solution: lets use JBoss Infinispan for this and bucket4j-infinispan
extension.
Hazelcast and Apache Ignite will be also well choice, Infinispan just selected as example.
Asynchronous processing requirement: Also for maximum scalability you want from architecture to be fully non-blocking, non-blocking architecture means that both SMS sending and limit checking should be asynchronous. Solution: lets use asynchronous features provided by bucket4j and Servlet-API.
Mockup of service based on top of Servlet API and bucket4j-infinispan:
public class SmsServlet extends javax.servlet.http.HttpServlet {
private SmsSender smsSender;
private AsyncProxyManager<String> buckets;
private Supplier<BucketConfiguration> configuration;
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
ServletContext ctx = config.getServletContext();
smsSender = (SmsSender) ctx.getAttribute("sms-sender");
FunctionalMapImpl<String, byte[]> bucketMap = (FunctionalMapImpl<String, byte[]>) ctx.getAttribute("bucket-map");
this.buckets = new InfinispanProxyManager(bucketMap).asAsync();
this.configuration = () -> {
long overdraft = 20;
Refill refill = Refill.greedy(10, Duration.ofMinutes(1));
Bandwidth limit = Bandwidth.classic(overdraft, refill);
return BucketConfiguratiion.builder()
.addLimit(limit)
.build();
};
}
@Override
protected void doPost(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpRequest = (HttpServletRequest) servletRequest;
String fromNumber = req.getParameter("from");
String toNumber = req.getParameter("to");
String text = req.getParameter("text");
AsyncBucketProxy bucket = buckets.builder().build(fromNumber, configuration);
CompletableFuture<ConsumptionProbe> limitCheckingFuture = bucket.asAsync().tryConsumeAndReturnRemaining(1);
final AsyncContext asyncContext = req.startAsync();
limitCheckingFuture.thenCompose(probe -> {
if (!probe.isConsumed()) {
Result throttledResult = Result.throttled(probe);
return CompletableFuture.completedFuture(throttledResult);
} else {
CompletableFuture<Result> sendingFuture = smsSender.sendAsync(fromNumber, toNumber, text);
return sendingFuture;
}
}).whenComplete((result, exception) -> {
HttpServletResponse asyncResponse = (HttpServletResponse) asyncContext.getResponse();
try {
asyncResponse.setContentType("text/plain");
if (exception != null || result.isFailed()) {
asyncResponse.setStatus(500);
asyncResponse.getWriter().println("Internal Error");
} else if (result.isThrottled()) {
asyncResponse.setStatus(429);
asyncResponse.setHeader("X-Rate-Limit-Retry-After-Seconds", "" + result.getRetryAfter());
asyncResponse.getWriter().append("Too many requests");
} else {
asyncResponse.setStatus(200);
asyncResponse.getWriter().append("Success");
}
} finally{
asyncContext.complete();
}
});
}
}
3.7. Production checklist especially in the context of distributed systems
Before using Bucket4j in clustered scenario you need to understand, agree and configure following points:
When working within a distributed system, it is innevitable that requests may cross the border of the current JVM, leading to a communication on the network. Network being unreliable, it is impossible to avoid failures. Thus you should embrace this reality and be ready to get unchecked exceptions when interacting with a distributed bucket. It is your responsibility to handle(or ignore) such exceptions:
-
You probably do not want to fail business transactions if the grid responsible for throttling goes down. If this is the case you can simply log the exception and continue your business transaction without throttling
-
If you wish to fail your business transaction when the grid responsible for throttling goes down, simply rethrow or don’t catch the exception
If the state of any bucket should survive the restart/crash of grid node that holds its state, you need to configure backups yourself, in way specific to the particular grid vendor. For example, see how to configure backups for Apache Ignite.
When dealing with multi-tenant scenarios like a bucket per user or a bucket per IP address, the amount of buckets in the cache will continuously increase. This is because a new bucket will be created each time a new key is detected. To prevent exhausting the available memory of your cluster you need to configure following aspects:
-
Expiration since last access - in order to allow the grid to remove the keys which haven’t been used in a long time. For example, see how to configure expiration policy for Apache Ignite.
-
Maximum cache size(in units or bytes) - Obviously it is preferable to lose bucket data than lose the whole cluster due to out of memory exception.
There are no special settings for HA supported by Bucket4j, because Bucket4j does nothing more that just invoking EntryProcessors on the cache. Instead Bucket4j relies on you to configure the cache with proper parameters that control redundancy and high availability.
Years of experience working with distributed system has tought the author that High Availability does not come for free. You need to test and verify that your system remains available. This cannot be provided by this or any other library. Your system will most certainly go down if you do not plan for that.
4. Advanced features
4.1. Listening for bucket events
4.1.1. What can be listened
-
When tokens consumed from bucket.
-
When consumption requests was rejected by bucket.
-
When thread was parked for wait of tokens refill in result of interaction with
BlockingBucket
. -
When thread was interrupted during the wait of tokens refill in result of interaction with
BlockingBucket
. -
When delayed task was submit to
ScheduledExecutorService
in result of interaction withAsyncScheduledBucket
.
4.1.2. Listener API - corner cases
Question: How many listeners is need to create in case of application uses many buckets?
Answer: it depends:
-
If you want to have aggregated statistics for all buckets then create single listener per application and reuse this listener for all buckets.
-
If you want to measure statistics independently per each bucket then use listener per bucket model.
Question: where is methods of listener are invoking in case of distributed usage?
Answer: listener always invoked on client side, it is means that each client JVM will have own totally independent stat for same bucket.
Question: Why does bucket invoke the listener on client side instead of server side in case of distributed scenario? What I need to do if I need in aggregated stat across the whole cluster?
Answer: Because of planned expansion to non-JVM back-ends such as Redis, MySQL, PostgreSQL. It is not possible to serialize and invoke listener on this non-java back-ends, so it was decided to invoke listener on client side, in order to avoid inconsistency between different back-ends in the future. You can do post-aggregation of monitoring statistics via features built-into your monitoring database or via mediator(like StatsD) between your application and monitoring database.
4.1.3. How to attach listener to bucket?
The bucket can be decorated by listener via toListenable
method.
BucketListener listener = new MyListener();
Bucket bucket = Bucket.builder()
.addLimit(Bandwidth.simple(100, Duration.ofMinutes(1)))
.build()
.toListenable(listener);
4.1.4. Example of integration with Dropwizard metrics-core
io.github.bucket4j.SimpleBucketListener
is simple implementation of io.github.bucket4j.BucketListener
interface that available out of the box. Bellow the example of exposing statistics via Dropwizard Metrics(for Micrometer it should be quite similar):
public static Bucket decorateBucketByStatListener(Bucket originalBucket, String bucketName, MetricRegistry registry) {
SimpleBucketListener stat = new SimpleBucketListener();
registry.register(name + ".consumed", (Gauge<Long>) stat::getConsumed);
registry.register(name + ".rejected", (Gauge<Long>) stat::getRejected);
registry.register(name + ".parkedNanos", (Gauge<Long>) stat::getParkedNanos);
registry.register(name + ".interrupted", (Gauge<Long>) stat::getInterrupted);
registry.register(name + ".delayedNanos", (Gauge<Long>) stat::getDelayedNanos);
return originalBucket.toListenable(stat);
}
4.2. Verbose API
- Verbose API
-
is the API which intent is in injecting low-level diagnostic information into results of any interaction with bucket. Verbose API providing the same functionality as Regular API, with one exception - result of any method always decorated by
VerboseResult
wrapper. - VerboseResult
-
is the wrapper for interaction result that provides the snapshot of bucket and its configuration that was actual at the moment of interaction with bucket.
4.2.1. Verbose API entry-points
The way to get access for Verbose API
is the same for all type of buckets, just call asVerbose()
method:
// for io.github.bucket4j.Bucket
Bucket bucket = ...;
bucket.asVerbose();
// for io.github.bucket4j.Bucket
Bucket bucket = ...;
VerboseBucket verboseBucket = bucket.asVerbose();
// for io.github.bucket4j.distributed.BucketProxy
BucketProxy bucket = ...;
VerboseBucket verboseBucket = bucket.asVerbose();
// for io.github.bucket4j.distributed.AsyncBucketProxy
AsyncBucketProxy bucket = ...;
AsyncVerboseBucket verboseBucket = bucket.asVerbose();
Note
|
BlockingBucket and ScheduledBucket do not provide the verbose analogs. VerboseResult has no sense for this kind of buckets because interactions with them can be followed by thread sleep or delayed execution, so VerboseResult can be absolutely stale and irrelevant to the moment of time when control over execution is being returned to your code. |
4.2.2. Principles of result decoration
-
void return type always decorated by
VerboseResult<Void>
-
A primitive result type like long, boolean always decorated by correspondent boxed type for example
VerboseResult<Boolean>
-
None primitive reult type always decorated as is, for example
VerboseResult<EstimationProbe>
4.2.3. Example of Verbose API usage
VerboseResult<ConsumptionProbe> verboseResult = bucket.asVerbose().tryConsumeAndReturnRemaining(numberOfTokens);
BucketConfiguration bucketConfiguration = verboseResult.getConfiguration();
long capacity = Arrays.stream(bucketConfiguration.getBandwidths())
.mapToLong(Bandwidth::getCapacity)
.max().getAsLong();
response.addHeader("RateLimit-Limit", "" + capacity));
VerboseResult.Diagnostics diagnostics = result.getDiagnostics()
response.addHeader("RateLimit-Remaining", "" + diagnostics.getAvailableTokens());
response.addHeader("RateLimit-Reset", "" + TimeUnit.NANOSECONDS.toSeconds(diagnostics.calculateFullRefillingTime()));
ConsumptionProbe probe = verboseResult.getValue();
if (probe.isConsumed()) {
// the limit is not exceeded
filterChain.doFilter(servletRequest, servletResponse);
} else {
// limit is exceeded
HttpServletResponse httpResponse = (HttpServletResponse) servletResponse;
httpResponse.setStatus(429);
httpResponse.setContentType("text/plain");
httpResponse.getWriter().append("Too many requests");
}
4.3. On-the-fly configuration replacement
As previously mentioned in the definition for BucketConfiguration it is immutable object.
It is not possible to add, remove or change the limits for already created configuration, however, you can replace configuration of bucket via creating new configuration instance and calling bucket.replaceConfiguration(newConfiguration, tokensInheritanceStrategy)
.
4.3.1. Why configuration replacement is not trivial?
-
The first problem of configuration replacement is making decision how to propagate available tokens from bucket with previous configuration to bucket with new configuration. If you don’t care about previous bucket state then use
TokensInheritanceStrategy.RESET
. But it becomes to a tricky problem when we expect that previous consumption(that has not been compensated by refill yet) should take effect to the bucket with new configuration. In this case you need to make a choice between: -
There is another problem when you are choosing PROPORTIONALLY, AS_IS or ADDITIVE or AS_IS and bucket has more then one bandwidth. For example how does replaceConfiguration implementation should bind bandwidths to each other in the following example?
Bucket bucket = Bucket.builder() .addLimit(Bandwidth.simple(10, Duration.ofSeconds(1))) .addLimit(Bandwidth.simple(10000, Duration.ofHours(1))) .build(); ... BucketConfiguration newConfiguration = BucketConfiguration.configurationBuilder() .addLimit(Bandwidth.simple(5000, Duration.ofHours(1))) .addLimit(Bandwidth.simple(100, Duration.ofSeconds(10))) .build(); bucket.replaceConfiguration(newConfiguration, TokensInheritanceStrategy.AS_IS);
It is obviously that simple strategy - copying tokens by bandwidth index will not work well in this case, because of it highly depends on order in which bandwidths were mentioneed in new and previous configuration.
4.3.2. Taking control over replacement process via bandwidth identifiers
Instead of inventing the backward maggic Bucket4j provides to you ability to deap controll of this process by specifying identifiers for bandwidth, so in case of multiple bandwidth configuratoin replacement code can copy available tokens by bandwidth ID. So it is better to rewrite code above as following:
Bucket bucket = Bucket.builder()
.addLimit(Bandwidth.simple(10, Duration.ofSeconds(1)).withId("technical-limit"))
.addLimit(Bandwidth.simple(10000, Duration.ofHours(1)).withId("business-limit"))
.build();
...
BucketConfiguration newConfiguration = BucketConfiguration.builder()
.addLimit(Bandwidth.simple(5000, Duration.ofHours(1)).withId("business-limit"))
.addLimit(Bandwidth.simple(100, Duration.ofSeconds(10)).withId("technical-limit"))
.build();
bucket.replaceConfiguration(newConfiguration, TokensInheritanceStrategy.PROPORTIONALLY);
-
By default bandwidth has <b>null</b> identifier.
-
null value of identifier equals to another null value if and only if there is only one bandwidth with null identifier.
-
If identifier for bandwidth is specified then it must has unique in the bucket. Bucket does not allow to create several bandwidth with same ID.
4.3.3. TokensInheritanceStrategy explanation
TokensInheritanceStrategy specifies the rules for inheritance of available tokens during configuration replacement process.
- PROPORTIONALLY
-
Makes to copy available tokens proportional to bandwidth capacity by following formula: newAvailableTokens = availableTokensBeforeReplacement * (newBandwidthCapacity / capacityBeforeReplacement)
PROPORTIONALLY strategy examples:-
Example 1: imagine bandwidth that was created by
Bandwidth.classic(100, Refill.gready(10, Duration.ofMinutes(1)))
.At the moment of config replacement it was 40 available tokens.
After replacing this bandwidth by following
Bandwidth.classic(200, Refill.gready(10, Duration.ofMinutes(1)))
40 available tokens will be multiplied by 2(200/100), and after replacement we will have 80 available tokens. -
Example 2: imagine bandwidth that was created by
Bandwidth.classic(100, Refill.gready(10, Duration.ofMinutes(1)))
. At the moment of config replacement it was 40 available tokens. After replacing this bandwidth by followingBandwidth.classic(20, Refill.gready(10, Duration.ofMinutes(1)))
40 available tokens will be multiplied by 0.2(20/100), and after replacement we will have 8 available tokens.
-
- AS_IS
-
Instructs to copy available tokens as is, but with one exclusion: if available tokens is greater than new capacity, available tokens will be decreased to new capacity.
AS_IS strategy examples:-
Example 1: imagine bandwidth that was created by
Bandwidth.classic(100, Refill.gready(10, Duration.ofMinutes(1)))
.At the moment of config replacement it was 40 available tokens.
After replacing this bandwidth by following
Bandwidth.classic(200, Refill.gready(10, Duration.ofMinutes(1)))}
40 available tokens will be just copied, and after replacement we will have 40 available tokens. -
Example 2: imagine bandwidth that was created by
Bandwidth.classic(100, Refill.gready(10, Duration.ofMinutes(1)))
.At the moment of config replacement it was 40 available tokens.
After replacing this bandwidth by following
Bandwidth.classic(20, Refill.gready(10, Duration.ofMinutes(1)))
40 available tokens can not be copied as is, because it is greater than new capacity, so available tokens will be reduced to 20.
-
- RESET
-
Use this mode when you want just to forget about previous bucket state. RESET just instructs to erases all previous state. Using this strategy equals to removing bucket and creating again with new configuration.
- ADDITIVE
-
Instructs to copy available tokens as is, but with one exclusion: if new bandwidth capacity is greater than old capacity, available tokens will be increased by the difference between the old and the new configuration.
The formula is following:
newAvailableTokens = Math.min(availableTokensBeforeReplacement, newBandwidthCapacity) + Math.max(0, newBandwidthCapacity - capacityBeforeReplacement)
ADDITIVE strategy examples:-
Example 1: imagine bandwidth that was created by
Bandwidth.classic(100, Refill.gready(10, Duration.ofMinutes(1)))
.At the moment of configuration replacement, it was 40 available tokens.
After replacing this bandwidth by following
Bandwidth.classic(200, Refill.gready(10, Duration.ofMinutes(1)))
40 available tokens will be copied and added to the difference between old and new configuration, and after replacement, we will have 140 available tokens. -
Example 2: imagine bandwidth that was created by
Bandwidth.classic(100, Refill.gready(10, Duration.ofMinutes(1)))
.At the moment of config replacement it was 40 available tokens.
After replacing this bandwidth by following
Bandwidth.classic(20, Refill.gready(10, Duration.ofMinutes(1))))
, and after replacement we will have 20 available tokens. -
Example 3: imagine bandwidth that was created by
Bandwidth.classic(100, Refill.gready(10, Duration.ofMinutes(1)))
.At the moment of config replacement it was 10 available tokens.
After replacing this bandwidth by following
Bandwidth.classic(20, Refill.gready(10, Duration.ofMinutes(1))))
, and after replacement we will have 10 available tokens.
-