Skip to content

Commit ba02c97

Browse files
authored
Merge pull request #7 from hyperledger/feature/add-concurrency-test
Feature/add concurrency test
2 parents 1747acd + bae330f commit ba02c97

5 files changed

Lines changed: 255 additions & 9 deletions

File tree

client/src/main/java/jp/co/soramitsu/iroha/java/BlocksQuery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,6 @@ public static BlocksQueryBuilder builder(String accountId, Long time, long count
5454
}
5555

5656
public static BlocksQueryBuilder builder(String accountId, long counter) {
57-
return builder(accountId, Instant.now(), counter);
57+
return builder(accountId, System.currentTimeMillis(), counter);
5858
}
5959
}

client/src/main/java/jp/co/soramitsu/iroha/java/IrohaAPI.java

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static jp.co.soramitsu.iroha.java.Utils.createTxList;
44
import static jp.co.soramitsu.iroha.java.Utils.createTxStatusRequest;
55

6+
import io.grpc.Channel;
67
import io.grpc.ManagedChannel;
78
import io.grpc.ManagedChannelBuilder;
89
import io.reactivex.Observable;
@@ -29,14 +30,21 @@
2930
/**
3031
* Class which provides convenient RX abstraction over Iroha API.
3132
*/
32-
@Getter
3333
public class IrohaAPI implements Closeable {
3434

35+
private static final WaitUntilCompleted defaultStrategy = new WaitUntilCompleted();
36+
37+
@Getter
3538
private URI uri;
39+
@Getter
3640
private ManagedChannel channel;
41+
@Getter
3742
private CommandService_v1BlockingStub cmdStub;
43+
@Getter
3844
private CommandService_v1Stub cmdStreamingStub;
45+
@Getter
3946
private QueryService_v1BlockingStub queryStub;
47+
@Getter
4048
private QueryService_v1Stub queryStreamingStub;
4149

4250
public IrohaAPI(URI uri) {
@@ -45,16 +53,52 @@ public IrohaAPI(URI uri) {
4553

4654
@SneakyThrows
4755
public IrohaAPI(String host, int port) {
48-
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
56+
this(
57+
ManagedChannelBuilder
58+
.forAddress(host, port)
59+
.usePlaintext()
60+
.build()
61+
);
62+
63+
this.uri = new URI("grpc", null, host, port, null, null, null);
64+
65+
// set single thread executor for streaming query stub as default
66+
this.setChannelForStreamingQueryStub(ManagedChannelBuilder
67+
.forAddress(host, port)
68+
.directExecutor()
69+
.usePlaintext()
70+
.build()
71+
);
72+
}
73+
74+
public IrohaAPI(ManagedChannel channel) {
75+
this.channel = channel;
76+
this.setChannelForBlockingCmdStub(channel);
77+
this.setChannelForBlockingQueryStub(channel);
78+
this.setChannelForStreamingCmdStub(channel);
79+
this.setChannelForStreamingQueryStub(channel);
80+
}
81+
82+
public IrohaAPI setChannelForBlockingCmdStub(Channel channel) {
4983
cmdStub = CommandService_v1Grpc.newBlockingStub(channel);
50-
queryStub = QueryService_v1Grpc.newBlockingStub(channel);
84+
return this;
85+
}
86+
87+
public IrohaAPI setChannelForStreamingCmdStub(Channel channel) {
5188
cmdStreamingStub = CommandService_v1Grpc.newStub(channel);
52-
queryStreamingStub = QueryService_v1Grpc.newStub(channel);
89+
return this;
90+
}
5391

54-
this.uri = new URI("grpc", null, host, port, null, null, null);
92+
public IrohaAPI setChannelForBlockingQueryStub(Channel channel) {
93+
queryStub = QueryService_v1Grpc.newBlockingStub(channel);
94+
return this;
95+
}
96+
97+
public IrohaAPI setChannelForStreamingQueryStub(Channel channel) {
98+
queryStreamingStub = QueryService_v1Grpc.newStub(channel);
99+
return this;
55100
}
56101

57-
private static final WaitUntilCompleted defaultStrategy = new WaitUntilCompleted();
58102

59103
/**
60104
* Send transaction synchronously, then subscribe for transaction status stream.
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package jp.co.soramitsu.iroha.java
2+
3+
import io.grpc.ManagedChannelBuilder
4+
import io.reactivex.internal.observers.LambdaObserver
5+
import iroha.protocol.QryResponses.BlockQueryResponse
6+
import jp.co.soramitsu.iroha.testcontainers.IrohaContainer
7+
import spock.lang.Specification
8+
9+
import java.util.stream.IntStream
10+
11+
import static jp.co.soramitsu.iroha.testcontainers.detail.GenesisBlockBuilder.*
12+
13+
class ConcurrencyTest extends Specification {
14+
15+
interface TestOutcome {
16+
void fail(String s)
17+
}
18+
19+
static def iroha = new IrohaContainer()
20+
.withLogger(null)
21+
22+
static IrohaAPI api
23+
24+
def setupSpec() {
25+
iroha.start()
26+
api = iroha.api
27+
}
28+
29+
def cleanupSpec() {
30+
iroha.stop()
31+
}
32+
33+
static def getTx(int n) {
34+
return Transaction.builder(defaultAccountId)
35+
.createAccount("${n}", defaultDomainName, defaultKeyPair.public)
36+
.sign(defaultKeyPair)
37+
.build()
38+
}
39+
40+
def subscribeForBlocks() {
41+
def bq = BlocksQuery.builder(defaultAccountId, 1)
42+
.buildSigned(defaultKeyPair)
43+
44+
return api.blocksQuery(bq)
45+
}
46+
47+
String last = ""
48+
49+
def getThreadInfo() {
50+
def t = Thread.currentThread()
51+
return "name=${t.name} id=${t.id}"
52+
}
53+
54+
def checkCurrentThread(TestOutcome test) {
55+
def t = Thread.currentThread()
56+
if (last.isEmpty()) {
57+
last = t.name
58+
return
59+
}
60+
61+
if (last != t.name) {
62+
test.fail("TEST FAILED: last=${last} != current ${t.name}")
63+
}
64+
}
65+
66+
def "iroha api subscribe blocks works on single thread"() {
67+
given:
68+
TestOutcome test = Mock(TestOutcome) {
69+
0 * fail(_)
70+
}
71+
72+
73+
def th1 = Thread.start {
74+
subscribeForBlocks()
75+
.subscribe(new LambdaObserver<BlockQueryResponse>(
76+
{ BlockQueryResponse b ->
77+
def block = b.blockResponse.block.blockV1.payload
78+
def height = block.height
79+
def time = block.createdTime
80+
println("[${getThreadInfo()}] BLOCK height=${height}, time=${time}")
81+
checkCurrentThread(test)
82+
},
83+
{ e ->
84+
println("[${getThreadInfo()}] ERROR: ${e}")
85+
},
86+
{
87+
println("[${getThreadInfo()}] COMPLETE")
88+
},
89+
{
90+
println("[${getThreadInfo()}] SUBSCRIBED")
91+
}
92+
))
93+
}
94+
95+
96+
when: "send few txes"
97+
def th2 = Thread.start {
98+
IntStream.range(0, 100)
99+
.parallel()
100+
.boxed()
101+
.map({ n -> getTx(n) })
102+
.map({ tx -> api.transaction(tx) })
103+
.forEach({ o -> o.blockingSubscribe() })
104+
}
105+
106+
println("[${getThreadInfo()}] MAIN THREAD")
107+
108+
then:
109+
th2.join()
110+
th1.join()
111+
noExceptionThrown()
112+
}
113+
}

client/src/test/groovy/jp/co/soramitsu/iroha/java/IrohaAPITest.groovy

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@ package jp.co.soramitsu.iroha.java
33
import jp.co.soramitsu.iroha.java.debug.TestTransactionStatusObserver
44
import jp.co.soramitsu.iroha.testcontainers.IrohaContainer
55
import spock.lang.Specification
6+
import spock.lang.Timeout
67

8+
import java.util.concurrent.TimeUnit
79
import java.util.stream.Collectors
810
import java.util.stream.IntStream
911

@@ -12,7 +14,7 @@ import static jp.co.soramitsu.iroha.testcontainers.detail.GenesisBlockBuilder.*
1214
class IrohaAPITest extends Specification {
1315

1416
static private IrohaContainer iroha = new IrohaContainer()
15-
17+
1618
def setupSpec() {
1719
iroha.start()
1820
}
@@ -114,7 +116,11 @@ class IrohaAPITest extends Specification {
114116
.onTransactionCommitted({ z -> onCommitted = true })
115117
.build()
116118

117-
api.txStatus(h).blockingSubscribe(obs)
119+
api.txStatus(h)
120+
.doOnError({n -> println(n)})
121+
.doOnNext({n -> println(n)})
122+
.doOnComplete({ -> println('COMPLETE')})
123+
.blockingSubscribe(obs)
118124

119125
return onCommitted
120126
})
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package jp.co.soramitsu.iroha.java;
2+
3+
import static jp.co.soramitsu.iroha.testcontainers.detail.GenesisBlockBuilder.defaultAccountId;
4+
import static jp.co.soramitsu.iroha.testcontainers.detail.GenesisBlockBuilder.defaultDomainName;
5+
import static jp.co.soramitsu.iroha.testcontainers.detail.GenesisBlockBuilder.defaultKeyPair;
6+
7+
import iroha.protocol.Commands.Command.CommandCase;
8+
import iroha.protocol.Commands.CreateAccount;
9+
import iroha.protocol.Commands.CreateAsset;
10+
import iroha.protocol.Endpoint.ToriiResponse;
11+
import iroha.protocol.Primitive.RolePermission;
12+
import java.util.Collections;
13+
import jp.co.soramitsu.iroha.java.routers.CmdRouter;
14+
import jp.co.soramitsu.iroha.java.routers.TxStatusRouter;
15+
import jp.co.soramitsu.iroha.testcontainers.IrohaContainer;
16+
import lombok.val;
17+
18+
public class Example2 {
19+
20+
/**
21+
* Case Study: I want to send a transaction, then when it is committed I want to log content of
22+
* transaction commands, depending on their type.
23+
*/
24+
public static void main(String[] args) {
25+
// lets start Iroha without logger
26+
IrohaContainer iroha = new IrohaContainer()
27+
.withLogger(null /* disable logger */);
28+
29+
iroha.start();
30+
IrohaAPI api = iroha.getApi();
31+
32+
// create a transaction
33+
val tx = Transaction.builder(defaultAccountId)
34+
.createAccount("0", defaultDomainName, defaultKeyPair.getPublic())
35+
.createAsset("usd", defaultDomainName, 2)
36+
.createRole("role", Collections.singletonList(RolePermission.can_add_asset_qty))
37+
.sign(defaultKeyPair)
38+
.build();
39+
40+
// define command router
41+
val cmdRouter = new CmdRouter();
42+
val r = new TxStatusRouter();
43+
44+
// handle CREATE_ACCOUNT command
45+
cmdRouter.handle(CommandCase.CREATE_ACCOUNT, cmd -> {
46+
CreateAccount c = cmd.getCreateAccount();
47+
System.out.println(
48+
String.format("[create account] name=%s domain=%s publickey=%s",
49+
c.getAccountName(),
50+
c.getDomainId(),
51+
c.getPublicKey())
52+
);
53+
});
54+
55+
// handle CREATE_ASSET
56+
cmdRouter.handle(CommandCase.CREATE_ASSET, cmd -> {
57+
CreateAsset c = cmd.getCreateAsset();
58+
System.out.println(
59+
String.format("[create asset] name=%s domain=%s precision=%d",
60+
c.getAssetName(),
61+
c.getDomainId(),
62+
c.getPrecision())
63+
);
64+
});
65+
66+
val observer = TransactionStatusObserver.builder()
67+
.onTransactionCommitted((ToriiResponse tr) -> {
68+
System.out.println("Received COMMITTED for transaction " + tr.getTxHash());
69+
70+
// process all commands for each committed transaction
71+
tx.getPayload()
72+
.getReducedPayload()
73+
.getCommandsList()
74+
.forEach(cmdRouter::process);
75+
})
76+
.build();
77+
78+
api.transaction(tx)
79+
.blockingSubscribe(observer);
80+
81+
iroha.stop();
82+
}
83+
}

0 commit comments

Comments
 (0)