Merge changes I475d55b3,I2bc7b3f7,Ibc04347f,I17368493,I642953e4, ... * changes: Use Java record for Forwarder.Result Add metric displaying the number of retries required to forward event Add metrics to measure time of processing forwarded event Add metric reporting the success/failure of event processing Add metrics measuring time between event creation and response Use millisecond accuracy for IndexEvent.eventCreatedOn Add metric about failure/success count of forwarding request Use enum for Command.TYPE
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/EventType.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/EventType.java new file mode 100644 index 0000000..8cfea0c --- /dev/null +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/EventType.java
@@ -0,0 +1,29 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.ericsson.gerrit.plugins.highavailability.forwarder; + +public enum EventType { + CACHE_EVICTION, + EVENT_SENT, + INDEX_ACCOUNT_UPDATE, + INDEX_CHANGE_DELETION, + INDEX_CHANGE_DELETION_ALL_OF_PROJECT, + INDEX_CHANGE_UPDATE, + INDEX_CHANGE_UPDATE_BATCH, + INDEX_GROUP_UPDATE, + INDEX_PROJECT_UPDATE, + PROJECT_LIST_ADDITION, + PROJECT_LIST_DELETION +}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java index b2f8141..bee5f2e 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexAccountHandler.java
@@ -38,8 +38,13 @@ @Override protected CompletableFuture<Boolean> doIndex(Account.Id id, Optional<IndexEvent> indexEvent) { - indexer.index(id); - log.atFine().log("Account %s successfully indexed", id); + try { + indexer.index(id); + log.atFine().log("Account %s successfully indexed", id); + } catch (RuntimeException e) { + log.atFine().log("Account %s failed to be indexed", id); + throw e; + } return CompletableFuture.completedFuture(true); }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java index fd654ba..80aa836 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexChangeHandler.java
@@ -68,7 +68,8 @@ () -> { try (ManualRequestContext ctx = oneOffCtx.open()) { Context.setForwardedEvent(true); - return indexOnce(id, indexEvent); + boolean result = indexOnce(id, indexEvent); + return result; } }); } @@ -123,11 +124,22 @@ throws IOException { if (ALL_CHANGES_FOR_PROJECT.equals(extractChangeId(id))) { Project.NameKey projectName = parseProject(id); - indexer.deleteAllForProject(projectName); - log.atFine().log("All %s changes successfully deleted from index", projectName.get()); + try { + indexer.deleteAllForProject(projectName); + log.atFine().log("All %s changes successfully deleted from index", projectName.get()); + } catch (RuntimeException e) { + log.atFine().log( + "An error occured during deletion of all %s changes from index", projectName.get()); + throw e; + } } else { - indexer.delete(parseChangeId(id)); - log.atFine().log("Change %s successfully deleted from index", id); + try { + indexer.delete(parseChangeId(id)); + log.atFine().log("Change %s successfully deleted from index", id); + } catch (RuntimeException e) { + log.atFine().log("Change %s could not be deleted from index", id); + throw e; + } } return CompletableFuture.completedFuture(true); }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java index 99ac369..01f8680 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwardedIndexGroupHandler.java
@@ -39,8 +39,13 @@ @Override protected CompletableFuture<Boolean> doIndex( AccountGroup.UUID uuid, Optional<IndexEvent> indexEvent) { - indexer.index(uuid); - log.atFine().log("Group %s successfully indexed", uuid); + try { + indexer.index(uuid); + log.atFine().log("Group %s successfully indexed", uuid); + } catch (RuntimeException e) { + log.atFine().log("Group %s could not be indexed", uuid); + throw e; + } return CompletableFuture.completedFuture(true); }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java index 3854ee3..3bf08b4 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/Forwarder.java
@@ -21,6 +21,12 @@ /** Forward indexing, stream events and cache evictions to the other primary */ public interface Forwarder { + public record Result(EventType type, boolean result, boolean isRecoverable) { + public Result(EventType task, boolean result) { + this(task, result, true); + } + } + /** * Forward an account indexing event to the other primary. * @@ -29,7 +35,7 @@ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of * false. */ - CompletableFuture<Boolean> indexAccount(int accountId, IndexEvent indexEvent); + CompletableFuture<Result> indexAccount(int accountId, IndexEvent indexEvent); /** * Forward a change indexing event to the other primary. @@ -40,7 +46,7 @@ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of * false. */ - CompletableFuture<Boolean> indexChange(String projectName, int changeId, IndexEvent indexEvent); + CompletableFuture<Result> indexChange(String projectName, int changeId, IndexEvent indexEvent); /** * Forward a change indexing event to the other primary using batch index endpoint. @@ -51,7 +57,7 @@ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of * false. */ - CompletableFuture<Boolean> batchIndexChange( + CompletableFuture<Result> batchIndexChange( String projectName, int changeId, IndexEvent indexEvent); /** @@ -62,7 +68,7 @@ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of * false. */ - CompletableFuture<Boolean> deleteChangeFromIndex(int changeId, IndexEvent indexEvent); + CompletableFuture<Result> deleteChangeFromIndex(int changeId, IndexEvent indexEvent); /** * Forward a group indexing event to the other primary. @@ -72,7 +78,7 @@ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of * false. */ - CompletableFuture<Boolean> indexGroup(String uuid, IndexEvent indexEvent); + CompletableFuture<Result> indexGroup(String uuid, IndexEvent indexEvent); /** * Forward a project indexing event to the other primary. @@ -82,7 +88,7 @@ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of * false. */ - CompletableFuture<Boolean> indexProject(String projectName, IndexEvent indexEvent); + CompletableFuture<Result> indexProject(String projectName, IndexEvent indexEvent); /** * Forward a stream event to the other primary. @@ -91,7 +97,7 @@ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of * false. */ - CompletableFuture<Boolean> send(Event event); + CompletableFuture<Result> send(Event event); /** * Forward a cache eviction event to the other primary. @@ -101,7 +107,7 @@ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of * false. */ - CompletableFuture<Boolean> evict(String cacheName, Object key); + CompletableFuture<Result> evict(String cacheName, Object key); /** * Forward an addition to the project list cache to the other primary. @@ -110,7 +116,7 @@ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of * false. */ - CompletableFuture<Boolean> addToProjectList(String projectName); + CompletableFuture<Result> addToProjectList(String projectName); /** * Forward a removal from the project list cache to the other primary. @@ -119,7 +125,7 @@ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of * false. */ - CompletableFuture<Boolean> removeFromProjectList(String projectName); + CompletableFuture<Result> removeFromProjectList(String projectName); /** * Forward the removal of all project changes from index to the other primary. @@ -128,5 +134,5 @@ * @return {@link CompletableFuture} of true if successful, otherwise {@link CompletableFuture} of * false. */ - CompletableFuture<Boolean> deleteAllChangesForProject(Project.NameKey projectName); + CompletableFuture<Result> deleteAllChangesForProject(Project.NameKey projectName); }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwarderMetrics.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwarderMetrics.java new file mode 100644 index 0000000..eecef15 --- /dev/null +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwarderMetrics.java
@@ -0,0 +1,83 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.ericsson.gerrit.plugins.highavailability.forwarder; + +import com.google.gerrit.metrics.Counter0; +import com.google.gerrit.metrics.Description; +import com.google.gerrit.metrics.Histogram0; +import com.google.gerrit.metrics.MetricMaker; +import com.google.gerrit.metrics.Timer0; +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; +import java.util.Locale; +import java.util.concurrent.TimeUnit; + +public class ForwarderMetrics { + private final Timer0 latencyMetric; + private final Counter0 failureCounterMetric; + private final Counter0 successCounterMetric; + private final Histogram0 retryMetric; + + public interface Factory { + ForwarderMetrics create(EventType eventType); + } + + @AssistedInject + public ForwarderMetrics(MetricMaker metricMaker, @Assisted EventType eventType) { + String event = eventType.toString().toLowerCase(Locale.US); + + this.latencyMetric = + metricMaker.newTimer( + String.format("forwarding_%s_event/latency", event), + new Description( + String.format( + "Time from %s event scheduling to receiving on the other node", event)) + .setCumulative() + .setUnit(Description.Units.MILLISECONDS)); + this.failureCounterMetric = + metricMaker.newCounter( + String.format("forwarding_%s_event/failure", event), + new Description(String.format("%s events forwarding failures count", event)) + .setCumulative() + .setRate()); + this.successCounterMetric = + metricMaker.newCounter( + String.format("forwarding_%s_event/success", event), + new Description(String.format("%s events forwarding success count", event)) + .setCumulative() + .setRate()); + this.retryMetric = + metricMaker.newHistogram( + String.format("forwarding_%s_event/retries", eventType), + new Description(String.format("%s events forwarding retries", eventType)) + .setCumulative()); + } + + public void recordResult(boolean isSuccessful) { + if (isSuccessful) { + successCounterMetric.increment(); + } else { + failureCounterMetric.increment(); + } + } + + public void recordLatency(long latencyMs) { + latencyMetric.record(latencyMs, TimeUnit.MILLISECONDS); + } + + public void recordRetries(int retries) { + retryMetric.record(retries); + } +}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwarderMetricsRegistry.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwarderMetricsRegistry.java new file mode 100644 index 0000000..97f73f6 --- /dev/null +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwarderMetricsRegistry.java
@@ -0,0 +1,50 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.ericsson.gerrit.plugins.highavailability.forwarder; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +@Singleton +public class ForwarderMetricsRegistry { + + private final ForwarderMetrics.Factory metricsFactory; + + private Map<EventType, ForwarderMetrics> metrics = new HashMap<>(); + + @Inject + public ForwarderMetricsRegistry(ForwarderMetrics.Factory metricsFactory) { + this.metricsFactory = metricsFactory; + this.putAll(Arrays.asList(EventType.values())); + } + + public ForwarderMetrics get(EventType eventType) { + return metrics.get(eventType); + } + + public void put(EventType task) { + metrics.put(task, metricsFactory.create(task)); + } + + public void putAll(Collection<EventType> eventTypes) { + for (EventType eventType : eventTypes) { + put(eventType); + } + } +}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwarderModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwarderModule.java index 7cb107c..f5c5a39 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwarderModule.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwarderModule.java
@@ -15,18 +15,22 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder; import com.ericsson.gerrit.plugins.highavailability.ConfigurableAllowedEventListeners; +import com.google.gerrit.extensions.config.FactoryModule; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.server.events.EventDispatcher; -import com.google.inject.AbstractModule; import com.google.inject.Scopes; -public class ForwarderModule extends AbstractModule { - +public class ForwarderModule extends FactoryModule { @Override protected void configure() { bind(AllowedForwardedEventListener.class) .to(ConfigurableAllowedEventListeners.class) .in(Scopes.SINGLETON); + bind(ForwarderMetricsRegistry.class); DynamicItem.bind(binder(), EventDispatcher.class).to(ForwardedAwareEventBroker.class); + factory(ForwarderMetrics.Factory.class); + bind(ForwarderMetricsRegistry.class).in(Scopes.SINGLETON); + factory(ProcessorMetrics.Factory.class); + bind(ProcessorMetricsRegistry.class).in(Scopes.SINGLETON); } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwarderTask.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwarderTask.java new file mode 100644 index 0000000..f8895e1 --- /dev/null +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ForwarderTask.java
@@ -0,0 +1,28 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.ericsson.gerrit.plugins.highavailability.forwarder; + +public enum ForwarderTask { + ADD_TO_PROJECTS_LIST, + BATCH_INDEX_CHANGE, + DELETE_CHANGE_FROM_INDEX, + EVICT_CACHE, + INDEX_ACCOUNT, + INDEX_CHANGE, + INDEX_GROUP, + INDEX_PROJECT, + REMOVE_FROM_PROJECTS_LIST, + SEND_EVENT +}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java index 71cf044..129af6b 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/IndexEvent.java
@@ -14,12 +14,13 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder; +import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; public class IndexEvent { - public long eventCreatedOn = System.currentTimeMillis() / 1000; + public Instant eventCreatedOn = Instant.now(); public String targetSha; public String metaSha; @@ -31,8 +32,7 @@ + ((metaSha != null) ? "/meta:" + metaSha : ""); } - public static String format(long eventTs) { - return LocalDateTime.ofEpochSecond(eventTs, 0, ZoneOffset.UTC) - .format(DateTimeFormatter.ISO_DATE_TIME); + public static String format(Instant eventTs) { + return LocalDateTime.ofInstant(eventTs, ZoneOffset.UTC).format(DateTimeFormatter.ISO_DATE_TIME); } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ProcessorMetrics.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ProcessorMetrics.java new file mode 100644 index 0000000..de1e271 --- /dev/null +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ProcessorMetrics.java
@@ -0,0 +1,94 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.ericsson.gerrit.plugins.highavailability.forwarder; + +import com.google.gerrit.common.Nullable; +import com.google.gerrit.metrics.Counter0; +import com.google.gerrit.metrics.Description; +import com.google.gerrit.metrics.MetricMaker; +import com.google.gerrit.metrics.Timer0; +import com.google.inject.assistedinject.Assisted; +import com.google.inject.assistedinject.AssistedInject; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.TimeUnit; + +public class ProcessorMetrics { + private final Timer0 processingTimeMetric; + private final Timer0 totalTimeMetric; + private final Counter0 failureCounterMetric; + private final Counter0 successCounterMetric; + + public interface Factory { + ProcessorMetrics create(EventType eventType); + } + + @AssistedInject + public ProcessorMetrics(MetricMaker metricMaker, @Assisted EventType eventType) { + this.processingTimeMetric = + metricMaker.newTimer( + String.format("forwarded_%s_event_handler/time_processing", eventType), + new Description( + String.format( + "Time from receiving an %s event to finish processing it.", eventType)) + .setCumulative() + .setUnit(Description.Units.MILLISECONDS)); + this.totalTimeMetric = + metricMaker.newTimer( + String.format("forwarded_%s_event_handler/time_total", eventType), + new Description( + String.format( + "Time from %s event scheduling to finish processing it.", eventType)) + .setCumulative() + .setUnit(Description.Units.MILLISECONDS)); + this.failureCounterMetric = + metricMaker.newCounter( + String.format("forwarded_%s_event_handler/failure", eventType), + new Description(String.format("%s events forwarding failures count", eventType)) + .setCumulative() + .setRate()); + this.successCounterMetric = + metricMaker.newCounter( + String.format("forwarded_%s_event_handler/success", eventType), + new Description(String.format("%s events forwarding success count", eventType)) + .setCumulative() + .setRate()); + } + + public void recordResult(boolean isSuccessful) { + if (isSuccessful) { + successCounterMetric.increment(); + } else { + failureCounterMetric.increment(); + } + } + + public void recordProcessingTime(Long processingTime) { + processingTimeMetric.record(processingTime, TimeUnit.MILLISECONDS); + } + + public void recordTotalTime(Long totalTime) { + totalTimeMetric.record(totalTime, TimeUnit.MILLISECONDS); + } + + public void record(@Nullable Instant eventCreatedOn, Instant startTime, boolean success) { + Instant now = Instant.now(); + recordResult(success); + recordProcessingTime(Duration.between(startTime, now).toMillis()); + if (eventCreatedOn != null) { + recordTotalTime(Duration.between(eventCreatedOn, now).toMillis()); + } + } +}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ProcessorMetricsRegistry.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ProcessorMetricsRegistry.java new file mode 100644 index 0000000..80504ab --- /dev/null +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/ProcessorMetricsRegistry.java
@@ -0,0 +1,50 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.ericsson.gerrit.plugins.highavailability.forwarder; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +@Singleton +public class ProcessorMetricsRegistry { + + private final ProcessorMetrics.Factory metricsFactory; + + private Map<EventType, ProcessorMetrics> metrics = new HashMap<>(); + + @Inject + public ProcessorMetricsRegistry(ProcessorMetrics.Factory metricsFactory) { + this.metricsFactory = metricsFactory; + this.putAll(Arrays.asList(EventType.values())); + } + + public ProcessorMetrics get(EventType eventType) { + return metrics.get(eventType); + } + + public void put(EventType task) { + metrics.put(task, metricsFactory.create(task)); + } + + public void putAll(Collection<EventType> eventTypes) { + for (EventType eventType : eventTypes) { + put(eventType); + } + } +}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/AddToProjectList.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/AddToProjectList.java index f7af2df..6f47da3 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/AddToProjectList.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/AddToProjectList.java
@@ -14,13 +14,16 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; +import java.time.Instant; + public class AddToProjectList extends Command { - static final String TYPE = "add-to-project-list"; + static final EventType TYPE = EventType.PROJECT_LIST_ADDITION; private final String projectName; - public AddToProjectList(String projectName) { - super(TYPE); + public AddToProjectList(String projectName, Instant eventCreatedOn) { + super(TYPE, eventCreatedOn); this.projectName = projectName; }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/Command.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/Command.java index a258d91..3abad1e 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/Command.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/Command.java
@@ -14,10 +14,15 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; -public class Command { - public final String type; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; +import java.time.Instant; - protected Command(String type) { +public class Command { + public final EventType type; + public final Instant eventCreatedOn; + + protected Command(EventType type, Instant eventCreatedOn) { this.type = type; + this.eventCreatedOn = eventCreatedOn; } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializer.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializer.java index b75f409..cdd217c 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializer.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializer.java
@@ -14,6 +14,7 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.google.gson.JsonDeserializationContext; import com.google.gson.JsonDeserializer; import com.google.gson.JsonElement; @@ -29,6 +30,7 @@ private static final List<Class<? extends Command>> CMD_CLASSES = List.of( IndexChange.Update.class, + IndexChange.BatchUpdate.class, IndexChange.Delete.class, IndexAccount.class, IndexGroup.class, @@ -37,13 +39,13 @@ PostEvent.class, AddToProjectList.class, RemoveFromProjectList.class); - private static final Map<String, Class<?>> COMMAND_TYPE_TO_CLASS_MAPPING = new HashMap<>(); + private static final Map<EventType, Class<?>> COMMAND_TYPE_TO_CLASS_MAPPING = new HashMap<>(); static { for (Class<?> clazz : CMD_CLASSES) { try { Field type = clazz.getDeclaredField("TYPE"); - COMMAND_TYPE_TO_CLASS_MAPPING.put((String) type.get(null), clazz); + COMMAND_TYPE_TO_CLASS_MAPPING.put((EventType) type.get(null), clazz); } catch (Exception e) { throw new RuntimeException(e); } @@ -63,7 +65,7 @@ throw new JsonParseException("Type is not a string: " + typeJson); } String type = typeJson.getAsJsonPrimitive().getAsString(); - Class<?> commandClass = COMMAND_TYPE_TO_CLASS_MAPPING.get(type); + Class<?> commandClass = COMMAND_TYPE_TO_CLASS_MAPPING.get(EventType.valueOf(type)); if (commandClass == null) { throw new JsonParseException("Unknown command type: " + type); }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/DeleteAllProjectChangesFromIndex.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/DeleteAllProjectChangesFromIndex.java index 79933fb..81ce445 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/DeleteAllProjectChangesFromIndex.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/DeleteAllProjectChangesFromIndex.java
@@ -14,15 +14,17 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.google.gerrit.entities.Project; +import java.time.Instant; public class DeleteAllProjectChangesFromIndex extends Command { - static final String TYPE = "delete-all-project-changes-from-index"; + static final EventType TYPE = EventType.INDEX_CHANGE_DELETION_ALL_OF_PROJECT; private final Project.NameKey projectName; - protected DeleteAllProjectChangesFromIndex(Project.NameKey projectName) { - super(TYPE); + protected DeleteAllProjectChangesFromIndex(Project.NameKey projectName, Instant createdOn) { + super(TYPE, createdOn); this.projectName = projectName; }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/EvictCache.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/EvictCache.java index c4ae6d0..9ca05f1 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/EvictCache.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/EvictCache.java
@@ -14,14 +14,17 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; +import java.time.Instant; + public class EvictCache extends Command { - static final String TYPE = "evict-cache"; + static final EventType TYPE = EventType.CACHE_EVICTION; private final String cacheName; private final String keyJson; - protected EvictCache(String cacheName, String keyJson) { - super(TYPE); + protected EvictCache(String cacheName, String keyJson, Instant eventCreatedOn) { + super(TYPE, eventCreatedOn); this.cacheName = cacheName; this.keyJson = keyJson; }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/FailsafeExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/FailsafeExecutorProvider.java index 8f81832..be6bbf4 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/FailsafeExecutorProvider.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/FailsafeExecutorProvider.java
@@ -15,6 +15,7 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; import com.ericsson.gerrit.plugins.highavailability.Configuration; +import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder.Result; import com.google.common.flogger.FluentLogger; import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; @@ -23,7 +24,7 @@ import dev.failsafe.FailsafeExecutor; import dev.failsafe.RetryPolicy; -public class FailsafeExecutorProvider implements Provider<FailsafeExecutor<Boolean>> { +public class FailsafeExecutorProvider implements Provider<FailsafeExecutor<Result>> { private static final FluentLogger log = FluentLogger.forEnclosingClass(); private final Configuration cfg; private final WorkQueue workQueue; @@ -35,9 +36,9 @@ } @Override - public FailsafeExecutor<Boolean> get() { - RetryPolicy<Boolean> retryPolicy = - RetryPolicy.<Boolean>builder() + public FailsafeExecutor<Result> get() { + RetryPolicy<Result> retryPolicy = + RetryPolicy.<Result>builder() .withMaxAttempts(cfg.jgroups().maxTries()) .withDelay(cfg.jgroups().retryInterval()) .onRetry(e -> log.atFine().log("Retrying event %s", e)) @@ -45,7 +46,7 @@ e -> log.atWarning().log( "%d jgroups retries exceeded for event %s", cfg.jgroups().maxTries(), e)) - .handleResult(false) + .handleResultIf(r -> !r.result()) .build(); return Failsafe.with(retryPolicy) .with(workQueue.createQueue(cfg.jgroups().threadPoolSize(), "JGroupsForwarder"));
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexAccount.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexAccount.java index 7d9bbc4..55cadda 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexAccount.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexAccount.java
@@ -14,13 +14,16 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; +import java.time.Instant; + public class IndexAccount extends Command { - static final String TYPE = "index-account"; + static final EventType TYPE = EventType.INDEX_ACCOUNT_UPDATE; private final int id; - public IndexAccount(int id) { - super(TYPE); + public IndexAccount(int id, Instant eventCreatedOn) { + super(TYPE, eventCreatedOn); this.id = id; }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexChange.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexChange.java index 501a722..075bfd7 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexChange.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexChange.java
@@ -14,15 +14,18 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.google.common.base.Strings; +import java.time.Instant; public abstract class IndexChange extends Command { private final String projectName; private final int id; private final boolean batchMode; - protected IndexChange(String type, String projectName, int id, boolean batchMode) { - super(type); + protected IndexChange( + EventType type, String projectName, int id, boolean batchMode, Instant eventCreatedOn) { + super(type, eventCreatedOn); this.projectName = projectName; this.id = id; this.batchMode = batchMode; @@ -37,26 +40,30 @@ } public static class Update extends IndexChange { - static final String TYPE = "update-change"; + static final EventType TYPE = EventType.INDEX_CHANGE_UPDATE; - public Update(String projectName, int id) { - this(projectName, id, false); + public Update(String projectName, int id, Instant eventCreatedOn) { + super(TYPE, projectName, id, false, eventCreatedOn); } + } - public Update(String projectName, int id, boolean batchMode) { - super(TYPE, projectName, id, batchMode); + public static class BatchUpdate extends IndexChange { + static final EventType TYPE = EventType.INDEX_CHANGE_UPDATE_BATCH; + + public BatchUpdate(String projectName, int id, Instant eventCreatedOn) { + super(TYPE, projectName, id, true, eventCreatedOn); } } public static class Delete extends IndexChange { - static final String TYPE = "delete-change"; + static final EventType TYPE = EventType.INDEX_CHANGE_DELETION; - public Delete(int id) { - this("", id); + public Delete(int id, Instant eventCreatedOn) { + this("", id, eventCreatedOn); } - public Delete(String projectName, int id) { - super(TYPE, projectName, id, false); + public Delete(String projectName, int id, Instant eventCreatedOn) { + super(TYPE, projectName, id, false, eventCreatedOn); } } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexGroup.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexGroup.java index b5c9f78..9f49060 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexGroup.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexGroup.java
@@ -14,13 +14,16 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; +import java.time.Instant; + public class IndexGroup extends Command { - static final String TYPE = "index-group"; + static final EventType TYPE = EventType.INDEX_GROUP_UPDATE; private final String uuid; - protected IndexGroup(String uuid) { - super(TYPE); + protected IndexGroup(String uuid, Instant eventCreatedOn) { + super(TYPE, eventCreatedOn); this.uuid = uuid; }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexProject.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexProject.java index 4d8214d..40e400d 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexProject.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/IndexProject.java
@@ -14,13 +14,16 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; +import java.time.Instant; + public class IndexProject extends Command { - static final String TYPE = "index-project"; + static final EventType TYPE = EventType.INDEX_PROJECT_UPDATE; private String projectName; - protected IndexProject(String projectName) { - super(TYPE); + protected IndexProject(String projectName, Instant eventCreatedOn) { + super(TYPE, eventCreatedOn); this.projectName = projectName; }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/InstantTypeAdapter.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/InstantTypeAdapter.java new file mode 100644 index 0000000..df8780c --- /dev/null +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/InstantTypeAdapter.java
@@ -0,0 +1,40 @@ +// Copyright (C) 2025 The Android Open Source Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +import java.lang.reflect.Type; +import java.time.Instant; + +public class InstantTypeAdapter implements JsonDeserializer<Instant>, JsonSerializer<Instant> { + @Override + public Instant deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) + throws JsonParseException { + if (json == null || !json.isJsonPrimitive() || !json.getAsJsonPrimitive().isNumber()) { + throw new JsonParseException("Invalid Instant value: " + json); + } + return Instant.ofEpochMilli(json.getAsLong()); + } + + @Override + public JsonElement serialize(Instant src, Type typeOfSrc, JsonSerializationContext context) { + return context.serialize(src.toEpochMilli(), Long.class); + } +}
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarder.java index 9f1f179..0bd8c5d 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarder.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarder.java
@@ -17,6 +17,7 @@ import com.ericsson.gerrit.plugins.highavailability.Configuration; import com.ericsson.gerrit.plugins.highavailability.Configuration.JGroups; import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwarderMetricsRegistry; import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent; import com.google.common.flogger.FluentLogger; import com.google.gerrit.entities.Project; @@ -25,6 +26,8 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import dev.failsafe.FailsafeExecutor; +import java.time.Duration; +import java.time.Instant; import java.util.List; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; @@ -43,89 +46,106 @@ private final MessageDispatcher dispatcher; private final JGroups jgroupsConfig; private final Gson gson; - private final FailsafeExecutor<Boolean> executor; + private final FailsafeExecutor<Result> executor; + private final ForwarderMetricsRegistry metricsRegistry; @Inject JGroupsForwarder( MessageDispatcher dispatcher, Configuration cfg, @JGroupsGson Gson gson, - @JGroupsForwarderExecutor FailsafeExecutor<Boolean> executor) { + @JGroupsForwarderExecutor FailsafeExecutor<Result> executor, + ForwarderMetricsRegistry metricsRegistry) { this.dispatcher = dispatcher; this.jgroupsConfig = cfg.jgroups(); this.gson = gson; this.executor = executor; + + this.metricsRegistry = metricsRegistry; + this.executor.onComplete( + ev -> { + this.metricsRegistry.get(ev.getResult().type()).recordRetries(ev.getAttemptCount()); + }); } @Override - public CompletableFuture<Boolean> indexAccount(int accountId, IndexEvent indexEvent) { - return execute(new IndexAccount(accountId)); + public CompletableFuture<Result> indexAccount(int accountId, IndexEvent indexEvent) { + return execute(new IndexAccount(accountId, indexEvent.eventCreatedOn)); } @Override - public CompletableFuture<Boolean> indexChange( + public CompletableFuture<Result> indexChange( String projectName, int changeId, IndexEvent indexEvent) { - return execute(new IndexChange.Update(projectName, changeId)); + return execute(new IndexChange.Update(projectName, changeId, indexEvent.eventCreatedOn)); } @Override - public CompletableFuture<Boolean> batchIndexChange( + public CompletableFuture<Result> batchIndexChange( String projectName, int changeId, IndexEvent indexEvent) { - return execute(new IndexChange.Update(projectName, changeId, true)); + return execute(new IndexChange.BatchUpdate(projectName, changeId, indexEvent.eventCreatedOn)); } @Override - public CompletableFuture<Boolean> deleteChangeFromIndex(int changeId, IndexEvent indexEvent) { - return execute(new IndexChange.Delete(changeId)); + public CompletableFuture<Result> deleteChangeFromIndex(int changeId, IndexEvent indexEvent) { + return execute(new IndexChange.Delete(changeId, indexEvent.eventCreatedOn)); } @Override - public CompletableFuture<Boolean> indexGroup(String uuid, IndexEvent indexEvent) { - return execute(new IndexGroup(uuid)); + public CompletableFuture<Result> indexGroup(String uuid, IndexEvent indexEvent) { + return execute(new IndexGroup(uuid, indexEvent.eventCreatedOn)); } @Override - public CompletableFuture<Boolean> indexProject(String projectName, IndexEvent indexEvent) { - return execute(new IndexProject(projectName)); + public CompletableFuture<Result> indexProject(String projectName, IndexEvent indexEvent) { + return execute(new IndexProject(projectName, indexEvent.eventCreatedOn)); } @Override - public CompletableFuture<Boolean> send(Event event) { - return execute(new PostEvent(event)); + public CompletableFuture<Result> send(Event event) { + return execute(new PostEvent(event, Instant.ofEpochSecond(event.eventCreatedOn))); } @Override - public CompletableFuture<Boolean> evict(String cacheName, Object key) { - return execute(new EvictCache(cacheName, gson.toJson(key))); + public CompletableFuture<Result> evict(String cacheName, Object key) { + return execute(new EvictCache(cacheName, gson.toJson(key), Instant.now())); } @Override - public CompletableFuture<Boolean> addToProjectList(String projectName) { - return execute(new AddToProjectList(projectName)); + public CompletableFuture<Result> addToProjectList(String projectName) { + return execute(new AddToProjectList(projectName, Instant.now())); } @Override - public CompletableFuture<Boolean> removeFromProjectList(String projectName) { - return execute(new RemoveFromProjectList(projectName)); + public CompletableFuture<Result> removeFromProjectList(String projectName) { + return execute(new RemoveFromProjectList(projectName, Instant.now())); } @Override - public CompletableFuture<Boolean> deleteAllChangesForProject(Project.NameKey projectName) { - return execute(new DeleteAllProjectChangesFromIndex(projectName)); + public CompletableFuture<Result> deleteAllChangesForProject(Project.NameKey projectName) { + return execute(new DeleteAllProjectChangesFromIndex(projectName, Instant.now())); } - private CompletableFuture<Boolean> execute(Command cmd) { - return executor.getAsync(() -> executeOnce(cmd)); + private CompletableFuture<Result> execute(Command cmd) { + return executor + .getAsync(() -> executeOnce(cmd)) + .thenApplyAsync( + result -> { + metricsRegistry.get(cmd.type).recordResult(result.result()); + metricsRegistry + .get(cmd.type) + .recordLatency(Duration.between(cmd.eventCreatedOn, Instant.now()).toMillis()); + return result; + }); } - private boolean executeOnce(Command cmd) { + private Result executeOnce(Command cmd) { String json = gson.toJson(cmd); try { logJGroupsInfo(); if (dispatcher.getChannel().getView().size() < 2) { log.atFine().log("Less than two members in cluster, not sending %s", json); - return false; + return new Result(cmd.type, false); } log.atFine().log("Sending %s", json); @@ -135,7 +155,7 @@ log.atFine().log("Received response list length = %s", list.size()); if (list.isEmpty()) { - return false; + return new Result(cmd.type, false); } for (Entry<Address, Rsp<Object>> e : list.entrySet()) { @@ -144,14 +164,14 @@ log.atWarning().log( "Received a non TRUE response from receiver %s: %s", e.getKey(), e.getValue().getValue()); - return false; + return new Result(cmd.type, false); } } log.atFine().log("Successfully sent message %s", json); - return true; + return new Result(cmd.type, true); } catch (Exception e) { log.atWarning().withCause(e).log("Forwarding %s failed", json); - return false; + return new Result(cmd.type, false); } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderModule.java index 58e7aeb..360ba40 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderModule.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderModule.java
@@ -15,6 +15,7 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder; +import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder.Result; import com.ericsson.gerrit.plugins.highavailability.peers.jgroups.JChannelProviderModule; import com.google.gerrit.lifecycle.LifecycleModule; import com.google.gerrit.server.events.EventGson; @@ -24,6 +25,7 @@ import com.google.inject.Singleton; import com.google.inject.TypeLiteral; import dev.failsafe.FailsafeExecutor; +import java.time.Instant; import org.jgroups.blocks.MessageDispatcher; import org.jgroups.blocks.RequestHandler; @@ -37,7 +39,7 @@ install(new JChannelProviderModule()); listener().to(OnStartStop.class); - bind(new TypeLiteral<FailsafeExecutor<Boolean>>() {}) + bind(new TypeLiteral<FailsafeExecutor<Result>>() {}) .annotatedWith(JGroupsForwarderExecutor.class) .toProvider(FailsafeExecutorProvider.class) .in(Scopes.SINGLETON); @@ -50,6 +52,7 @@ return eventGson .newBuilder() .registerTypeAdapter(Command.class, new CommandDeserializer()) + .registerTypeAdapter(Instant.class, new InstantTypeAdapter()) .create(); } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessor.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessor.java index 73df3eb..489e070 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessor.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessor.java
@@ -23,6 +23,8 @@ import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexChangeHandler; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedProjectListUpdateHandler; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetrics; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.common.flogger.FluentLogger; import com.google.gerrit.entities.Account; import com.google.gerrit.server.events.Event; @@ -30,6 +32,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import java.io.IOException; +import java.time.Instant; import java.util.Optional; import org.jgroups.Message; import org.jgroups.blocks.RequestHandler; @@ -45,6 +48,7 @@ private final ForwardedCacheEvictionHandler cacheEvictionHandler; private final ForwardedEventHandler eventHandler; private final ForwardedProjectListUpdateHandler projectListUpdateHandler; + private final ProcessorMetricsRegistry metricRegistry; @Inject MessageProcessor( @@ -54,7 +58,8 @@ ForwardedIndexAccountHandler indexAccountHandler, ForwardedCacheEvictionHandler cacheEvictionHandler, ForwardedEventHandler eventHandler, - ForwardedProjectListUpdateHandler projectListUpdateHandler) { + ForwardedProjectListUpdateHandler projectListUpdateHandler, + ProcessorMetricsRegistry metricRegistry) { this.gson = gson; this.indexChangeHandler = indexChangeHandler; this.indexBatchChangeHandler = indexBatchChangeHandler; @@ -62,11 +67,15 @@ this.cacheEvictionHandler = cacheEvictionHandler; this.eventHandler = eventHandler; this.projectListUpdateHandler = projectListUpdateHandler; + this.metricRegistry = metricRegistry; } @Override public Object handle(Message msg) { Command cmd = getCommand(msg); + ProcessorMetrics metrics = metricRegistry.get(cmd.type); + Instant startTime = Instant.now(); + boolean success = false; Context.setForwardedEvent(true); try { @@ -83,7 +92,7 @@ } catch (Exception e) { log.atSevere().withCause(e).log( "Change index %s on change %s failed", op.name().toLowerCase(), indexChange.getId()); - return false; + throw e; } } else if (cmd instanceof IndexAccount) { @@ -95,7 +104,7 @@ } catch (IOException e) { log.atSevere().withCause(e).log( "Account index update on account %s failed", indexAccount.getId()); - return false; + throw e; } } else if (cmd instanceof EvictCache) { @@ -117,17 +126,18 @@ String projectName = ((RemoveFromProjectList) cmd).getProjectName(); projectListUpdateHandler.update(projectName, true); } - - return true; + success = true; } catch (Exception e) { - return false; + success = false; } finally { Context.unsetForwardedEvent(); } + metrics.record(cmd.eventCreatedOn, startTime, success); + return success; } private Operation getOperation(IndexChange cmd) { - if (cmd instanceof IndexChange.Update) { + if (cmd instanceof IndexChange.Update || cmd instanceof IndexChange.BatchUpdate) { return Operation.INDEX; } else if (cmd instanceof IndexChange.Delete) { return Operation.DELETE;
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/PostEvent.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/PostEvent.java index 49627a7..3f50b0a 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/PostEvent.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/PostEvent.java
@@ -14,15 +14,17 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.google.gerrit.server.events.Event; +import java.time.Instant; public class PostEvent extends Command { - static final String TYPE = "post-event"; + static final EventType TYPE = EventType.EVENT_SENT; private final Event event; - protected PostEvent(Event event) { - super(TYPE); + protected PostEvent(Event event, Instant eventCreatedOn) { + super(TYPE, eventCreatedOn); this.event = event; }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/RemoveFromProjectList.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/RemoveFromProjectList.java index 8eb0a3d..1a43ac8 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/RemoveFromProjectList.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/RemoveFromProjectList.java
@@ -14,13 +14,16 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; +import java.time.Instant; + public class RemoveFromProjectList extends Command { - static final String TYPE = "remove-from-project-list"; + static final EventType TYPE = EventType.PROJECT_LIST_DELETION; private final String projectName; - public RemoveFromProjectList(String projectName) { - super(TYPE); + public RemoveFromProjectList(String projectName, Instant eventCreatedOn) { + super(TYPE, eventCreatedOn); this.projectName = projectName; }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/AbstractIndexRestApiServlet.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/AbstractIndexRestApiServlet.java index a567f61..52b3ece 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/AbstractIndexRestApiServlet.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/AbstractIndexRestApiServlet.java
@@ -18,10 +18,14 @@ import static javax.servlet.http.HttpServletResponse.SC_METHOD_NOT_ALLOWED; import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation; import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.common.base.Charsets; +import com.google.gerrit.common.Nullable; +import com.google.gerrit.extensions.restapi.NotImplementedException; import com.google.gerrit.server.events.EventGson; import com.google.gson.Gson; import java.io.IOException; @@ -56,45 +60,52 @@ AbstractIndexRestApiServlet( ForwardedIndexingHandler<T> forwardedIndexingHandler, IndexName indexName, - boolean allowDelete, - @EventGson Gson gson) { + @EventGson Gson gson, + ProcessorMetricsRegistry metricsRegistry, + EventType postEventType, + @Nullable EventType deleteEventType) { + super(metricsRegistry, postEventType, deleteEventType); this.forwardedIndexingHandler = forwardedIndexingHandler; this.indexName = indexName; - this.allowDelete = allowDelete; this.gson = gson; - } - - AbstractIndexRestApiServlet( - ForwardedIndexingHandler<T> forwardedIndexingHandler, IndexName indexName) { - this(forwardedIndexingHandler, indexName, false, new Gson()); + this.allowDelete = deleteEventType != null; } @Override - protected void doPost(HttpServletRequest req, HttpServletResponse rsp) { - process(req, rsp, Operation.INDEX); + protected boolean processPostRequest(HttpServletRequest req, HttpServletResponse rsp) { + return process(req, rsp, Operation.INDEX); } @Override - protected void doDelete(HttpServletRequest req, HttpServletResponse rsp) { + protected boolean processDeleteRequest(HttpServletRequest req, HttpServletResponse rsp) { if (!allowDelete) { sendError( rsp, SC_METHOD_NOT_ALLOWED, String.format("cannot delete %s from index", indexName)); - } else { - process(req, rsp, Operation.DELETE); + throw new NotImplementedException("Deletions not allowed for " + indexName); } + return process(req, rsp, Operation.DELETE); } - private void process(HttpServletRequest req, HttpServletResponse rsp, Operation operation) { - setHeaders(rsp); + /** + * Process the request by parsing the ID from the URL and invoking the indexing handler. + * + * @param req the HTTP request + * @param rsp the HTTP response + * @param operation the indexing operation to perform (INDEX or DELETE) + * @return true if the operation was successful, false otherwise + */ + private boolean process(HttpServletRequest req, HttpServletResponse rsp, Operation operation) { String path = req.getRequestURI(); T id = parse(path.substring(path.lastIndexOf('/') + 1)); try { forwardedIndexingHandler.index(id, operation, parseBody(req)); rsp.setStatus(SC_NO_CONTENT); + return true; } catch (IOException e) { sendError(rsp, SC_CONFLICT, e.getMessage()); log.atSevere().withCause(e).log("Unable to update %s index", indexName); + return false; } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/AbstractRestApiServlet.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/AbstractRestApiServlet.java index a76d8ea..6aa0461 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/AbstractRestApiServlet.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/AbstractRestApiServlet.java
@@ -16,20 +16,88 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetrics; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; +import com.google.common.base.Strings; import com.google.common.flogger.FluentLogger; +import com.google.gerrit.common.Nullable; +import com.google.gerrit.extensions.restapi.NotImplementedException; import java.io.IOException; +import java.time.Instant; import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; public abstract class AbstractRestApiServlet extends HttpServlet { private static final long serialVersionUID = 1L; protected static final FluentLogger log = FluentLogger.forEnclosingClass(); + private final ProcessorMetrics postMetrics; + private final ProcessorMetrics deleteMetrics; + + public AbstractRestApiServlet( + ProcessorMetricsRegistry metricsRegistry, + EventType postEventType, + @Nullable EventType deleteEventType) { + super(); + this.postMetrics = metricsRegistry.get(postEventType); + if (deleteEventType != null) { + this.deleteMetrics = metricsRegistry.get(deleteEventType); + } else { + this.deleteMetrics = null; + } + } + protected static void setHeaders(HttpServletResponse rsp) { rsp.setContentType("text/plain"); rsp.setCharacterEncoding(UTF_8.name()); } + @Override + public final void doPost(HttpServletRequest req, HttpServletResponse rsp) { + setHeaders(rsp); + Instant start = Instant.now(); + + boolean success = processPostRequest(req, rsp); + + postMetrics.record(getEventCreatedOnFromHeader(req), start, success); + } + + @Override + public final void doDelete(HttpServletRequest req, HttpServletResponse rsp) { + setHeaders(rsp); + Instant start = Instant.now(); + + try { + boolean success = processDeleteRequest(req, rsp); + deleteMetrics.record(getEventCreatedOnFromHeader(req), start, success); + } catch (NotImplementedException e) { + return; + } + } + + private Instant getEventCreatedOnFromHeader(HttpServletRequest req) { + String header = req.getHeader(HttpSession.HEADER_EVENT_CREATED_ON); + if (!Strings.isNullOrEmpty(header)) { + try { + return Instant.ofEpochMilli(Long.valueOf(header)); + } catch (NumberFormatException e) { + log.atWarning().withCause(e).log( + "Invalid value for header %s: %s", HttpSession.HEADER_EVENT_CREATED_ON, header); + } + } + return null; + } + + protected boolean processPostRequest(HttpServletRequest req, HttpServletResponse rsp) { + throw new NotImplementedException("POST requests not implemented"); + } + + protected boolean processDeleteRequest(HttpServletRequest req, HttpServletResponse rsp) { + throw new NotImplementedException("DELETE requests not implemented"); + } + protected void sendError(HttpServletResponse rsp, int statusCode, String message) { try { rsp.sendError(statusCode, message); @@ -37,4 +105,19 @@ log.atSevere().withCause(e).log("Failed to send error messsage"); } } + + protected static void updateMetrics( + ProcessorMetrics metrics, HttpServletRequest req, Instant startTime, boolean success) { + String eventCreatedOn = req.getHeader(HttpSession.HEADER_EVENT_CREATED_ON); + Instant now = Instant.now(); + long totalDuration; + if (Strings.isNullOrEmpty(eventCreatedOn)) { + totalDuration = 0L; + } else { + totalDuration = now.toEpochMilli() - Long.valueOf(eventCreatedOn); + } + metrics.recordResult(success); + metrics.recordProcessingTime(now.toEpochMilli() - startTime.toEpochMilli()); + metrics.recordTotalTime(totalDuration); + } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheKeyJsonParser.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheKeyJsonParser.java index b8f5e9c..16e25a9 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheKeyJsonParser.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheKeyJsonParser.java
@@ -15,6 +15,7 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.rest; import com.ericsson.gerrit.plugins.highavailability.cache.Constants; +import com.google.common.base.CharMatcher; import com.google.common.base.Strings; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; @@ -54,7 +55,7 @@ case Constants.PROJECT_LIST: return gson.fromJson(json, Object.class); case Constants.PROJECTS: - return Project.nameKey(json.getAsString()); + return Project.nameKey(CharMatcher.is('\"').trimFrom(json.getAsString())); default: try { return gson.fromJson(json, String.class);
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheRestApiServlet.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheRestApiServlet.java index 86d64d2..926776b 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheRestApiServlet.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheRestApiServlet.java
@@ -19,7 +19,9 @@ import com.ericsson.gerrit.plugins.highavailability.forwarder.CacheEntry; import com.ericsson.gerrit.plugins.highavailability.forwarder.CacheNotFoundException; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedCacheEvictionHandler; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.common.base.Splitter; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -39,13 +41,15 @@ @Inject CacheRestApiServlet( ForwardedCacheEvictionHandler forwardedCacheEvictionHandler, - CacheKeyJsonParser cacheKeyParser) { + CacheKeyJsonParser cacheKeyParser, + ProcessorMetricsRegistry metricRegistry) { + super(metricRegistry, EventType.CACHE_EVICTION, null); this.forwardedCacheEvictionHandler = forwardedCacheEvictionHandler; this.cacheKeyParser = cacheKeyParser; } @Override - protected void doPost(HttpServletRequest req, HttpServletResponse rsp) { + protected boolean processPostRequest(HttpServletRequest req, HttpServletResponse rsp) { setHeaders(rsp); try { List<String> params = Splitter.on('/').splitToList(req.getPathInfo()); @@ -54,6 +58,7 @@ forwardedCacheEvictionHandler.evict( CacheEntry.from(cacheName, cacheKeyParser.fromJson(cacheName, json))); rsp.setStatus(SC_NO_CONTENT); + return true; } catch (CacheNotFoundException e) { log.atSevere().log("Failed to process eviction request: %s", e.getMessage()); sendError(rsp, SC_BAD_REQUEST, e.getMessage()); @@ -61,5 +66,6 @@ log.atSevere().withCause(e).log("Failed to process eviction request"); sendError(rsp, SC_BAD_REQUEST, e.getMessage()); } + return false; } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/EventRestApiServlet.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/EventRestApiServlet.java index e2e3302..973844d 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/EventRestApiServlet.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/EventRestApiServlet.java
@@ -19,7 +19,9 @@ import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; import static javax.servlet.http.HttpServletResponse.SC_UNSUPPORTED_MEDIA_TYPE; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedEventHandler; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.common.io.CharStreams; import com.google.common.net.MediaType; import com.google.gerrit.server.events.Event; @@ -39,24 +41,29 @@ private final Gson gson; @Inject - EventRestApiServlet(ForwardedEventHandler forwardedEventHandler, @EventGson Gson gson) { + EventRestApiServlet( + ForwardedEventHandler forwardedEventHandler, + @EventGson Gson gson, + ProcessorMetricsRegistry metricRegistry) { + super(metricRegistry, EventType.EVENT_SENT, null); this.forwardedEventHandler = forwardedEventHandler; this.gson = gson; } @Override - protected void doPost(HttpServletRequest req, HttpServletResponse rsp) { - setHeaders(rsp); + protected boolean processPostRequest(HttpServletRequest req, HttpServletResponse rsp) { try { if (!MediaType.parse(req.getContentType()).is(JSON_UTF_8)) { sendError(rsp, SC_UNSUPPORTED_MEDIA_TYPE, "Expecting " + JSON_UTF_8 + " content type"); - return; + return false; } Event event = getEventFromRequest(req); rsp.setStatus(SC_NO_CONTENT); forwardedEventHandler.dispatch(event); + return true; } catch (IOException e) { sendError(rsp, SC_BAD_REQUEST, e.getMessage()); + return false; } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/FailsafeExecutorProvider.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/FailsafeExecutorProvider.java index 8134220..6168880 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/FailsafeExecutorProvider.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/FailsafeExecutorProvider.java
@@ -15,18 +15,18 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.rest; import com.ericsson.gerrit.plugins.highavailability.Configuration; +import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder.Result; import com.google.common.flogger.FluentLogger; import com.google.inject.Inject; import com.google.inject.Provider; import com.google.inject.Singleton; import dev.failsafe.Failsafe; import dev.failsafe.FailsafeExecutor; -import dev.failsafe.Fallback; import dev.failsafe.RetryPolicy; import java.util.concurrent.Executors; @Singleton -public class FailsafeExecutorProvider implements Provider<FailsafeExecutor<Boolean>> { +public class FailsafeExecutorProvider implements Provider<FailsafeExecutor<Result>> { private static final FluentLogger log = FluentLogger.forEnclosingClass(); private final Configuration cfg; @@ -36,10 +36,9 @@ } @Override - public FailsafeExecutor<Boolean> get() { - Fallback<Boolean> fallbackToFalse = Fallback.<Boolean>of(() -> false); - RetryPolicy<Boolean> retryPolicy = - RetryPolicy.<Boolean>builder() + public FailsafeExecutor<Result> get() { + RetryPolicy<Result> retryPolicy = + RetryPolicy.<Result>builder() .withMaxAttempts(cfg.http().maxTries()) .withDelay(cfg.http().retryInterval()) .onRetry(e -> log.atFine().log("Retrying event %s", e)) @@ -47,15 +46,13 @@ e -> log.atWarning().log( "%d http retries exceeded for event %s", cfg.http().maxTries(), e)) - .handleResult(false) - .abortIf( - (r, e) -> - e instanceof ForwardingException && !((ForwardingException) e).isRecoverable()) + .handleResultIf(r -> !r.result()) + .abortIf((r, e) -> !r.result() && !r.isRecoverable()) .build(); // TODO: the executor shall be created by workQueue.createQueue(...) // However, this currently doesn't work because WorkQueue.Executor doesn't support wrapping of // Callable i.e. it throws an exception on decorateTask(Callable) - return Failsafe.with(fallbackToFalse, retryPolicy) + return Failsafe.with(retryPolicy) .with(Executors.newScheduledThreadPool(cfg.http().threadPoolSize())); } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSession.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSession.java index 6d801f6..53b8dc7 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSession.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSession.java
@@ -14,6 +14,7 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.rest; +import com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups.InstantTypeAdapter; import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.HttpResponseHandler.HttpResult; import com.google.common.net.MediaType; import com.google.gerrit.server.events.EventGson; @@ -22,6 +23,7 @@ import java.io.IOException; import java.net.URI; import java.nio.charset.StandardCharsets; +import java.time.Instant; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; import org.apache.http.client.methods.HttpPost; @@ -29,40 +31,46 @@ import org.apache.http.impl.client.CloseableHttpClient; class HttpSession { + public static final String HEADER_EVENT_CREATED_ON = "Event-Created-On"; + private final CloseableHttpClient httpClient; private final Gson gson; @Inject HttpSession(CloseableHttpClient httpClient, @EventGson Gson gson) { this.httpClient = httpClient; - this.gson = gson; + + this.gson = + gson.newBuilder().registerTypeAdapter(Instant.class, new InstantTypeAdapter()).create(); } - HttpResult post(String uri) throws IOException { - return post(uri, null); + HttpResult post(String uri, Instant createdOn) throws IOException { + return post(uri, null, createdOn); } - HttpResult post(String uri, Object content) throws IOException { + HttpResult post(String uri, Object content, Instant createdOn) throws IOException { HttpPost post = new HttpPost(uri); - setContent(post, content); + setContent(post, content, createdOn); return httpClient.execute(post, new HttpResponseHandler()); } - HttpResult delete(String uri) throws IOException { - return delete(uri, null); + HttpResult delete(String uri, Instant createdOn) throws IOException { + return delete(uri, null, createdOn); } - HttpResult delete(String uri, Object content) throws IOException { + HttpResult delete(String uri, Object content, Instant createdOn) throws IOException { HttpDeleteWithBody delete = new HttpDeleteWithBody(uri); - setContent(delete, content); + setContent(delete, content, createdOn); return httpClient.execute(delete, new HttpResponseHandler()); } - private void setContent(HttpEntityEnclosingRequestBase request, Object content) { + private void setContent( + HttpEntityEnclosingRequestBase request, Object content, Instant createdOn) { if (content != null) { request.addHeader("Content-Type", MediaType.JSON_UTF_8.toString()); request.setEntity(new StringEntity(jsonEncode(content), StandardCharsets.UTF_8)); } + request.addHeader(HEADER_EVENT_CREATED_ON, String.valueOf(createdOn.toEpochMilli())); } private String jsonEncode(Object content) {
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexAccountRestApiServlet.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexAccountRestApiServlet.java index fa38cc3..047da64 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexAccountRestApiServlet.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexAccountRestApiServlet.java
@@ -14,8 +14,12 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.rest; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexAccountHandler; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.gerrit.entities.Account; +import com.google.gerrit.server.events.EventGson; +import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -24,8 +28,11 @@ private static final long serialVersionUID = -1L; @Inject - IndexAccountRestApiServlet(ForwardedIndexAccountHandler handler) { - super(handler, IndexName.ACCOUNT); + IndexAccountRestApiServlet( + ForwardedIndexAccountHandler handler, + @EventGson Gson gson, + ProcessorMetricsRegistry metricRegistry) { + super(handler, IndexName.ACCOUNT, gson, metricRegistry, EventType.INDEX_ACCOUNT_UPDATE, null); } @Override
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexBatchChangeRestApiServlet.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexBatchChangeRestApiServlet.java index bebfae9..5816cb5 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexBatchChangeRestApiServlet.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexBatchChangeRestApiServlet.java
@@ -14,7 +14,9 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.rest; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexBatchChangeHandler; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.gerrit.extensions.restapi.Url; import com.google.gerrit.server.events.EventGson; import com.google.gson.Gson; @@ -26,8 +28,12 @@ private static final long serialVersionUID = -1L; @Inject - IndexBatchChangeRestApiServlet(ForwardedIndexBatchChangeHandler handler, @EventGson Gson gson) { - super(handler, IndexName.CHANGE, true, gson); + IndexBatchChangeRestApiServlet( + ForwardedIndexBatchChangeHandler handler, + @EventGson Gson gson, + ProcessorMetricsRegistry metricRegistry) { + super( + handler, IndexName.CHANGE, gson, metricRegistry, EventType.INDEX_CHANGE_UPDATE_BATCH, null); } @Override
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexChangeRestApiServlet.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexChangeRestApiServlet.java index 71e10fb..4d8d9a3 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexChangeRestApiServlet.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexChangeRestApiServlet.java
@@ -14,7 +14,9 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.rest; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexChangeHandler; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.gerrit.extensions.restapi.Url; import com.google.gerrit.server.events.EventGson; import com.google.gson.Gson; @@ -26,8 +28,17 @@ private static final long serialVersionUID = -1L; @Inject - IndexChangeRestApiServlet(ForwardedIndexChangeHandler handler, @EventGson Gson gson) { - super(handler, IndexName.CHANGE, true, gson); + IndexChangeRestApiServlet( + ForwardedIndexChangeHandler handler, + @EventGson Gson gson, + ProcessorMetricsRegistry metricRegistry) { + super( + handler, + IndexName.CHANGE, + gson, + metricRegistry, + EventType.INDEX_CHANGE_UPDATE, + EventType.INDEX_CHANGE_DELETION); } @Override
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexGroupRestApiServlet.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexGroupRestApiServlet.java index 35c526f..ed9b57e 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexGroupRestApiServlet.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexGroupRestApiServlet.java
@@ -14,8 +14,12 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.rest; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexGroupHandler; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.gerrit.entities.AccountGroup; +import com.google.gerrit.server.events.EventGson; +import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -24,8 +28,11 @@ private static final long serialVersionUID = -1L; @Inject - IndexGroupRestApiServlet(ForwardedIndexGroupHandler handler) { - super(handler, IndexName.GROUP); + IndexGroupRestApiServlet( + ForwardedIndexGroupHandler handler, + @EventGson Gson gson, + ProcessorMetricsRegistry metricRegistry) { + super(handler, IndexName.GROUP, gson, metricRegistry, EventType.INDEX_GROUP_UPDATE, null); } @Override
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexProjectRestApiServlet.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexProjectRestApiServlet.java index 31df2bb..fbf7ffd 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexProjectRestApiServlet.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexProjectRestApiServlet.java
@@ -14,9 +14,13 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.rest; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexProjectHandler; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.restapi.Url; +import com.google.gerrit.server.events.EventGson; +import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -25,8 +29,11 @@ private static final long serialVersionUID = -1L; @Inject - IndexProjectRestApiServlet(ForwardedIndexProjectHandler handler) { - super(handler, IndexName.PROJECT); + IndexProjectRestApiServlet( + ForwardedIndexProjectHandler handler, + @EventGson Gson gson, + ProcessorMetricsRegistry metricRegistry) { + super(handler, IndexName.PROJECT, gson, metricRegistry, EventType.INDEX_PROJECT_UPDATE, null); } @Override
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ProjectListApiServlet.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ProjectListApiServlet.java index 7a31ea0..fb88edb 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ProjectListApiServlet.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ProjectListApiServlet.java
@@ -17,7 +17,9 @@ import static javax.servlet.http.HttpServletResponse.SC_BAD_REQUEST; import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedProjectListUpdateHandler; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.gerrit.extensions.restapi.Url; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -32,30 +34,34 @@ private final ForwardedProjectListUpdateHandler forwardedProjectListUpdateHandler; @Inject - ProjectListApiServlet(ForwardedProjectListUpdateHandler forwardedProjectListUpdateHandler) { + ProjectListApiServlet( + ForwardedProjectListUpdateHandler forwardedProjectListUpdateHandler, + ProcessorMetricsRegistry metricRegistry) { + super(metricRegistry, EventType.PROJECT_LIST_ADDITION, EventType.PROJECT_LIST_DELETION); this.forwardedProjectListUpdateHandler = forwardedProjectListUpdateHandler; } @Override - protected void doPost(HttpServletRequest req, HttpServletResponse rsp) { - process(req, rsp, false); + protected boolean processPostRequest(HttpServletRequest req, HttpServletResponse rsp) { + return process(req, rsp, false); } @Override - protected void doDelete(HttpServletRequest req, HttpServletResponse rsp) { - process(req, rsp, true); + protected boolean processDeleteRequest(HttpServletRequest req, HttpServletResponse rsp) { + return process(req, rsp, true); } - private void process(HttpServletRequest req, HttpServletResponse rsp, boolean delete) { - setHeaders(rsp); + private boolean process(HttpServletRequest req, HttpServletResponse rsp, boolean delete) { String requestURI = req.getRequestURI(); String projectName = requestURI.substring(requestURI.lastIndexOf('/') + 1); try { forwardedProjectListUpdateHandler.update(Url.decode(projectName), delete); rsp.setStatus(SC_NO_CONTENT); + return true; } catch (IOException e) { log.atSevere().withCause(e).log("Unable to update project list"); sendError(rsp, SC_BAD_REQUEST, e.getMessage()); + return false; } } }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/QueryChangesUpdatedSinceServlet.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/QueryChangesUpdatedSinceServlet.java index 4ebf175..244831e 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/QueryChangesUpdatedSinceServlet.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/QueryChangesUpdatedSinceServlet.java
@@ -32,11 +32,12 @@ import java.io.PrintWriter; import java.util.ArrayList; import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @Singleton -public class QueryChangesUpdatedSinceServlet extends AbstractRestApiServlet { +public class QueryChangesUpdatedSinceServlet extends HttpServlet { private static final long serialVersionUID = 1L; Gson gson = new Gson();
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java index d53cd82..b7ad165 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarder.java
@@ -16,7 +16,9 @@ import com.ericsson.gerrit.plugins.highavailability.Configuration; import com.ericsson.gerrit.plugins.highavailability.cache.Constants; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwarderMetricsRegistry; import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent; import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.HttpResponseHandler.HttpResult; import com.ericsson.gerrit.plugins.highavailability.peers.PeerInfo; @@ -33,6 +35,8 @@ import com.google.inject.Provider; import dev.failsafe.FailsafeExecutor; import java.io.IOException; +import java.time.Duration; +import java.time.Instant; import java.util.Set; import java.util.concurrent.CompletableFuture; import javax.net.ssl.SSLException; @@ -54,7 +58,8 @@ private final Configuration cfg; private final Provider<Set<PeerInfo>> peerInfoProvider; private final Gson gson; - private FailsafeExecutor<Boolean> executor; + private FailsafeExecutor<Result> executor; + private final ForwarderMetricsRegistry metricsRegistry; @Inject RestForwarder( @@ -63,51 +68,80 @@ Configuration cfg, Provider<Set<PeerInfo>> peerInfoProvider, @EventGson Gson gson, - @RestForwarderExecutor FailsafeExecutor<Boolean> executor) { + @RestForwarderExecutor FailsafeExecutor<Result> executor, + ForwarderMetricsRegistry metricsRegistry) { this.httpSession = httpClient; this.pluginRelativePath = Joiner.on("/").join("plugins", pluginName); this.cfg = cfg; this.peerInfoProvider = peerInfoProvider; this.gson = gson; this.executor = executor; + this.metricsRegistry = metricsRegistry; + this.executor.onComplete( + ev -> { + this.metricsRegistry.get(ev.getResult().type()).recordRetries(ev.getAttemptCount()); + }); } @Override - public CompletableFuture<Boolean> indexAccount(final int accountId, IndexEvent event) { - return execute(RequestMethod.POST, "index account", "index/account", accountId, event); - } - - @Override - public CompletableFuture<Boolean> indexChange( - String projectName, int changeId, IndexEvent event) { + public CompletableFuture<Result> indexAccount(final int accountId, IndexEvent event) { return execute( RequestMethod.POST, + EventType.INDEX_ACCOUNT_UPDATE, + "index account", + "index/account", + accountId, + event, + event.eventCreatedOn); + } + + @Override + public CompletableFuture<Result> indexChange(String projectName, int changeId, IndexEvent event) { + return execute( + RequestMethod.POST, + EventType.INDEX_CHANGE_UPDATE, "index change", "index/change", buildIndexEndpoint(projectName, changeId), - event); + event, + event.eventCreatedOn); } @Override - public CompletableFuture<Boolean> batchIndexChange( + public CompletableFuture<Result> batchIndexChange( String projectName, int changeId, IndexEvent event) { return execute( RequestMethod.POST, + EventType.INDEX_CHANGE_UPDATE_BATCH, "index change", "index/change/batch", buildIndexEndpoint(projectName, changeId), - event); + event, + event.eventCreatedOn); } @Override - public CompletableFuture<Boolean> deleteChangeFromIndex(final int changeId, IndexEvent event) { + public CompletableFuture<Result> deleteChangeFromIndex(final int changeId, IndexEvent event) { return execute( - RequestMethod.DELETE, "delete change", "index/change", buildIndexEndpoint(changeId), event); + RequestMethod.DELETE, + EventType.INDEX_CHANGE_DELETION, + "delete change", + "index/change", + buildIndexEndpoint(changeId), + event, + event.eventCreatedOn); } @Override - public CompletableFuture<Boolean> indexGroup(final String uuid, IndexEvent event) { - return execute(RequestMethod.POST, "index group", "index/group", uuid, event); + public CompletableFuture<Result> indexGroup(final String uuid, IndexEvent event) { + return execute( + RequestMethod.POST, + EventType.INDEX_GROUP_UPDATE, + "index group", + "index/group", + uuid, + event, + event.eventCreatedOn); } private String buildIndexEndpoint(int changeId) { @@ -126,99 +160,153 @@ } @Override - public CompletableFuture<Boolean> indexProject(String projectName, IndexEvent event) { - return execute( - RequestMethod.POST, "index project", "index/project", Url.encode(projectName), event); - } - - @Override - public CompletableFuture<Boolean> send(final Event event) { - return execute(RequestMethod.POST, "send event", "event", event.type, event); - } - - @Override - public CompletableFuture<Boolean> evict(final String cacheName, final Object key) { - String json = gson.toJson(key); - return execute(RequestMethod.POST, "invalidate cache " + cacheName, "cache", cacheName, json); - } - - @Override - public CompletableFuture<Boolean> addToProjectList(String projectName) { + public CompletableFuture<Result> indexProject(String projectName, IndexEvent event) { return execute( RequestMethod.POST, + EventType.INDEX_PROJECT_UPDATE, + "index project", + "index/project", + Url.encode(projectName), + event, + event.eventCreatedOn); + } + + @Override + public CompletableFuture<Result> send(final Event event) { + return execute( + RequestMethod.POST, + EventType.EVENT_SENT, + "send event", + "event", + event.type, + event, + Instant.ofEpochSecond(event.eventCreatedOn)); + } + + @Override + public CompletableFuture<Result> evict(final String cacheName, final Object key) { + String json = gson.toJson(key); + return execute( + RequestMethod.POST, + EventType.CACHE_EVICTION, + "invalidate cache " + cacheName, + "cache", + cacheName, + json, + Instant.now()); + } + + @Override + public CompletableFuture<Result> addToProjectList(String projectName) { + return execute( + RequestMethod.POST, + EventType.PROJECT_LIST_ADDITION, "Update project_list, add ", buildProjectListEndpoint(), - Url.encode(projectName)); + Url.encode(projectName), + Instant.now()); } @Override - public CompletableFuture<Boolean> removeFromProjectList(String projectName) { + public CompletableFuture<Result> removeFromProjectList(String projectName) { return execute( RequestMethod.DELETE, + EventType.PROJECT_LIST_DELETION, "Update project_list, remove ", buildProjectListEndpoint(), - Url.encode(projectName)); + Url.encode(projectName), + Instant.now()); } @Override - public CompletableFuture<Boolean> deleteAllChangesForProject(Project.NameKey projectName) { + public CompletableFuture<Result> deleteAllChangesForProject(Project.NameKey projectName) { return execute( RequestMethod.DELETE, + EventType.INDEX_CHANGE_DELETION_ALL_OF_PROJECT, "Delete all project changes from index", "index/change", - buildAllChangesForProjectEndpoint(projectName.get())); + buildAllChangesForProjectEndpoint(projectName.get()), + Instant.now()); } private static String buildProjectListEndpoint() { return Joiner.on("/").join("cache", Constants.PROJECT_LIST); } - private CompletableFuture<Boolean> execute( - RequestMethod method, String action, String endpoint, Object id) { - return execute(method, action, endpoint, id, null); + private CompletableFuture<Result> execute( + RequestMethod method, + EventType eventType, + String action, + String endpoint, + Object id, + Instant requestStart) { + return execute(method, eventType, action, endpoint, id, null, requestStart); } - private CompletableFuture<Boolean> execute( - RequestMethod method, String action, String endpoint, Object id, Object payload) { + private CompletableFuture<Result> execute( + RequestMethod method, + EventType eventType, + String action, + String endpoint, + Object id, + Object payload, + Instant requestStart) { + log.atFine().log("Scheduling forwarding of: %s %s %s", action, id, payload); return peerInfoProvider.get().stream() - .map(peer -> createRequest(method, peer, action, endpoint, id, payload)) + .map( + peer -> + createRequest(method, eventType, peer, action, endpoint, id, payload, requestStart)) .map(r -> executor.getAsync(() -> r.execute())) .reduce( - CompletableFuture.completedFuture(true), - (a, b) -> a.thenCombine(b, (left, right) -> left && right)); + CompletableFuture.completedFuture(new Result(eventType, true)), + (a, b) -> + a.thenCombine( + b, (left, right) -> new Result(eventType, left.result() && right.result()))) + .thenApplyAsync( + result -> { + metricsRegistry.get(eventType).recordResult(result.result()); + metricsRegistry + .get(eventType) + .recordLatency(Duration.between(requestStart, Instant.now()).toMillis()); + return result; + }); } private Request createRequest( RequestMethod method, + EventType eventType, PeerInfo peer, String action, String endpoint, Object id, - Object payload) { + Object payload, + Instant createdOn) { String destination = peer.getDirectUrl(); - return new Request(action, id, destination) { + return new Request(eventType, action, id, destination) { @Override HttpResult send() throws IOException { String request = Joiner.on("/").join(destination, pluginRelativePath, endpoint, id); switch (method) { case POST: - return httpSession.post(request, payload); + return httpSession.post(request, payload, createdOn); case DELETE: default: - return httpSession.delete(request); + return httpSession.delete(request, createdOn); } } }; } protected abstract class Request { + private final EventType eventType; private final String action; private final Object key; private final String destination; private int execCnt; - Request(String action, Object key, String destination) { + Request(EventType eventType, String action, Object key, String destination) { + this.eventType = eventType; this.action = action; this.key = key; this.destination = destination; @@ -229,13 +317,13 @@ return String.format("%s:%s => %s (try #%d)", action, key, destination, execCnt); } - boolean execute() throws ForwardingException { + Result execute() { log.atFine().log("Executing %s %s towards %s", action, key, destination); try { execCnt++; tryOnce(); log.atFine().log("%s %s towards %s OK", action, key, destination); - return true; + return new Result(eventType, true); } catch (ForwardingException e) { int maxTries = cfg.http().maxTries(); log.atFine().withCause(e).log( @@ -244,10 +332,10 @@ log.atSevere().withCause(e).log( "%s %s towards %s failed with unrecoverable error; giving up", action, key, destination); - throw e; + return new Result(eventType, false, false); } } - return false; + return new Result(eventType, false); } void tryOnce() throws ForwardingException {
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderModule.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderModule.java index 39c9cc0..5d38b16 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderModule.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderModule.java
@@ -15,6 +15,7 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.rest; import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder; +import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder.Result; import com.google.inject.AbstractModule; import com.google.inject.Scopes; import com.google.inject.TypeLiteral; @@ -29,7 +30,7 @@ bind(HttpSession.class); bind(Forwarder.class).to(RestForwarder.class); - bind(new TypeLiteral<FailsafeExecutor<Boolean>>() {}) + bind(new TypeLiteral<FailsafeExecutor<Result>>() {}) .annotatedWith(RestForwarderExecutor.class) .toProvider(FailsafeExecutorProvider.class) .in(Scopes.SINGLETON);
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeChecker.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeChecker.java index 0ed6160..f702d87 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeChecker.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeChecker.java
@@ -17,6 +17,7 @@ import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent; import com.google.gerrit.server.notedb.ChangeNotes; import java.io.IOException; +import java.time.Instant; import java.util.Optional; /** Encapsulates the logic of verifying the up-to-date status of a change. */ @@ -51,8 +52,8 @@ * * <p>Compute the up-to-date Change time-stamp when it is invoked for the very first time. * - * @return the Change timestamp epoch in seconds + * @return the Change timestamp instant * @throws IOException if an I/O error occurred while reading the local Change */ - Optional<Long> getComputedChangeTs() throws IOException; + Optional<Instant> getComputedChangeTs() throws IOException; }
diff --git a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java index a392935..6156fb5 100644 --- a/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java +++ b/src/main/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImpl.java
@@ -26,6 +26,7 @@ import com.google.inject.Inject; import com.google.inject.assistedinject.Assisted; import java.io.IOException; +import java.time.Instant; import java.util.Objects; import java.util.Optional; import org.eclipse.jgit.lib.ObjectId; @@ -38,7 +39,7 @@ private final OneOffRequestContext oneOffReqCtx; private final String changeId; private final ChangeFinder changeFinder; - private Optional<Long> computedChangeTs = Optional.empty(); + private Optional<Instant> computedChangeTs = Optional.empty(); private Optional<ChangeNotes> changeNotes = Optional.empty(); public interface Factory { @@ -59,12 +60,12 @@ @Override public Optional<IndexEvent> newIndexEvent() throws IOException { - Optional<Long> changeTs = getComputedChangeTs(); + Optional<Instant> changeTs = getComputedChangeTs(); if (!changeTs.isPresent()) { return Optional.empty(); } - long ts = changeTs.get(); + Instant ts = changeTs.get(); IndexEvent event = new IndexEvent(); event.eventCreatedOn = ts; @@ -99,12 +100,9 @@ if (indexEventOption.isPresent()) { try (Repository repo = gitRepoMgr.openRepository(changeNotes.get().getProjectName())) { IndexEvent indexEvent = indexEventOption.get(); - return (computedChangeTs.get() > indexEvent.eventCreatedOn) - || (computedChangeTs.get() == indexEvent.eventCreatedOn) - && (Objects.isNull(indexEvent.targetSha) - || repositoryHas(repo, indexEvent.targetSha)) - && (Objects.isNull(indexEvent.targetSha) - || repositoryHas(repo, indexEvent.metaSha)); + return computedChangeTs.get().compareTo(indexEvent.eventCreatedOn) >= 0 + && (Objects.isNull(indexEvent.targetSha) || repositoryHas(repo, indexEvent.targetSha)) + && (Objects.isNull(indexEvent.targetSha) || repositoryHas(repo, indexEvent.metaSha)); } } return true; @@ -116,7 +114,7 @@ } @Override - public Optional<Long> getComputedChangeTs() { + public Optional<Instant> getComputedChangeTs() { if (!computedChangeTs.isPresent()) { computedChangeTs = computeLastChangeTs(); } @@ -166,7 +164,7 @@ } } - private Optional<Long> computeLastChangeTs() { + private Optional<Instant> computeLastChangeTs() { return getChangeNotes().map(this::getTsFromChange); } @@ -180,8 +178,8 @@ return ref.getTarget().getObjectId().getName(); } - private long getTsFromChange(ChangeNotes notes) { + private Instant getTsFromChange(ChangeNotes notes) { Change change = notes.getChange(); - return change.getLastUpdatedOn().toEpochMilli() / 1000; + return change.getLastUpdatedOn(); } }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializerTest.java index 2f5c5b7..7829a80 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializerTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/CommandDeserializerTest.java
@@ -17,6 +17,7 @@ import static com.google.common.truth.Truth.assertThat; import com.ericsson.gerrit.plugins.highavailability.cache.Constants; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.CacheKeyJsonParser; import com.google.gerrit.entities.Project; import com.google.gerrit.server.events.Event; @@ -40,7 +41,9 @@ @Test public void indexAccount() { - Command cmd = gson.fromJson("{type: 'index-account', id: 100}", Command.class); + Command cmd = + gson.fromJson( + String.format("{type: '%s', id: 100}", EventType.INDEX_ACCOUNT_UPDATE), Command.class); assertThat(cmd).isInstanceOf(IndexAccount.class); IndexAccount index = (IndexAccount) cmd; assertThat(index.getId()).isEqualTo(100); @@ -49,7 +52,10 @@ @Test public void updateChangeCommand() { Command cmd = - gson.fromJson("{type: 'update-change', projectName: 'foo', id: 100}", Command.class); + gson.fromJson( + String.format( + "{type: '%s', projectName: 'foo', id: 100}", EventType.INDEX_CHANGE_UPDATE), + Command.class); assertThat(cmd).isInstanceOf(IndexChange.Update.class); IndexChange.Update update = (IndexChange.Update) cmd; assertThat(update.getId()).isEqualTo("foo~100"); @@ -60,22 +66,30 @@ public void batchUpdateChangeCommand() { Command cmd = gson.fromJson( - "{type: 'update-change', projectName: 'foo', id: 100, batchMode: 'true'}", + String.format( + "{type: '%s', projectName: 'foo', id: 100, batchMode: 'true'}", + EventType.INDEX_CHANGE_UPDATE_BATCH), Command.class); - assertThat(cmd).isInstanceOf(IndexChange.Update.class); - IndexChange.Update update = (IndexChange.Update) cmd; + assertThat(cmd).isInstanceOf(IndexChange.BatchUpdate.class); + IndexChange.BatchUpdate update = (IndexChange.BatchUpdate) cmd; assertThat(update.getId()).isEqualTo("foo~100"); assertThat(update.isBatch()).isTrue(); } @Test public void deleteChangeCommand() { - Command cmd = gson.fromJson("{type: 'delete-change', id: 100}", Command.class); + Command cmd = + gson.fromJson( + String.format("{type: '%s', id: 100}", EventType.INDEX_CHANGE_DELETION), Command.class); assertThat(cmd).isInstanceOf(IndexChange.Delete.class); IndexChange.Delete delete = (IndexChange.Delete) cmd; assertThat(delete.getId()).isEqualTo("~100"); - cmd = gson.fromJson("{type: 'delete-change', projectName: 'foo', id: 100}", Command.class); + cmd = + gson.fromJson( + String.format( + "{type: '%s', projectName: 'foo', id: 100}", EventType.INDEX_CHANGE_DELETION), + Command.class); assertThat(cmd).isInstanceOf(IndexChange.Delete.class); delete = (IndexChange.Delete) cmd; assertThat(delete.getId()).isEqualTo("foo~100"); @@ -83,7 +97,10 @@ @Test public void indexGroup() { - Command cmd = gson.fromJson("{type: 'index-group', uuid: 'foo'}", Command.class); + Command cmd = + gson.fromJson( + String.format("{type: '%s', uuid: 'foo'}", EventType.INDEX_GROUP_UPDATE), + Command.class); assertThat(cmd).isInstanceOf(IndexGroup.class); IndexGroup index = (IndexGroup) cmd; assertThat(index.getUuid()).isEqualTo("foo"); @@ -91,7 +108,10 @@ @Test public void indexProject() { - Command cmd = gson.fromJson("{type: 'index-project', projectName: 'foo'}", Command.class); + Command cmd = + gson.fromJson( + String.format("{type: '%s', projectName: 'foo'}", EventType.INDEX_PROJECT_UPDATE), + Command.class); assertThat(cmd).isInstanceOf(IndexProject.class); IndexProject index = (IndexProject) cmd; assertThat(index.getProjectName()).isEqualTo("foo"); @@ -101,8 +121,10 @@ public void postEvent() { Command cmd = gson.fromJson( - "{event: {projectName : 'foo', headName : 'refs/heads/master', type :" - + " 'project-created', eventCreatedOn:1505898779}, type : 'post-event'}", + String.format( + "{event: {projectName : 'foo', headName : 'refs/heads/master', type :" + + " 'project-created', eventCreatedOn:1505898779}, type : '%s'}", + EventType.EVENT_SENT), Command.class); assertThat(cmd).isInstanceOf(PostEvent.class); Event e = ((PostEvent) cmd).getEvent(); @@ -118,7 +140,7 @@ gson.fromJson( String.format( "{type: '%s', cacheName: '%s', keyJson: '%s'}", - EvictCache.TYPE, Constants.PROJECTS, keyJson), + EventType.CACHE_EVICTION, Constants.PROJECTS, keyJson), EvictCache.class); assertThat(cmd).isInstanceOf(EvictCache.class); EvictCache evict = (EvictCache) cmd; @@ -130,7 +152,10 @@ @Test public void addToProjectList() { - Command cmd = gson.fromJson("{type: 'add-to-project-list', projectName: 'foo'}", Command.class); + Command cmd = + gson.fromJson( + String.format("{type: '%s', projectName: 'foo'}", EventType.PROJECT_LIST_ADDITION), + Command.class); assertThat(cmd).isInstanceOf(AddToProjectList.class); AddToProjectList addToProjectList = (AddToProjectList) cmd; assertThat(addToProjectList.getProjectName()).isEqualTo("foo"); @@ -139,7 +164,9 @@ @Test public void removeFromProjectList() { Command cmd = - gson.fromJson("{type: 'remove-from-project-list', projectName: 'foo'}", Command.class); + gson.fromJson( + String.format("{type: '%s', projectName: 'foo'}", EventType.PROJECT_LIST_DELETION), + Command.class); assertThat(cmd).isInstanceOf(RemoveFromProjectList.class); RemoveFromProjectList removeFromProjectList = (RemoveFromProjectList) cmd; assertThat(removeFromProjectList.getProjectName()).isEqualTo("foo");
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderTest.java index 082b5d1..e7542b4 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/JGroupsForwarderTest.java
@@ -23,6 +23,10 @@ import static org.mockito.Mockito.when; import com.ericsson.gerrit.plugins.highavailability.Configuration; +import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder.Result; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwarderMetrics; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwarderMetricsRegistry; +import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent; import com.google.gerrit.server.events.EventGsonProvider; import com.google.gerrit.server.git.WorkQueue; import com.google.gson.Gson; @@ -30,7 +34,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; import org.jgroups.Address; import org.jgroups.blocks.MessageDispatcher; import org.jgroups.util.Rsp; @@ -38,7 +42,10 @@ import org.jgroups.util.UUID; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +@RunWith(org.mockito.junit.MockitoJUnitRunner.class) public class JGroupsForwarderTest { private static final int MAX_TRIES = 3; @@ -52,6 +59,9 @@ private MessageDispatcher dispatcher; private JGroupsForwarder forwarder; + @Mock ForwarderMetricsRegistry metricsRegistry; + @Mock ForwarderMetrics metrics; + @Before public void setUp() throws Exception { Gson eventGson = new EventGsonProvider().get(); @@ -67,10 +77,17 @@ WorkQueue workQueue = mock(WorkQueue.class); when(workQueue.createQueue(THREAD_POOLS_SIZE, "JGroupsForwarder")) - .thenReturn(Executors.newScheduledThreadPool(THREAD_POOLS_SIZE)); + .thenReturn(new ScheduledThreadPoolExecutor(THREAD_POOLS_SIZE)); + + when(metricsRegistry.get(any())).thenReturn(metrics); + forwarder = new JGroupsForwarder( - dispatcher, cfg, gson, new FailsafeExecutorProvider(cfg, workQueue).get()); + dispatcher, + cfg, + gson, + new FailsafeExecutorProvider(cfg, workQueue).get(), + metricsRegistry); } @Test @@ -78,8 +95,8 @@ RspList<Object> OK = new RspList<>(Map.of(A1, RSP_OK, A2, RSP_OK)); when(dispatcher.castMessage(any(), any(), any())).thenReturn(OK); - CompletableFuture<Boolean> result = forwarder.indexAccount(100, null); - assertThat(result.get()).isTrue(); + CompletableFuture<Result> result = forwarder.indexAccount(100, new IndexEvent()); + assertThat(result.get().result()).isTrue(); verify(dispatcher, times(1)).castMessage(any(), any(), any()); } @@ -90,8 +107,8 @@ RspList<Object> FAIL = new RspList<>(Map.of(A1, RSP_OK, A2, RSP_FAIL)); when(dispatcher.castMessage(any(), any(), any())).thenReturn(FAIL, OK); - CompletableFuture<Boolean> result = forwarder.indexAccount(100, null); - assertThat(result.get()).isTrue(); + CompletableFuture<Result> result = forwarder.indexAccount(100, new IndexEvent()); + assertThat(result.get().result()).isTrue(); verify(dispatcher, times(2)).castMessage(any(), any(), any()); } @@ -102,8 +119,8 @@ // return FAIL x MAX_TRIES when(dispatcher.castMessage(any(), any(), any())).thenReturn(FAIL, FAIL, FAIL); - CompletableFuture<Boolean> result = forwarder.indexAccount(100, null); - assertThat(result.get()).isFalse(); + CompletableFuture<Result> result = forwarder.indexAccount(100, new IndexEvent()); + assertThat(result.get().result()).isFalse(); verify(dispatcher, times(MAX_TRIES)).castMessage(any(), any(), any()); } }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessorTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessorTest.java index c3114de..91d291c 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessorTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/jgroups/MessageProcessorTest.java
@@ -15,10 +15,12 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.jgroups; import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; import com.ericsson.gerrit.plugins.highavailability.forwarder.CacheEntry; import com.ericsson.gerrit.plugins.highavailability.forwarder.CacheNotFoundException; @@ -29,6 +31,8 @@ import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexChangeHandler; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedProjectListUpdateHandler; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetrics; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.gerrit.entities.Account; import com.google.gerrit.entities.Change; import com.google.gerrit.server.events.Event; @@ -37,14 +41,18 @@ import com.google.gerrit.server.permissions.PermissionBackendException; import com.google.gson.Gson; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Optional; import org.jgroups.ObjectMessage; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +@RunWith(org.mockito.junit.MockitoJUnitRunner.class) public class MessageProcessorTest { private MessageProcessor processor; @@ -57,10 +65,14 @@ private ForwardedEventHandler eventHandler; private ForwardedProjectListUpdateHandler projectListUpdateHandler; + @Mock ProcessorMetrics processorMetrics; + @Mock ProcessorMetricsRegistry metricsRegistry; + private List<Object> allHandlers = new ArrayList<>(); @Before public void setUp() { + when(metricsRegistry.get(any())).thenReturn(processorMetrics); Gson eventGson = new EventGsonProvider().get(); gson = new JGroupsForwarderModule().buildJGroupsGson(eventGson); @@ -79,7 +91,8 @@ indexAccountHandler, cacheEvictionHandler, eventHandler, - projectListUpdateHandler); + projectListUpdateHandler, + metricsRegistry); } private <T> T createHandlerMock(Class<T> handlerClass) { @@ -92,7 +105,7 @@ public void indexAccount() throws IOException { int ACCOUNT_ID = 100; - IndexAccount cmd = new IndexAccount(ACCOUNT_ID); + IndexAccount cmd = new IndexAccount(ACCOUNT_ID, Instant.now()); assertThat(processor.handle(new ObjectMessage(null, gson.toJson(cmd)))).isEqualTo(true); verify(indexAccountHandler, times(1)) .index(Account.id(ACCOUNT_ID), Operation.INDEX, Optional.empty()); @@ -104,7 +117,7 @@ String PROJECT = "foo"; int CHANGE_ID = 100; - IndexChange.Update cmd = new IndexChange.Update(PROJECT, CHANGE_ID); + IndexChange.Update cmd = new IndexChange.Update(PROJECT, CHANGE_ID, Instant.now()); assertThat(processor.handle(new ObjectMessage(null, gson.toJson(cmd)))).isEqualTo(true); verify(indexChangeHandler, times(1)) .index(PROJECT + "~" + Change.id(CHANGE_ID), Operation.INDEX, Optional.empty()); @@ -116,7 +129,7 @@ String PROJECT = "foo"; int CHANGE_ID = 100; - IndexChange.Update cmd = new IndexChange.Update(PROJECT, CHANGE_ID, true); + IndexChange.BatchUpdate cmd = new IndexChange.BatchUpdate(PROJECT, CHANGE_ID, Instant.now()); assertThat(processor.handle(new ObjectMessage(null, gson.toJson(cmd)))).isEqualTo(true); verify(indexBatchChangeHandler, times(1)) .index(PROJECT + "~" + Change.id(CHANGE_ID), Operation.INDEX, Optional.empty()); @@ -128,7 +141,7 @@ String PROJECT = "foo"; int CHANGE_ID = 100; - IndexChange.Delete cmd = new IndexChange.Delete(PROJECT, CHANGE_ID); + IndexChange.Delete cmd = new IndexChange.Delete(PROJECT, CHANGE_ID, Instant.now()); assertThat(processor.handle(new ObjectMessage(null, gson.toJson(cmd)))).isEqualTo(true); verify(indexChangeHandler, times(1)) .index(PROJECT + "~" + Change.id(CHANGE_ID), Operation.DELETE, Optional.empty()); @@ -140,7 +153,7 @@ String CACHE = "foo"; String KEY_JSON = gson.toJson(100); - EvictCache cmd = new EvictCache(CACHE, KEY_JSON); + EvictCache cmd = new EvictCache(CACHE, KEY_JSON, Instant.now()); assertThat(processor.handle(new ObjectMessage(null, gson.toJson(cmd)))).isEqualTo(true); CacheEntry e = CacheEntry.from(CACHE, KEY_JSON); verify(cacheEvictionHandler, times(1)).evict(e); @@ -154,7 +167,7 @@ EventTypes.register(TestEvent.TYPE, TestEvent.class); TestEvent event = new TestEvent(FOO, BAR); - PostEvent cmd = new PostEvent(event); + PostEvent cmd = new PostEvent(event, Instant.now()); assertThat(processor.handle(new ObjectMessage(null, gson.toJson(cmd)))).isEqualTo(true); ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class); verify(eventHandler, times(1)).dispatch(captor.capture()); @@ -169,7 +182,7 @@ public void addToProjectList() throws IOException { String PROJECT = "foo"; - AddToProjectList cmd = new AddToProjectList(PROJECT); + AddToProjectList cmd = new AddToProjectList(PROJECT, Instant.now()); assertThat(processor.handle(new ObjectMessage(null, gson.toJson(cmd)))).isEqualTo(true); verify(projectListUpdateHandler, times(1)).update(PROJECT, false); verifyOtherHandlersNotUsed(projectListUpdateHandler); @@ -179,7 +192,7 @@ public void removeFromProjectList() throws IOException { String PROJECT = "foo"; - RemoveFromProjectList cmd = new RemoveFromProjectList(PROJECT); + RemoveFromProjectList cmd = new RemoveFromProjectList(PROJECT, Instant.now()); assertThat(processor.handle(new ObjectMessage(null, gson.toJson(cmd)))).isEqualTo(true); verify(projectListUpdateHandler, times(1)).update(PROJECT, true); verifyOtherHandlersNotUsed(projectListUpdateHandler);
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheRestApiServletTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheRestApiServletTest.java index 58c0aef..68d348c 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheRestApiServletTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/CacheRestApiServletTest.java
@@ -24,6 +24,8 @@ import com.ericsson.gerrit.plugins.highavailability.cache.Constants; import com.ericsson.gerrit.plugins.highavailability.forwarder.CacheNotFoundException; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedCacheEvictionHandler; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetrics; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.gson.Gson; import java.io.BufferedReader; import java.io.IOException; @@ -41,13 +43,16 @@ @Mock private HttpServletResponse responseMock; @Mock private BufferedReader readerMock; @Mock private ForwardedCacheEvictionHandler forwardedCacheEvictionHandlerMock; + @Mock private ProcessorMetricsRegistry metricsRegistry; + @Mock ProcessorMetrics metrics; private CacheRestApiServlet servlet; @Before public void setUp() { + when(metricsRegistry.get(any())).thenReturn(metrics); servlet = new CacheRestApiServlet( - forwardedCacheEvictionHandlerMock, new CacheKeyJsonParser(new Gson())); + forwardedCacheEvictionHandlerMock, new CacheKeyJsonParser(new Gson()), metricsRegistry); } @Test
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/EventRestApiServletTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/EventRestApiServletTest.java index 7c23e29..9ed764f 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/EventRestApiServletTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/EventRestApiServletTest.java
@@ -24,6 +24,8 @@ import static org.mockito.Mockito.when; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedEventHandler; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetrics; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.common.net.MediaType; import com.google.gerrit.entities.Project; import com.google.gerrit.server.events.EventDispatcher; @@ -52,6 +54,8 @@ @Mock private ForwardedEventHandler forwardedEventHandlerMock; @Mock private HttpServletRequest requestMock; @Mock private HttpServletResponse responseMock; + @Mock private ProcessorMetricsRegistry metricsRegistryMock; + @Mock private ProcessorMetrics metrics; private EventRestApiServlet eventRestApiServlet; private Gson gson = new EventGsonProvider().get(); @@ -62,7 +66,9 @@ @Before public void createEventsRestApiServlet() throws Exception { - eventRestApiServlet = new EventRestApiServlet(forwardedEventHandlerMock, gson); + when(metricsRegistryMock.get(any())).thenReturn(metrics); + eventRestApiServlet = + new EventRestApiServlet(forwardedEventHandlerMock, gson, metricsRegistryMock); when(requestMock.getContentType()).thenReturn(MediaType.JSON_UTF_8.toString()); } @@ -93,7 +99,7 @@ .when(dispatcher) .postEvent(any(RefReplicationDoneEvent.class)); ForwardedEventHandler forwardedEventHandler = new ForwardedEventHandler(dispatcher); - eventRestApiServlet = new EventRestApiServlet(forwardedEventHandler, gson); + eventRestApiServlet = new EventRestApiServlet(forwardedEventHandler, gson, metricsRegistryMock); eventRestApiServlet.doPost(requestMock, responseMock); verify(responseMock).setStatus(SC_NO_CONTENT); }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ForwardedCacheEvictionHandlerIT.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ForwardedCacheEvictionHandlerIT.java index 5a0833f..3df6104 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ForwardedCacheEvictionHandlerIT.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ForwardedCacheEvictionHandlerIT.java
@@ -16,8 +16,6 @@ import static com.google.common.truth.Truth.assertThat; -import com.ericsson.gerrit.plugins.highavailability.forwarder.CacheEntry; -import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedCacheEvictionHandler; import com.google.common.cache.RemovalNotification; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -39,8 +37,6 @@ import org.junit.Before; import org.junit.Test; -@NoHttpd -@UseSsh @TestPlugin( name = "high-availability", sysModule = "com.ericsson.gerrit.plugins.highavailability.Module", @@ -53,8 +49,6 @@ private DynamicSet<CacheRemovalListener> cacheRemovalListeners; @Inject @EventGson private Gson gson; - @Inject private ForwardedCacheEvictionHandler objectUnderTest; - @Inject private CacheKeyJsonParser gsonParser; private CacheEvictionsTracker<?, ?> evictionsCacheTracker; private RegistrationHandle cacheEvictionRegistrationHandle; @@ -106,8 +100,10 @@ @Test public void shouldEvictProjectCache() throws Exception { - Object parsedKey = gsonParser.fromJson(ProjectCacheImpl.CACHE_NAME, gson.toJson(project)); - objectUnderTest.evict(CacheEntry.from(ProjectCacheImpl.CACHE_NAME, parsedKey)); + adminRestSession + .post( + "/plugins/high-availability/cache/" + ProjectCacheImpl.CACHE_NAME, gson.toJson(project)) + .assertNoContent(); evictionsCacheTracker.waitForExpectedEvictions(); assertThat(evictionsCacheTracker.trackedEvictionsFor(ProjectCacheImpl.CACHE_NAME))
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java index 4d874dd..a0bcf10 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/HttpSessionTest.java
@@ -31,6 +31,7 @@ import com.google.gson.Gson; import java.net.SocketTimeoutException; import java.time.Duration; +import java.time.Instant; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -80,7 +81,7 @@ wireMockRule.givenThat( post(urlEqualTo(ENDPOINT)).willReturn(aResponse().withStatus(NO_CONTENT))); - assertThat(httpSession.post(uri).isSuccessful()).isTrue(); + assertThat(httpSession.post(uri, Instant.now()).isSuccessful()).isTrue(); } @Test @@ -89,7 +90,7 @@ post(urlEqualTo(ENDPOINT)) .withRequestBody(equalTo(BODY)) .willReturn(aResponse().withStatus(NO_CONTENT))); - assertThat(httpSession.post(uri, BODY).isSuccessful()).isTrue(); + assertThat(httpSession.post(uri, BODY, Instant.now()).isSuccessful()).isTrue(); } @Test @@ -97,7 +98,7 @@ wireMockRule.givenThat( delete(urlEqualTo(ENDPOINT)).willReturn(aResponse().withStatus(NO_CONTENT))); - assertThat(httpSession.delete(uri).isSuccessful()).isTrue(); + assertThat(httpSession.delete(uri, Instant.now()).isSuccessful()).isTrue(); } @Test @@ -107,7 +108,7 @@ post(urlEqualTo(ENDPOINT)) .willReturn(aResponse().withStatus(UNAUTHORIZED).withBody(expected))); - HttpResult result = httpSession.post(uri); + HttpResult result = httpSession.post(uri, Instant.now()); assertThat(result.isSuccessful()).isFalse(); assertThat(result.getMessage()).isEqualTo(expected); } @@ -119,7 +120,7 @@ post(urlEqualTo(ENDPOINT)) .willReturn(aResponse().withStatus(NOT_FOUND).withBody(expected))); - HttpResult result = httpSession.post(uri); + HttpResult result = httpSession.post(uri, Instant.now()); assertThat(result.isSuccessful()).isFalse(); assertThat(result.getMessage()).isEqualTo(expected); } @@ -130,7 +131,7 @@ post(urlEqualTo(ENDPOINT)) .willReturn(aResponse().withStatus(ERROR).withBody(ERROR_MESSAGE))); - HttpResult result = httpSession.post(uri); + HttpResult result = httpSession.post(uri, Instant.now()); assertThat(result.isSuccessful()).isFalse(); assertThat(result.getMessage()).isEqualTo(ERROR_MESSAGE); } @@ -161,7 +162,7 @@ .whenScenarioStateIs(THIRD_TRY) .willReturn(aResponse().withFixedDelay((int) TIMEOUT.toMillis()))); - httpSession.post(uri); + httpSession.post(uri, Instant.now()); } @Test @@ -170,6 +171,6 @@ post(urlEqualTo(ENDPOINT)) .willReturn(aResponse().withFault(Fault.MALFORMED_RESPONSE_CHUNK))); - assertThat(httpSession.post(uri).isSuccessful()).isFalse(); + assertThat(httpSession.post(uri, Instant.now()).isSuccessful()).isFalse(); } }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexAccountRestApiServletTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexAccountRestApiServletTest.java index 83102a9..49ce5f8 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexAccountRestApiServletTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexAccountRestApiServletTest.java
@@ -26,7 +26,10 @@ import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexAccountHandler; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetrics; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.gerrit.entities.Account; +import com.google.gson.Gson; import java.io.IOException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -44,13 +47,16 @@ @Mock private ForwardedIndexAccountHandler handlerMock; @Mock private HttpServletRequest requestMock; @Mock private HttpServletResponse responseMock; + @Mock private ProcessorMetricsRegistry metricsRegistryMock; + @Mock private ProcessorMetrics metrics; private Account.Id id; private IndexAccountRestApiServlet servlet; @Before public void setUpMocks() { - servlet = new IndexAccountRestApiServlet(handlerMock); + when(metricsRegistryMock.get(any())).thenReturn(metrics); + servlet = new IndexAccountRestApiServlet(handlerMock, new Gson(), metricsRegistryMock); id = Account.id(ACCOUNT_NUMBER); when(requestMock.getRequestURI()) .thenReturn("http://gerrit.com/index/account/" + ACCOUNT_NUMBER);
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexChangeRestApiServletTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexChangeRestApiServletTest.java index 3da35d4..a19fda6 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexChangeRestApiServletTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexChangeRestApiServletTest.java
@@ -25,6 +25,8 @@ import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexChangeHandler; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetrics; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.gson.Gson; import java.io.IOException; import javax.servlet.http.HttpServletRequest; @@ -46,12 +48,15 @@ @Mock private ForwardedIndexChangeHandler handlerMock; @Mock private HttpServletRequest requestMock; @Mock private HttpServletResponse responseMock; + @Mock private ProcessorMetricsRegistry metricsRegistryMock; + @Mock private ProcessorMetrics metrics; private IndexChangeRestApiServlet servlet; @Before public void setUpMocks() { - servlet = new IndexChangeRestApiServlet(handlerMock, new Gson()); + when(metricsRegistryMock.get(any())).thenReturn(metrics); + servlet = new IndexChangeRestApiServlet(handlerMock, new Gson(), metricsRegistryMock); when(requestMock.getRequestURI()) .thenReturn("http://gerrit.com/index/change/" + PROJECT_NAME_URL_ENC + "~" + CHANGE_NUMBER); }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexGroupRestApiServletTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexGroupRestApiServletTest.java index fb5788d..0e87bf7 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexGroupRestApiServletTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexGroupRestApiServletTest.java
@@ -26,7 +26,10 @@ import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexGroupHandler; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetrics; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.gerrit.entities.AccountGroup; +import com.google.gson.Gson; import java.io.IOException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -44,13 +47,16 @@ @Mock private ForwardedIndexGroupHandler handlerMock; @Mock private HttpServletRequest requestMock; @Mock private HttpServletResponse responseMock; + @Mock private ProcessorMetricsRegistry metricsRegistryMock; + @Mock private ProcessorMetrics metrics; private AccountGroup.UUID uuid; private IndexGroupRestApiServlet servlet; @Before public void setUpMocks() { - servlet = new IndexGroupRestApiServlet(handlerMock); + when(metricsRegistryMock.get(any())).thenReturn(metrics); + servlet = new IndexGroupRestApiServlet(handlerMock, new Gson(), metricsRegistryMock); uuid = AccountGroup.uuid(UUID); when(requestMock.getRequestURI()).thenReturn("http://gerrit.com/index/group/" + UUID); }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexProjectRestApiServletTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexProjectRestApiServletTest.java index da6e70d..d74a43f 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexProjectRestApiServletTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/IndexProjectRestApiServletTest.java
@@ -26,8 +26,11 @@ import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexProjectHandler; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedIndexingHandler.Operation; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetrics; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.gerrit.entities.Project; import com.google.gerrit.extensions.restapi.Url; +import com.google.gson.Gson; import java.io.IOException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -45,13 +48,16 @@ @Mock private ForwardedIndexProjectHandler handlerMock; @Mock private HttpServletRequest requestMock; @Mock private HttpServletResponse responseMock; + @Mock private ProcessorMetricsRegistry metricsRegistryMock; + @Mock private ProcessorMetrics metrics; private Project.NameKey nameKey; private IndexProjectRestApiServlet servlet; @Before public void setUpMocks() { - servlet = new IndexProjectRestApiServlet(handlerMock); + when(metricsRegistryMock.get(any())).thenReturn(metrics); + servlet = new IndexProjectRestApiServlet(handlerMock, new Gson(), metricsRegistryMock); nameKey = Project.nameKey(PROJECT_NAME); when(requestMock.getRequestURI()) .thenReturn("http://gerrit.com/index/project/" + Url.encode(nameKey.get()));
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ProjectListRestApiServletTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ProjectListRestApiServletTest.java index 38ab488..fd0813e 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ProjectListRestApiServletTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/ProjectListRestApiServletTest.java
@@ -15,11 +15,14 @@ package com.ericsson.gerrit.plugins.highavailability.forwarder.rest; import static javax.servlet.http.HttpServletResponse.SC_NO_CONTENT; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwardedProjectListUpdateHandler; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetrics; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ProcessorMetricsRegistry; import com.google.gerrit.extensions.restapi.Url; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -36,12 +39,15 @@ @Mock private ForwardedProjectListUpdateHandler handlerMock; @Mock private HttpServletRequest requestMock; @Mock private HttpServletResponse responseMock; + @Mock private ProcessorMetricsRegistry metricsRegistryMock; + @Mock private ProcessorMetrics metrics; private ProjectListApiServlet servlet; @Before public void setUpMocks() { - servlet = new ProjectListApiServlet(handlerMock); + when(metricsRegistryMock.get(any())).thenReturn(metrics); + servlet = new ProjectListApiServlet(handlerMock, metricsRegistryMock); when(requestMock.getRequestURI()) .thenReturn( "http://hostname/plugins/high-availability/cache/project_list/"
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java index 4b98fe1..78d21ef 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/forwarder/rest/RestForwarderTest.java
@@ -19,12 +19,13 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import com.ericsson.gerrit.plugins.highavailability.Configuration; import com.ericsson.gerrit.plugins.highavailability.cache.Constants; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwarderMetrics; +import com.ericsson.gerrit.plugins.highavailability.forwarder.ForwarderMetricsRegistry; import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent; import com.ericsson.gerrit.plugins.highavailability.forwarder.TestEvent; import com.ericsson.gerrit.plugins.highavailability.forwarder.rest.HttpResponseHandler.HttpResult; @@ -35,19 +36,20 @@ import com.google.gerrit.entities.AccountGroup; import com.google.gerrit.entities.Project; import com.google.gerrit.server.events.Event; -import com.google.gerrit.server.git.WorkQueue; import com.google.gson.Gson; import com.google.inject.Provider; import java.io.IOException; import java.time.Duration; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLException; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.mockito.Answers; +import org.mockito.Mock; +@RunWith(org.mockito.junit.MockitoJUnitRunner.class) public class RestForwarderTest { private static final String URL = "http://fake.com"; private static final String PLUGIN_NAME = "high-availability"; @@ -111,6 +113,9 @@ private Configuration configMock; Provider<Set<PeerInfo>> peersMock; + @Mock ForwarderMetricsRegistry metricsRegistry; + @Mock ForwarderMetrics metrics; + @SuppressWarnings("unchecked") @Before public void setUp() { @@ -121,9 +126,7 @@ when(configMock.http().threadPoolSize()).thenReturn(2); peersMock = mock(Provider.class); when(peersMock.get()).thenReturn(ImmutableSet.of(new PeerInfo(URL))); - WorkQueue workQueue = mock(WorkQueue.class); - when(workQueue.createQueue(configMock.http().threadPoolSize(), "RestForwarderScheduler")) - .thenReturn(Executors.newScheduledThreadPool(2)); + when(metricsRegistry.get(any())).thenReturn(metrics); forwarder = new RestForwarder( httpSessionMock, @@ -131,188 +134,228 @@ configMock, peersMock, gson, // TODO: Create provider - new FailsafeExecutorProvider(configMock).get()); + new FailsafeExecutorProvider(configMock).get(), + metricsRegistry); } @Test public void testIndexAccountOK() throws Exception { - when(httpSessionMock.post(eq(INDEX_ACCOUNT_ENDPOINT), any())) + when(httpSessionMock.post(eq(INDEX_ACCOUNT_ENDPOINT), any(), any())) .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); assertThat( forwarder .indexAccount(ACCOUNT_NUMBER, new IndexEvent()) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isTrue(); } @Test public void testIndexAccountFailed() throws Exception { - when(httpSessionMock.post(eq(INDEX_ACCOUNT_ENDPOINT), any())) + when(httpSessionMock.post(eq(INDEX_ACCOUNT_ENDPOINT), any(), any())) .thenReturn(new HttpResult(FAILED, EMPTY_MSG)); assertThat( forwarder .indexAccount(ACCOUNT_NUMBER, new IndexEvent()) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isFalse(); } @Test public void testIndexAccountThrowsException() throws Exception { - doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_ACCOUNT_ENDPOINT), any()); + when(httpSessionMock.post(eq(INDEX_ACCOUNT_ENDPOINT), any(), any())) + .thenThrow(IOException.class); assertThat( forwarder .indexAccount(ACCOUNT_NUMBER, new IndexEvent()) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isFalse(); } @Test public void testIndexGroupOK() throws Exception { - when(httpSessionMock.post(eq(INDEX_GROUP_ENDPOINT), any())) + when(httpSessionMock.post(eq(INDEX_GROUP_ENDPOINT), any(), any())) .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); - assertThat(forwarder.indexGroup(UUID, new IndexEvent()).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + assertThat( + forwarder + .indexGroup(UUID, new IndexEvent()) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isTrue(); } @Test public void testIndexGroupFailed() throws Exception { - when(httpSessionMock.post(eq(INDEX_GROUP_ENDPOINT), any())) + when(httpSessionMock.post(eq(INDEX_GROUP_ENDPOINT), any(), any())) .thenReturn(new HttpResult(FAILED, EMPTY_MSG)); - assertThat(forwarder.indexGroup(UUID, new IndexEvent()).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + assertThat( + forwarder + .indexGroup(UUID, new IndexEvent()) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isFalse(); } @Test public void testIndexGroupThrowsException() throws Exception { - doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_GROUP_ENDPOINT), any()); - assertThat(forwarder.indexGroup(UUID, new IndexEvent()).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + when(httpSessionMock.post(eq(INDEX_GROUP_ENDPOINT), any(), any())).thenThrow(IOException.class); + assertThat( + forwarder + .indexGroup(UUID, new IndexEvent()) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isFalse(); } @Test public void testIndexChangeOK() throws Exception { - when(httpSessionMock.post(eq(INDEX_CHANGE_ENDPOINT), any())) + when(httpSessionMock.post(eq(INDEX_CHANGE_ENDPOINT), any(), any())) .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); assertThat( forwarder .indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent()) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isTrue(); } @Test public void testIndexChangeFailed() throws Exception { - when(httpSessionMock.post(eq(INDEX_CHANGE_ENDPOINT), any())) + when(httpSessionMock.post(eq(INDEX_CHANGE_ENDPOINT), any(), any())) .thenReturn(new HttpResult(FAILED, EMPTY_MSG)); assertThat( forwarder .indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent()) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isFalse(); } @Test public void testIndexChangeThrowsException() throws Exception { - doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_CHANGE_ENDPOINT), any()); + when(httpSessionMock.post(eq(INDEX_CHANGE_ENDPOINT), any(), any())) + .thenThrow(IOException.class); assertThat( forwarder .indexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent()) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isFalse(); } @Test public void testIndexBatchChangeOK() throws Exception { - when(httpSessionMock.post(eq(INDEX_BATCH_CHANGE_ENDPOINT), any())) + when(httpSessionMock.post(eq(INDEX_BATCH_CHANGE_ENDPOINT), any(), any())) .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); - assertThat(forwarder.batchIndexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent()).get()) + assertThat( + forwarder + .batchIndexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent()) + .get() + .result()) .isTrue(); } @Test public void testIndexBatchChangeFailed() throws Exception { - when(httpSessionMock.post(eq(INDEX_BATCH_CHANGE_ENDPOINT), any())) + when(httpSessionMock.post(eq(INDEX_BATCH_CHANGE_ENDPOINT), any(), any())) .thenReturn(new HttpResult(FAILED, EMPTY_MSG)); - assertThat(forwarder.batchIndexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent()).get()) + assertThat( + forwarder + .batchIndexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent()) + .get() + .result()) .isFalse(); } @Test public void testIndexBatchChangeThrowsException() throws Exception { - doThrow(new IOException()).when(httpSessionMock).post(eq(INDEX_BATCH_CHANGE_ENDPOINT), any()); - assertThat(forwarder.batchIndexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent()).get()) + when(httpSessionMock.post(eq(INDEX_BATCH_CHANGE_ENDPOINT), any(), any())) + .thenThrow(IOException.class); + assertThat( + forwarder + .batchIndexChange(PROJECT_NAME, CHANGE_NUMBER, new IndexEvent()) + .get() + .result()) .isFalse(); } @Test public void testChangeDeletedFromIndexOK() throws Exception { - when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT))) + when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT), any())) .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); assertThat( forwarder .deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent()) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isTrue(); } @Test public void testAllChangesDeletedFromIndexOK() throws Exception { - when(httpSessionMock.delete(eq(DELETE_ALL_CHANGES_ENDPOINT))) + when(httpSessionMock.delete(eq(DELETE_ALL_CHANGES_ENDPOINT), any())) .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); assertThat( forwarder .deleteAllChangesForProject(Project.nameKey(PROJECT_NAME)) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isTrue(); } @Test public void testChangeDeletedFromIndexFailed() throws Exception { - when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT))) + when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT), any())) .thenReturn(new HttpResult(FAILED, EMPTY_MSG)); assertThat( forwarder .deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent()) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isFalse(); } @Test public void testChangeDeletedFromThrowsException() throws Exception { - doThrow(new IOException()).when(httpSessionMock).delete(eq(DELETE_CHANGE_ENDPOINT)); + when(httpSessionMock.delete(eq(DELETE_CHANGE_ENDPOINT), any())).thenThrow(IOException.class); assertThat( forwarder .deleteChangeFromIndex(CHANGE_NUMBER, new IndexEvent()) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isFalse(); } @Test public void testEventSentOK() throws Exception { - when(httpSessionMock.post(EVENT_ENDPOINT, event)) + when(httpSessionMock.post(eq(EVENT_ENDPOINT), eq(event), any())) .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); - assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)).isTrue(); + assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS).result()).isTrue(); } @Test public void testEventSentFailed() throws Exception { - when(httpSessionMock.post(EVENT_ENDPOINT, event)).thenReturn(new HttpResult(FAILED, EMPTY_MSG)); - assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)).isFalse(); + when(httpSessionMock.post(eq(EVENT_ENDPOINT), eq(event), any())) + .thenReturn(new HttpResult(FAILED, EMPTY_MSG)); + assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS).result()).isFalse(); } @Test public void testEventSentThrowsException() throws Exception { - doThrow(new IOException()).when(httpSessionMock).post(EVENT_ENDPOINT, event); - assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)).isFalse(); + when(httpSessionMock.post(eq(EVENT_ENDPOINT), eq(event), any())).thenThrow(IOException.class); + assertThat(forwarder.send(event).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS).result()).isFalse(); } @Test public void testEvictProjectOK() throws Exception { String key = PROJECT_NAME; String keyJson = gson.toJson(key); - when(httpSessionMock.post(buildCacheEndpoint(Constants.PROJECTS), keyJson)) + when(httpSessionMock.post(eq(buildCacheEndpoint(Constants.PROJECTS)), eq(keyJson), any())) .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); - assertThat(forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + assertThat( + forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS).result()) .isTrue(); } @@ -320,9 +363,10 @@ public void testEvictAccountsOK() throws Exception { Account.Id key = Account.id(123); String keyJson = gson.toJson(key); - when(httpSessionMock.post(buildCacheEndpoint(Constants.ACCOUNTS), keyJson)) + when(httpSessionMock.post(eq(buildCacheEndpoint(Constants.ACCOUNTS)), eq(keyJson), any())) .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); - assertThat(forwarder.evict(Constants.ACCOUNTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + assertThat( + forwarder.evict(Constants.ACCOUNTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS).result()) .isTrue(); } @@ -331,8 +375,10 @@ AccountGroup.Id key = AccountGroup.id(123); String keyJson = gson.toJson(key); String endpoint = buildCacheEndpoint(Constants.GROUPS); - when(httpSessionMock.post(endpoint, keyJson)).thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); - assertThat(forwarder.evict(Constants.GROUPS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + when(httpSessionMock.post(eq(endpoint), eq(keyJson), any())) + .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); + assertThat( + forwarder.evict(Constants.GROUPS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS).result()) .isTrue(); } @@ -340,10 +386,14 @@ public void testEvictGroupsByIncludeOK() throws Exception { AccountGroup.UUID key = AccountGroup.uuid("90b3042d9094a37985f3f9281391dbbe9a5addad"); String keyJson = gson.toJson(key); - when(httpSessionMock.post(buildCacheEndpoint(Constants.GROUPS_BYINCLUDE), keyJson)) + when(httpSessionMock.post( + eq(buildCacheEndpoint(Constants.GROUPS_BYINCLUDE)), eq(keyJson), any())) .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); assertThat( - forwarder.evict(Constants.GROUPS_BYINCLUDE, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + forwarder + .evict(Constants.GROUPS_BYINCLUDE, key) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isTrue(); } @@ -351,9 +401,13 @@ public void testEvictGroupsMembersOK() throws Exception { AccountGroup.UUID key = AccountGroup.uuid("90b3042d9094a37985f3f9281391dbbe9a5addad"); String keyJson = gson.toJson(key); - when(httpSessionMock.post(buildCacheEndpoint(Constants.GROUPS_MEMBERS), keyJson)) + when(httpSessionMock.post(eq(buildCacheEndpoint(Constants.GROUPS_MEMBERS)), eq(keyJson), any())) .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); - assertThat(forwarder.evict(Constants.GROUPS_MEMBERS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + assertThat( + forwarder + .evict(Constants.GROUPS_MEMBERS, key) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isTrue(); } @@ -361,9 +415,10 @@ public void testEvictCacheFailed() throws Exception { String key = PROJECT_NAME; String keyJson = gson.toJson(key); - when(httpSessionMock.post(buildCacheEndpoint(Constants.PROJECTS), keyJson)) + when(httpSessionMock.post(eq(buildCacheEndpoint(Constants.PROJECTS)), eq(keyJson), any())) .thenReturn(new HttpResult(FAILED, EMPTY_MSG)); - assertThat(forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + assertThat( + forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS).result()) .isFalse(); } @@ -371,10 +426,10 @@ public void testEvictCacheThrowsException() throws Exception { String key = PROJECT_NAME; String keyJson = gson.toJson(key); - doThrow(new IOException()) - .when(httpSessionMock) - .post(buildCacheEndpoint(Constants.PROJECTS), keyJson); - assertThat(forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + when(httpSessionMock.post(eq(buildCacheEndpoint(Constants.PROJECTS)), eq(keyJson), any())) + .thenThrow(IOException.class); + assertThat( + forwarder.evict(Constants.PROJECTS, key).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS).result()) .isFalse(); } @@ -385,56 +440,69 @@ @Test public void testAddToProjectListOK() throws Exception { String projectName = PROJECT_TO_ADD; - when(httpSessionMock.post(buildProjectListCacheEndpoint(projectName), null)) + when(httpSessionMock.post(eq(buildProjectListCacheEndpoint(projectName)), any(), any())) .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); - assertThat(forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + assertThat( + forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS).result()) .isTrue(); } @Test public void testAddToProjectListFailed() throws Exception { String projectName = PROJECT_TO_ADD; - when(httpSessionMock.post(buildProjectListCacheEndpoint(projectName), null)) + when(httpSessionMock.post(eq(buildProjectListCacheEndpoint(projectName)), any(), any())) .thenReturn(new HttpResult(FAILED, EMPTY_MSG)); - assertThat(forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + assertThat( + forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS).result()) .isFalse(); } @Test public void testAddToProjectListThrowsException() throws Exception { String projectName = PROJECT_TO_ADD; - doThrow(new IOException()) - .when(httpSessionMock) - .post(buildProjectListCacheEndpoint(projectName), null); - assertThat(forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + when(httpSessionMock.post(eq(buildProjectListCacheEndpoint(projectName)), any(), any())) + .thenThrow(IOException.class); + assertThat( + forwarder.addToProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS).result()) .isFalse(); } @Test public void testRemoveFromProjectListOK() throws Exception { String projectName = PROJECT_TO_DELETE; - when(httpSessionMock.delete(buildProjectListCacheEndpoint(projectName))) + when(httpSessionMock.delete(eq(buildProjectListCacheEndpoint(projectName)), any())) .thenReturn(new HttpResult(SUCCESSFUL, EMPTY_MSG)); - assertThat(forwarder.removeFromProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + assertThat( + forwarder + .removeFromProjectList(projectName) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isTrue(); } @Test public void testRemoveToProjectListFailed() throws Exception { String projectName = PROJECT_TO_DELETE; - when(httpSessionMock.delete(buildProjectListCacheEndpoint(projectName))) + when(httpSessionMock.delete(eq(buildProjectListCacheEndpoint(projectName)), any())) .thenReturn(new HttpResult(FAILED, EMPTY_MSG)); - assertThat(forwarder.removeFromProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + assertThat( + forwarder + .removeFromProjectList(projectName) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isFalse(); } @Test public void testRemoveToProjectListThrowsException() throws Exception { String projectName = PROJECT_TO_DELETE; - doThrow(new IOException()) - .when(httpSessionMock) - .delete((buildProjectListCacheEndpoint(projectName))); - assertThat(forwarder.removeFromProjectList(projectName).get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + when(httpSessionMock.delete(eq(buildProjectListCacheEndpoint(projectName)), any())) + .thenThrow(IOException.class); + assertThat( + forwarder + .removeFromProjectList(projectName) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isFalse(); } @@ -444,7 +512,7 @@ @Test public void testRetryOnErrorThenSuccess() throws Exception { - when(httpSessionMock.post(anyString(), anyString())) + when(httpSessionMock.post(anyString(), anyString(), any())) .thenReturn(new HttpResult(false, ERROR)) .thenReturn(new HttpResult(false, ERROR)) .thenReturn(new HttpResult(true, SUCCESS)); @@ -452,13 +520,14 @@ assertThat( forwarder .evict(Constants.PROJECT_LIST, new Object()) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isTrue(); } @Test public void testRetryOnIoExceptionThenSuccess() throws Exception { - when(httpSessionMock.post(anyString(), anyString())) + when(httpSessionMock.post(anyString(), anyString(), any())) .thenThrow(new IOException()) .thenThrow(new IOException()) .thenReturn(new HttpResult(true, SUCCESS)); @@ -466,26 +535,28 @@ assertThat( forwarder .evict(Constants.PROJECT_LIST, new Object()) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isTrue(); } @Test public void testNoRetryAfterNonRecoverableException() throws Exception { - when(httpSessionMock.post(anyString(), anyString())) + when(httpSessionMock.post(anyString(), anyString(), any())) .thenThrow(new SSLException("Non Recoverable")) .thenReturn(new HttpResult(true, SUCCESS)); assertThat( forwarder .evict(Constants.PROJECT_LIST, new Object()) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isFalse(); } @Test public void testFailureAfterMaxTries() throws Exception { - when(httpSessionMock.post(anyString(), anyString())) + when(httpSessionMock.post(anyString(), anyString(), any())) .thenReturn(new HttpResult(false, ERROR)) .thenReturn(new HttpResult(false, ERROR)) .thenReturn(new HttpResult(false, ERROR)); @@ -493,7 +564,8 @@ assertThat( forwarder .evict(Constants.PROJECT_LIST, new Object()) - .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS)) + .get(TEST_TIMEOUT, TEST_TIMEOUT_UNITS) + .result()) .isFalse(); } }
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImplTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImplTest.java index dfea1df..460d56b 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImplTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/ChangeCheckerImplTest.java
@@ -24,7 +24,6 @@ import com.google.gerrit.server.notedb.ChangeNotes; import com.google.gerrit.server.util.OneOffRequestContext; import java.io.IOException; -import java.sql.Timestamp; import java.time.Instant; import java.util.Optional; import org.junit.Before; @@ -45,7 +44,7 @@ private final Instant testLastUpdatedOn = Instant.now(); private final String changeId = "1"; Optional<IndexEvent> event = Optional.empty(); - private Optional<Long> computedChangeTs = Optional.empty(); + private Optional<Instant> computedChangeTs = Optional.empty(); private ChangeCheckerImpl changeChecker; @Before @@ -61,8 +60,7 @@ @Test public void testGetComputedChangeTs() { - long testTime = Timestamp.from(testLastUpdatedOn).getTime(); - computedChangeTs = Optional.of(testTime / 1000); + computedChangeTs = Optional.of(testLastUpdatedOn); when(changeChecker.getChangeNotes()).thenReturn(Optional.of(testChangeNotes)); when(testChangeNotes.getChange()).thenReturn(testChange); when(testChange.getLastUpdatedOn()).thenReturn(testLastUpdatedOn);
diff --git a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java index a8aa4a9..a986b90 100644 --- a/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java +++ b/src/test/java/com/ericsson/gerrit/plugins/highavailability/index/IndexEventHandlerTest.java
@@ -24,7 +24,9 @@ import com.ericsson.gerrit.plugins.highavailability.Configuration; import com.ericsson.gerrit.plugins.highavailability.forwarder.Context; +import com.ericsson.gerrit.plugins.highavailability.forwarder.EventType; import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder; +import com.ericsson.gerrit.plugins.highavailability.forwarder.Forwarder.Result; import com.ericsson.gerrit.plugins.highavailability.forwarder.IndexEvent; import com.google.gerrit.entities.Account; import com.google.gerrit.entities.AccountGroup; @@ -75,12 +77,17 @@ when(changeCheckerMock.newIndexEvent()).thenReturn(Optional.of(new IndexEvent())); when(forwarder.indexAccount(eq(ACCOUNT_ID), any())) - .thenReturn(CompletableFuture.completedFuture(true)); + .thenReturn( + CompletableFuture.completedFuture(new Result(EventType.INDEX_ACCOUNT_UPDATE, true))); when(forwarder.deleteChangeFromIndex(eq(CHANGE_ID), any())) - .thenReturn(CompletableFuture.completedFuture(true)); - when(forwarder.indexGroup(eq(UUID), any())).thenReturn(CompletableFuture.completedFuture(true)); + .thenReturn( + CompletableFuture.completedFuture(new Result(EventType.INDEX_CHANGE_DELETION, true))); + when(forwarder.indexGroup(eq(UUID), any())) + .thenReturn( + CompletableFuture.completedFuture(new Result(EventType.INDEX_GROUP_UPDATE, true))); when(forwarder.indexChange(eq(PROJECT_NAME), eq(CHANGE_ID), any())) - .thenReturn(CompletableFuture.completedFuture(true)); + .thenReturn( + CompletableFuture.completedFuture(new Result(EventType.INDEX_CHANGE_UPDATE, true))); setUpIndexEventHandler(currCtx); }