Merge branch 'stable-3.4'

* stable-3.4:
  Send/receive Event object instead of EventMessage
  Use EventGsonProvider binding from Gerrit core
  Use event deserialization logic from events broker
  Implement async send method as per 3.4.0-rc2 API
  Deserialize Event and EventMessage
  Use EventGsonProvider from Gerrit core
  Fix properties in EventConsumerIT tests

Change-Id: Ia534d42980f0949b3954e0eca23b8a74cc3be3ea
diff --git a/external_plugin_deps.bzl b/external_plugin_deps.bzl index e059b34..e444876 100644 --- a/external_plugin_deps.bzl +++ b/external_plugin_deps.bzl
@@ -15,6 +15,6 @@ maven_jar( name = "events-broker", - artifact = "com.gerritforge:events-broker:3.4.0-rc0", - sha1 = "8c34c88103d4783eb4c4decde6d93541bc1cf064", + artifact = "com.gerritforge:events-broker:3.4.0.4", + sha1 = "8d361d863382290e33828116e65698190118d0f1", )
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java index 1be52cb..2768dd3 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java +++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/Module.java
@@ -14,14 +14,11 @@ package com.googlesource.gerrit.plugins.kafka; -import com.gerritforge.gerrit.eventbroker.EventGsonProvider; import com.google.gerrit.extensions.events.LifecycleListener; import com.google.gerrit.extensions.registration.DynamicSet; import com.google.gerrit.server.events.EventListener; -import com.google.gson.Gson; import com.google.inject.AbstractModule; import com.google.inject.Inject; -import com.google.inject.Singleton; import com.google.inject.TypeLiteral; import com.googlesource.gerrit.plugins.kafka.api.KafkaApiModule; import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher; @@ -39,7 +36,6 @@ @Override protected void configure() { - bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class); DynamicSet.bind(binder(), LifecycleListener.class).to(Manager.class); DynamicSet.bind(binder(), EventListener.class).to(KafkaPublisher.class);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java index 73c7509..a128af8 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java +++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaApiModule.java
@@ -15,11 +15,11 @@ package com.googlesource.gerrit.plugins.kafka.api; import com.gerritforge.gerrit.eventbroker.BrokerApi; -import com.gerritforge.gerrit.eventbroker.EventMessage; import com.gerritforge.gerrit.eventbroker.TopicSubscriber; import com.google.common.collect.Sets; import com.google.gerrit.extensions.registration.DynamicItem; import com.google.gerrit.lifecycle.LifecycleModule; +import com.google.gerrit.server.events.Event; import com.google.gerrit.server.git.WorkQueue; import com.google.inject.Inject; import com.google.inject.Scopes; @@ -61,7 +61,7 @@ workQueue.createQueue(configuration.getNumberOfSubscribers(), "kafka-subscriber")); bind(new TypeLiteral<Deserializer<byte[]>>() {}).toInstance(new ByteArrayDeserializer()); - bind(new TypeLiteral<Deserializer<EventMessage>>() {}).to(KafkaEventDeserializer.class); + bind(new TypeLiteral<Deserializer<Event>>() {}).to(KafkaEventDeserializer.class); bind(new TypeLiteral<Set<TopicSubscriber>>() {}).toInstance(activeConsumers); DynamicItem.bind(binder(), BrokerApi.class).to(KafkaBrokerApi.class).in(Scopes.SINGLETON);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java index 9a7c66a..3ec21e0 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java +++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApi.java
@@ -15,8 +15,9 @@ package com.googlesource.gerrit.plugins.kafka.api; import com.gerritforge.gerrit.eventbroker.BrokerApi; -import com.gerritforge.gerrit.eventbroker.EventMessage; import com.gerritforge.gerrit.eventbroker.TopicSubscriber; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.gerrit.server.events.Event; import com.google.inject.Inject; import com.google.inject.Provider; import com.googlesource.gerrit.plugins.kafka.publish.KafkaPublisher; @@ -42,12 +43,12 @@ } @Override - public boolean send(String topic, EventMessage event) { + public ListenableFuture<Boolean> send(String topic, Event event) { return publisher.publish(topic, event); } @Override - public void receiveAsync(String topic, Consumer<EventMessage> eventConsumer) { + public void receiveAsync(String topic, Consumer<Event> eventConsumer) { KafkaEventSubscriber subscriber = subscriberProvider.get(); synchronized (subscribers) { subscribers.add(subscriber);
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/GsonProvider.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/GsonProvider.java deleted file mode 100644 index 2c5c1e7..0000000 --- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/GsonProvider.java +++ /dev/null
@@ -1,29 +0,0 @@ -// Copyright (C) 2016 The Android Open Source Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.googlesource.gerrit.plugins.kafka.publish; - -import com.google.common.base.Supplier; -import com.google.gerrit.server.events.SupplierSerializer; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.inject.Provider; - -public class GsonProvider implements Provider<Gson> { - - @Override - public Gson get() { - return new GsonBuilder().registerTypeAdapter(Supplier.class, new SupplierSerializer()).create(); - } -}
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java index cc271b5..e7670cb 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java +++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/publish/KafkaPublisher.java
@@ -14,9 +14,10 @@ package com.googlesource.gerrit.plugins.kafka.publish; -import com.gerritforge.gerrit.eventbroker.EventMessage; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ListenableFuture; import com.google.gerrit.server.events.Event; +import com.google.gerrit.server.events.EventGson; import com.google.gerrit.server.events.EventListener; import com.google.gson.Gson; import com.google.gson.JsonObject; @@ -31,7 +32,7 @@ private final Gson gson; @Inject - public KafkaPublisher(KafkaSession kafkaSession, Gson gson) { + public KafkaPublisher(KafkaSession kafkaSession, @EventGson Gson gson) { this.session = kafkaSession; this.gson = gson; } @@ -53,11 +54,11 @@ } } - public boolean publish(String topic, EventMessage event) { + public ListenableFuture<Boolean> publish(String topic, Event event) { return session.publish(topic, getPayload(event)); } - private String getPayload(EventMessage event) { + private String getPayload(Event event) { return gson.toJson(event); }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java index bb79cb5..0dc29e1 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java +++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/session/KafkaSession.java
@@ -14,10 +14,16 @@ package com.googlesource.gerrit.plugins.kafka.session; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.JdkFutureAdapters; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; import com.google.inject.Provider; import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties; import com.googlesource.gerrit.plugins.kafka.publish.KafkaEventsPublisherMetrics; +import java.util.Objects; import java.util.concurrent.Future; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -80,34 +86,35 @@ producer = null; } - public void publish(String messageBody) { - publish(properties.getTopic(), messageBody); + public ListenableFuture<Boolean> publish(String messageBody) { + return publish(properties.getTopic(), messageBody); } - public boolean publish(String topic, String messageBody) { + public ListenableFuture<Boolean> publish(String topic, String messageBody) { if (properties.isSendAsync()) { return publishAsync(topic, messageBody); } return publishSync(topic, messageBody); } - private boolean publishSync(String topic, String messageBody) { - + private ListenableFuture<Boolean> publishSync(String topic, String messageBody) { + SettableFuture<Boolean> resultF = SettableFuture.create(); try { Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, "" + System.nanoTime(), messageBody)); RecordMetadata metadata = future.get(); LOGGER.debug("The offset of the record we just sent is: {}", metadata.offset()); publisherMetrics.incrementBrokerPublishedMessage(); - return true; + resultF.set(true); + return resultF; } catch (Throwable e) { LOGGER.error("Cannot send the message", e); publisherMetrics.incrementBrokerFailedToPublishMessage(); - return false; + return Futures.immediateFailedFuture(e); } } - private boolean publishAsync(String topic, String messageBody) { + private ListenableFuture<Boolean> publishAsync(String topic, String messageBody) { try { Future<RecordMetadata> future = producer.send( @@ -121,11 +128,16 @@ publisherMetrics.incrementBrokerFailedToPublishMessage(); } }); - return future != null; + + // The transformation is lightweight, so we can afford using a directExecutor + return Futures.transform( + JdkFutureAdapters.listenInPoolThread(future), + Objects::nonNull, + MoreExecutors.directExecutor()); } catch (Throwable e) { LOGGER.error("Cannot send the message", e); publisherMetrics.incrementBrokerFailedToPublishMessage(); - return false; + return Futures.immediateFailedFuture(e); } } }
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java index bab2ad0..cad2f37 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java +++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializer.java
@@ -14,8 +14,8 @@ package com.googlesource.gerrit.plugins.kafka.subscribe; -import com.gerritforge.gerrit.eventbroker.EventMessage; -import com.google.gson.Gson; +import com.gerritforge.gerrit.eventbroker.EventDeserializer; +import com.google.gerrit.server.events.Event; import com.google.inject.Inject; import com.google.inject.Singleton; import java.util.Map; @@ -23,30 +23,27 @@ import org.apache.kafka.common.serialization.StringDeserializer; @Singleton -public class KafkaEventDeserializer implements Deserializer<EventMessage> { +public class KafkaEventDeserializer implements Deserializer<Event> { private final StringDeserializer stringDeserializer = new StringDeserializer(); - private Gson gson; + private EventDeserializer eventDeserializer; // To be used when providing this deserializer with class name (then need to add a configuration // entry to set the gson.provider public KafkaEventDeserializer() {} @Inject - public KafkaEventDeserializer(Gson gson) { - this.gson = gson; + public KafkaEventDeserializer(EventDeserializer eventDeserializer) { + this.eventDeserializer = eventDeserializer; } @Override public void configure(Map<String, ?> configs, boolean isKey) {} @Override - public EventMessage deserialize(String topic, byte[] data) { - final EventMessage result = - gson.fromJson(stringDeserializer.deserialize(topic, data), EventMessage.class); - result.validate(); - - return result; + public Event deserialize(String topic, byte[] data) { + String json = stringDeserializer.deserialize(topic, data); + return eventDeserializer.deserialize(json); } @Override
diff --git a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java index 7ef9d7b..90b82aa 100644 --- a/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java +++ b/src/main/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventSubscriber.java
@@ -15,8 +15,8 @@ import static java.nio.charset.StandardCharsets.UTF_8; -import com.gerritforge.gerrit.eventbroker.EventMessage; import com.google.common.flogger.FluentLogger; +import com.google.gerrit.server.events.Event; import com.google.gerrit.server.util.ManualRequestContext; import com.google.gerrit.server.util.OneOffRequestContext; import com.google.inject.Inject; @@ -39,14 +39,14 @@ private final OneOffRequestContext oneOffCtx; private final AtomicBoolean closed = new AtomicBoolean(false); - private final Deserializer<EventMessage> valueDeserializer; + private final Deserializer<Event> valueDeserializer; private final KafkaSubscriberProperties configuration; private final ExecutorService executor; private final KafkaEventSubscriberMetrics subscriberMetrics; private final KafkaConsumerFactory consumerFactory; private final Deserializer<byte[]> keyDeserializer; - private java.util.function.Consumer<EventMessage> messageProcessor; + private java.util.function.Consumer<Event> messageProcessor; private String topic; private AtomicBoolean resetOffset = new AtomicBoolean(false); @@ -57,7 +57,7 @@ KafkaSubscriberProperties configuration, KafkaConsumerFactory consumerFactory, Deserializer<byte[]> keyDeserializer, - Deserializer<EventMessage> valueDeserializer, + Deserializer<Event> valueDeserializer, OneOffRequestContext oneOffCtx, @ConsumerExecutor ExecutorService executor, KafkaEventSubscriberMetrics subscriberMetrics) { @@ -71,7 +71,7 @@ this.valueDeserializer = valueDeserializer; } - public void subscribe(String topic, java.util.function.Consumer<EventMessage> messageProcessor) { + public void subscribe(String topic, java.util.function.Consumer<Event> messageProcessor) { this.topic = topic; this.messageProcessor = messageProcessor; logger.atInfo().log( @@ -97,7 +97,7 @@ receiver.wakeup(); } - public java.util.function.Consumer<EventMessage> getMessageProcessor() { + public java.util.function.Consumer<Event> getMessageProcessor() { return messageProcessor; } @@ -146,7 +146,7 @@ consumerRecords.forEach( consumerRecord -> { try (ManualRequestContext ctx = oneOffCtx.open()) { - EventMessage event = + Event event = valueDeserializer.deserialize(consumerRecord.topic(), consumerRecord.value()); messageProcessor.accept(event); } catch (Exception e) {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java index 4e27585..bd47223 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java +++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/EventConsumerIT.java
@@ -19,8 +19,6 @@ import static org.junit.Assert.fail; import com.gerritforge.gerrit.eventbroker.BrokerApi; -import com.gerritforge.gerrit.eventbroker.EventGsonProvider; -import com.gerritforge.gerrit.eventbroker.EventMessage; import com.google.common.base.Stopwatch; import com.google.common.collect.Iterables; import com.google.gerrit.acceptance.LightweightPluginDaemonTest; @@ -33,6 +31,7 @@ import com.google.gerrit.extensions.common.ChangeMessageInfo; import com.google.gerrit.server.events.CommentAddedEvent; import com.google.gerrit.server.events.Event; +import com.google.gerrit.server.events.EventGsonProvider; import com.google.gerrit.server.events.ProjectCreatedEvent; import com.google.gson.Gson; import com.googlesource.gerrit.plugins.kafka.config.KafkaProperties; @@ -40,7 +39,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.UUID; import java.util.function.Supplier; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -131,24 +129,22 @@ @Test @UseLocalDisk - @GerritConfig(name = "plugin.kafka-events.groupId", value = "test-consumer-group") + @GerritConfig(name = "plugin.events-kafka.groupId", value = "test-consumer-group") @GerritConfig( - name = "plugin.kafka-events.keyDeserializer", + name = "plugin.events-kafka.keyDeserializer", value = "org.apache.kafka.common.serialization.StringDeserializer") @GerritConfig( - name = "plugin.kafka-events.valueDeserializer", + name = "plugin.events-kafka.valueDeserializer", value = "org.apache.kafka.common.serialization.StringDeserializer") - @GerritConfig(name = "plugin.kafka-events.pollingIntervalMs", value = "500") + @GerritConfig(name = "plugin.events-kafka.pollingIntervalMs", value = "500") public void shouldReplayAllEvents() throws InterruptedException { String topic = "a_topic"; - EventMessage eventMessage = - new EventMessage( - new EventMessage.Header(UUID.randomUUID(), UUID.randomUUID()), - new ProjectCreatedEvent()); + Event eventMessage = new ProjectCreatedEvent(); + eventMessage.instanceId = "test-instance-id"; Duration WAIT_FOR_POLL_TIMEOUT = Duration.ofMillis(1000); - List<EventMessage> receivedEvents = new ArrayList<>(); + List<Event> receivedEvents = new ArrayList<>(); BrokerApi kafkaBrokerApi = kafkaBrokerApi(); kafkaBrokerApi.send(topic, eventMessage); @@ -157,14 +153,12 @@ waitUntil(() -> receivedEvents.size() == 1, WAIT_FOR_POLL_TIMEOUT); - assertThat(receivedEvents.get(0).getHeader().eventId) - .isEqualTo(eventMessage.getHeader().eventId); + assertThat(receivedEvents.get(0).instanceId).isEqualTo(eventMessage.instanceId); kafkaBrokerApi.replayAllEvents(topic); waitUntil(() -> receivedEvents.size() == 2, WAIT_FOR_POLL_TIMEOUT); - assertThat(receivedEvents.get(1).getHeader().eventId) - .isEqualTo(eventMessage.getHeader().eventId); + assertThat(receivedEvents.get(1).instanceId).isEqualTo(eventMessage.instanceId); } private BrokerApi kafkaBrokerApi() {
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java index 48350f9..d8df198 100644 --- a/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java +++ b/src/test/java/com/googlesource/gerrit/plugins/kafka/api/KafkaBrokerApiTest.java
@@ -17,10 +17,10 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.Mockito.mock; -import com.gerritforge.gerrit.eventbroker.EventGsonProvider; -import com.gerritforge.gerrit.eventbroker.EventMessage; -import com.gerritforge.gerrit.eventbroker.EventMessage.Header; import com.google.gerrit.metrics.MetricMaker; +import com.google.gerrit.server.events.Event; +import com.google.gerrit.server.events.EventGson; +import com.google.gerrit.server.events.EventGsonProvider; import com.google.gerrit.server.events.ProjectCreatedEvent; import com.google.gerrit.server.git.WorkQueue; import com.google.gerrit.server.util.IdGenerator; @@ -39,7 +39,6 @@ import com.googlesource.gerrit.plugins.kafka.session.KafkaSession; import java.util.ArrayList; import java.util.List; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -62,7 +61,7 @@ private static final String TEST_GROUP_ID = KafkaBrokerApiTest.class.getName(); private static final int TEST_POLLING_INTERVAL_MSEC = 100; private static final int TEST_THREAD_POOL_SIZE = 10; - private static final UUID TEST_INSTANCE_ID = UUID.randomUUID(); + private static final String TEST_INSTANCE_ID = "test-instance-id"; private static final TimeUnit TEST_TIMOUT_UNIT = TimeUnit.SECONDS; private static final int TEST_TIMEOUT = 30; @@ -87,7 +86,10 @@ @Override protected void configure() { - bind(Gson.class).toProvider(EventGsonProvider.class).in(Singleton.class); + bind(Gson.class) + .annotatedWith(EventGson.class) + .toProvider(EventGsonProvider.class) + .in(Singleton.class); bind(MetricMaker.class).toInstance(mock(MetricMaker.class, Answers.RETURNS_DEEP_STUBS)); bind(OneOffRequestContext.class) .toInstance(mock(OneOffRequestContext.class, Answers.RETURNS_DEEP_STUBS)); @@ -105,8 +107,8 @@ } } - public static class TestConsumer implements Consumer<EventMessage> { - public final List<EventMessage> messages = new ArrayList<>(); 
+ public static class TestConsumer implements Consumer<Event> { + public final List<Event> messages = new ArrayList<>(); private final CountDownLatch lock; public TestConsumer(int numMessagesExpected) { @@ -114,7 +116,7 @@ } @Override - public void accept(EventMessage message) { + public void accept(Event message) { messages.add(message); lock.countDown(); } @@ -128,13 +130,6 @@ } } - public static class TestHeader extends Header { - - public TestHeader() { - super(UUID.randomUUID(), TEST_INSTANCE_ID); - } - } - @BeforeClass public static void beforeClass() throws Exception { kafka = new KafkaContainer(); @@ -176,7 +171,8 @@ KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class); String testTopic = "test_topic_sync"; TestConsumer testConsumer = new TestConsumer(1); - EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent()); + Event testEventMessage = new ProjectCreatedEvent(); + testEventMessage.instanceId = TEST_INSTANCE_ID; kafkaBrokerApi.receiveAsync(testTopic, testConsumer); kafkaBrokerApi.send(testTopic, testEventMessage); @@ -192,7 +188,8 @@ KafkaBrokerApi kafkaBrokerApi = injector.getInstance(KafkaBrokerApi.class); String testTopic = "test_topic_async"; TestConsumer testConsumer = new TestConsumer(1); - EventMessage testEventMessage = new EventMessage(new TestHeader(), new ProjectCreatedEvent()); + Event testEventMessage = new ProjectCreatedEvent(); + testEventMessage.instanceId = TEST_INSTANCE_ID; kafkaBrokerApi.send(testTopic, testEventMessage); kafkaBrokerApi.receiveAsync(testTopic, testConsumer);
diff --git a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java b/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java deleted file mode 100644 index e456a2a..0000000 --- a/src/test/java/com/googlesource/gerrit/plugins/kafka/subscribe/KafkaEventDeserializerTest.java +++ /dev/null
@@ -1,64 +0,0 @@ -// Copyright (C) 2019 The Android Open Source Project -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package com.googlesource.gerrit.plugins.kafka.subscribe; - -import static com.google.common.truth.Truth.assertThat; -import static java.nio.charset.StandardCharsets.UTF_8; - -import com.gerritforge.gerrit.eventbroker.EventGsonProvider; -import com.gerritforge.gerrit.eventbroker.EventMessage; -import com.google.gson.Gson; -import java.util.UUID; -import org.junit.Before; -import org.junit.Test; - -public class KafkaEventDeserializerTest { - private KafkaEventDeserializer deserializer; - - @Before - public void setUp() { - final Gson gson = new EventGsonProvider().get(); - deserializer = new KafkaEventDeserializer(gson); - } - - @Test - public void kafkaEventDeserializerShouldParseAKafkaEvent() { - final UUID eventId = UUID.randomUUID(); - final String eventType = "event-type"; - final UUID sourceInstanceId = UUID.randomUUID(); - final long eventCreatedOn = 10L; - final String eventJson = - String.format( - "{ " - + "\"header\": { \"eventId\": \"%s\", \"eventType\": \"%s\", \"sourceInstanceId\": \"%s\", \"eventCreatedOn\": %d }," - + "\"body\": { \"type\": \"project-created\" }" - + "}", - eventId, eventType, sourceInstanceId, eventCreatedOn); - final EventMessage event = deserializer.deserialize("ignored", eventJson.getBytes(UTF_8)); - - assertThat(event.getHeader().eventId).isEqualTo(eventId); - assertThat(event.getHeader().sourceInstanceId).isEqualTo(sourceInstanceId); - } - - @Test(expected = RuntimeException.class) - public void kafkaEventDeserializerShouldFailForInvalidJson() { - deserializer.deserialize("ignored", "this is not a JSON string".getBytes(UTF_8)); - } - - @Test(expected = RuntimeException.class) - public void kafkaEventDeserializerShouldFailForInvalidObjectButValidJSON() { - deserializer.deserialize("ignored", "{}".getBytes(UTF_8)); - } -}