Skip to content

eventapis-40 (#40) mark success for an operation #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public interface EventRepository {

List<EntityEvent> markFail(String opId);

List<EntityEvent> markSuccess(String opId);

<P extends PublishedEvent> EventKey recordAndPublish(P publishedEvent) throws EventStoreException, ConcurrentEventException;

<P extends PublishedEvent> EventKey recordAndPublish(Entity entity, P publishedEvent) throws EventStoreException, ConcurrentEventException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public List<EntityEvent> markFail(String key) {
update.where(QueryBuilder.eq(ENTITY_ID, entityEvent.getEventKey().getEntityId()))
.and(QueryBuilder.eq(VERSION, entityEvent.getEventKey().getVersion()))
.ifExists();
update.with(QueryBuilder.set(STATUS, "FAILED"));
update.with(QueryBuilder.set(STATUS, EventState.FAILED.name()));
ResultSet execute = cassandraSession.execute(update);
log.debug("Failure Mark Result:" + execute.toString() + " Update: " + update.toString());
return true;
Expand All @@ -186,6 +186,32 @@ public List<EntityEvent> markFail(String key) {

}

@Override
public List<EntityEvent> markSuccess(String key) {
Select select = QueryBuilder.select().from(tableNameByOps);
select.where(QueryBuilder.eq(OP_ID, key));
List<Row> entityEventDatas = cassandraSession.execute(select, PagingIterable::all);

return entityEventDatas.stream().map(
CassandraViewQuery::convertToEntityEvent
).filter(entityEvent -> {
try {
Update update = QueryBuilder.update(tableName);
update.where(QueryBuilder.eq(ENTITY_ID, entityEvent.getEventKey().getEntityId()))
.and(QueryBuilder.eq(VERSION, entityEvent.getEventKey().getVersion()))
.ifExists();
update.with(QueryBuilder.set(STATUS, EventState.CREATED.name()));
ResultSet execute = cassandraSession.execute(update);
log.debug("Success Mark Result:" + execute.toString() + " Update: " + update.toString());
return true;
} catch (Exception e) {
log.warn(e.getMessage(), e);
return false;
}
}).collect(Collectors.toList());

}


@Override
public String updateEvent(EventKey eventKey, @Nullable RecordedEvent newEventData, @Nullable EventState newEventState, @Nullable String newEventType) throws EventStoreException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ <R extends RecordedEvent, T extends Exception> EventKey recordEntityEvent(

List<EntityEvent> markFail(String key);

List<EntityEvent> markSuccess(String key);

String updateEvent(EventKey eventKey, @Nullable RecordedEvent newEventData, @Nullable EventState newEventState, @Nullable String newEventType) throws EventStoreException;

String updateEvent(EventKey eventKey, RecordedEvent newEventData) throws EventStoreException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public List<EntityEvent> markFail(String opId) {
return eventRecorder.markFail(opId);
}

@Override
public List<EntityEvent> markSuccess(String opId) {
return eventRecorder.markSuccess(opId);
}

@Override
public <P extends PublishedEvent> EventKey recordAndPublish(P publishedEvent) throws EventStoreException, ConcurrentEventException {
return recordAndPublishInternal(publishedEvent, Optional.empty(), entityEvent -> new DefaultConcurrencyResolver());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.kloia.eventapis.api.impl.UUIDCreationStrategy;
import com.kloia.eventapis.cassandra.ConcurrencyResolver;
import com.kloia.eventapis.cassandra.ConcurrentEventException;
import com.kloia.eventapis.cassandra.ConcurrentEventResolver;
import com.kloia.eventapis.cassandra.DefaultConcurrencyResolver;
import com.kloia.eventapis.cassandra.EntityEvent;
import com.kloia.eventapis.common.EventKey;
Expand Down Expand Up @@ -54,17 +53,22 @@ public class CompositeRepositoryImplTest {

@Rule
public ExpectedException expectedException = ExpectedException.none();

@InjectMocks
private CompositeRepositoryImpl compositeRepository;

@Mock
private EventRecorder eventRecorder;

@Mock
private ObjectMapper objectMapper;

@Mock
private IOperationRepository operationRepository;

@Mock
private IdCreationStrategy idCreationStrategy = new UUIDCreationStrategy();

@Captor
private ArgumentCaptor<Function<EntityEvent, ConcurrencyResolver<ConcurrentEventException>>> concurrencyResolverFactoryCaptor;

Expand Down Expand Up @@ -111,6 +115,13 @@ public void shouldMarkFail() {
verify(eventRecorder).markFail("opId");
}

@Test
public void shouldMarkSuccess() {
compositeRepository.markSuccess("opId");

verify(eventRecorder).markSuccess("opId");
}

private void mockCommon(PublishedEvent event) throws EventStoreException, ConcurrentEventException, JsonProcessingException {
when(eventRecorder.recordEntityEvent(eq(event), anyLong(), previousEventKeyCaptor.capture(), concurrencyResolverFactoryCaptor.capture())).thenReturn(eventKey);
when(objectWriter.writeValueAsString(event)).thenReturn("{" + event.getClass().getSimpleName() + "}");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.kloia.eventapis.view;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

@RunWith(MockitoJUnitRunner.class)
public class EntityFunctionSpecTest {

private EntityFunctionSpec entityFunctionSpec;

private class EntityType {
}

private class QueryType {
}

@Before
public void setUp() throws Exception {
entityFunctionSpec = new EntityFunctionSpec<EntityType, QueryType>((previous, event) -> null) {
@Override
public EntityFunction<EntityType, QueryType> getEntityFunction() {
return super.getEntityFunction();
}
};
}

@Test
public void shouldGetQueryType() throws Exception {
Class queryType = entityFunctionSpec.getQueryType();
assertThat(queryType, equalTo(QueryType.class));
}

@Test
public void shouldGetEntityType() throws Exception {
Class entityType = entityFunctionSpec.getEntityType();
assertThat(entityType, equalTo(EntityType.class));
}

}