# UnicastProcessor.smali (25 KB)
  # Smali (Dalvik) disassembly of io.reactivex.processors.UnicastProcessor,
  # compiled from UnicastProcessor.java. The class is public and final and
  # extends FlowableProcessor.
  1. .class public final Lio/reactivex/processors/UnicastProcessor;
  2. .super Lio/reactivex/processors/FlowableProcessor;
  3. .source "UnicastProcessor.java"
  4. # annotations
  # MemberClasses: lists UnicastProcessor$UnicastQueueSubscription as a member
  # class of this class.
  5. .annotation system Ldalvik/annotation/MemberClasses;
  6. value = {
  7. Lio/reactivex/processors/UnicastProcessor$UnicastQueueSubscription;
  8. }
  9. .end annotation
  # Signature: the string pieces below concatenate to the generic class
  # signature - one type parameter T bounded by Object, and the superclass
  # FlowableProcessor parameterized with T.
  10. .annotation system Ldalvik/annotation/Signature;
  11. value = {
  12. "<T:",
  13. "Ljava/lang/Object;",
  14. ">",
  15. "Lio/reactivex/processors/FlowableProcessor<",
  16. "TT;>;"
  17. }
  18. .end annotation
  19. # instance fields
  # actual: holder for the current Subscriber (element type given by the
  # Signature annotation). subscribeActual stores the subscriber here; the
  # drain/termination paths reset it to null.
  20. .field final actual:Ljava/util/concurrent/atomic/AtomicReference;
  21. .annotation system Ldalvik/annotation/Signature;
  22. value = {
  23. "Ljava/util/concurrent/atomic/AtomicReference<",
  24. "Lorg/reactivestreams/Subscriber<",
  25. "-TT;>;>;"
  26. }
  27. .end annotation
  28. .end field
  # cancelled: volatile flag read by the methods of this class; no write to it
  # is visible in this file (presumably set by UnicastQueueSubscription - TODO confirm).
  29. .field volatile cancelled:Z
  # done: volatile flag set to true by onComplete and onError.
  30. .field volatile done:Z
  # enableOperatorFusion: read by drain to choose drainFused over drainRegular;
  # no write to it is visible in this file.
  31. .field enableOperatorFusion:Z
  # error: the Throwable stored by onError just before done is set.
  32. .field error:Ljava/lang/Throwable;
  # onTerminate: holder for an optional Runnable that doTerminate swaps to null
  # and runs.
  33. .field final onTerminate:Ljava/util/concurrent/atomic/AtomicReference;
  34. .annotation system Ldalvik/annotation/Signature;
  35. value = {
  36. "Ljava/util/concurrent/atomic/AtomicReference<",
  37. "Ljava/lang/Runnable;",
  38. ">;"
  39. }
  40. .end annotation
  41. .end field
  # once: guard used by subscribeActual so that only the first subscriber is accepted.
  42. .field final once:Ljava/util/concurrent/atomic/AtomicBoolean;
  # queue: buffer of items; onNext offers into it and drainRegular polls from it.
  43. .field final queue:Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
  44. .annotation system Ldalvik/annotation/Signature;
  45. value = {
  46. "Lio/reactivex/internal/queue/SpscLinkedArrayQueue<",
  47. "TT;>;"
  48. }
  49. .end annotation
  50. .end field
  # requested: outstanding demand counter; drainRegular reads it and subtracts
  # the number of items it emitted.
  51. .field final requested:Ljava/util/concurrent/atomic/AtomicLong;
  # wip: initialised with a UnicastQueueSubscription by the constructors. It is
  # used both as the drain counter (getAndIncrement / addAndGet) and as the
  # Subscription handed to the subscriber in subscribeActual.
  52. .field final wip:Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;
  53. .annotation system Ldalvik/annotation/Signature;
  54. value = {
  55. "Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription<",
  56. "TT;>;"
  57. }
  58. .end annotation
  59. .end field
  60. # direct methods
  # Constructor taking only a capacity hint (p1).
  # Calls the FlowableProcessor constructor, then initialises every final field.
  # onTerminate is created empty here, so doTerminate has nothing to run.
  61. .method constructor <init>(I)V
  62. .locals 2
  63. .line 107
  64. invoke-direct {p0}, Lio/reactivex/processors/FlowableProcessor;-><init>()V
  65. .line 108
  # queue = new SpscLinkedArrayQueue(verifyPositive(p1, "capacityHint"))
  66. new-instance v0, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
  67. const-string v1, "capacityHint"
  68. invoke-static {p1, v1}, Lio/reactivex/internal/functions/ObjectHelper;->verifyPositive(ILjava/lang/String;)I
  69. move-result p1
  70. invoke-direct {v0, p1}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;-><init>(I)V
  71. iput-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->queue:Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
  72. .line 109
  # onTerminate = new AtomicReference() (no initial value); p1 is reused as a scratch register from here on
  73. new-instance p1, Ljava/util/concurrent/atomic/AtomicReference;
  74. invoke-direct {p1}, Ljava/util/concurrent/atomic/AtomicReference;-><init>()V
  75. iput-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->onTerminate:Ljava/util/concurrent/atomic/AtomicReference;
  76. .line 110
  # actual = new AtomicReference()
  77. new-instance p1, Ljava/util/concurrent/atomic/AtomicReference;
  78. invoke-direct {p1}, Ljava/util/concurrent/atomic/AtomicReference;-><init>()V
  79. iput-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->actual:Ljava/util/concurrent/atomic/AtomicReference;
  80. .line 111
  # once = new AtomicBoolean()
  81. new-instance p1, Ljava/util/concurrent/atomic/AtomicBoolean;
  82. invoke-direct {p1}, Ljava/util/concurrent/atomic/AtomicBoolean;-><init>()V
  83. iput-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->once:Ljava/util/concurrent/atomic/AtomicBoolean;
  84. .line 112
  # wip = new UnicastQueueSubscription(this)
  85. new-instance p1, Lio/reactivex/processors/UnicastProcessor$UnicastQueueSubscription;
  86. invoke-direct {p1, p0}, Lio/reactivex/processors/UnicastProcessor$UnicastQueueSubscription;-><init>(Lio/reactivex/processors/UnicastProcessor;)V
  87. iput-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->wip:Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;
  88. .line 113
  # requested = new AtomicLong()
  89. new-instance p1, Ljava/util/concurrent/atomic/AtomicLong;
  90. invoke-direct {p1}, Ljava/util/concurrent/atomic/AtomicLong;-><init>()V
  91. iput-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->requested:Ljava/util/concurrent/atomic/AtomicLong;
  92. return-void
  93. .end method
  # Constructor taking a capacity hint (p1) and a termination callback (p2).
  # Same field initialisation as the single-argument constructor, except that
  # onTerminate starts out holding p2 after it passes the requireNonNull check.
  94. .method constructor <init>(ILjava/lang/Runnable;)V
  95. .locals 2
  96. .line 123
  97. invoke-direct {p0}, Lio/reactivex/processors/FlowableProcessor;-><init>()V
  98. .line 124
  # queue = new SpscLinkedArrayQueue(verifyPositive(p1, "capacityHint"))
  99. new-instance v0, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
  100. const-string v1, "capacityHint"
  101. invoke-static {p1, v1}, Lio/reactivex/internal/functions/ObjectHelper;->verifyPositive(ILjava/lang/String;)I
  102. move-result p1
  103. invoke-direct {v0, p1}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;-><init>(I)V
  104. iput-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->queue:Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
  105. .line 125
  # onTerminate = new AtomicReference(requireNonNull(p2, "onTerminate"))
  106. new-instance p1, Ljava/util/concurrent/atomic/AtomicReference;
  107. const-string v0, "onTerminate"
  108. invoke-static {p2, v0}, Lio/reactivex/internal/functions/ObjectHelper;->requireNonNull(Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object;
  109. move-result-object p2
  110. invoke-direct {p1, p2}, Ljava/util/concurrent/atomic/AtomicReference;-><init>(Ljava/lang/Object;)V
  111. iput-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->onTerminate:Ljava/util/concurrent/atomic/AtomicReference;
  112. .line 126
  # actual = new AtomicReference()
  113. new-instance p1, Ljava/util/concurrent/atomic/AtomicReference;
  114. invoke-direct {p1}, Ljava/util/concurrent/atomic/AtomicReference;-><init>()V
  115. iput-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->actual:Ljava/util/concurrent/atomic/AtomicReference;
  116. .line 127
  # once = new AtomicBoolean()
  117. new-instance p1, Ljava/util/concurrent/atomic/AtomicBoolean;
  118. invoke-direct {p1}, Ljava/util/concurrent/atomic/AtomicBoolean;-><init>()V
  119. iput-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->once:Ljava/util/concurrent/atomic/AtomicBoolean;
  120. .line 128
  # wip = new UnicastQueueSubscription(this)
  121. new-instance p1, Lio/reactivex/processors/UnicastProcessor$UnicastQueueSubscription;
  122. invoke-direct {p1, p0}, Lio/reactivex/processors/UnicastProcessor$UnicastQueueSubscription;-><init>(Lio/reactivex/processors/UnicastProcessor;)V
  123. iput-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->wip:Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;
  124. .line 129
  # requested = new AtomicLong()
  125. new-instance p1, Ljava/util/concurrent/atomic/AtomicLong;
  126. invoke-direct {p1}, Ljava/util/concurrent/atomic/AtomicLong;-><init>()V
  127. iput-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->requested:Ljava/util/concurrent/atomic/AtomicLong;
  128. return-void
  129. .end method
  # Static factory without arguments.
  # Returns a new UnicastProcessor whose capacity hint is the value returned by
  # the static bufferSize() call (bufferSize is not defined in this file).
  130. .method public static create()Lio/reactivex/processors/UnicastProcessor;
  131. .locals 2
  132. .annotation system Ldalvik/annotation/Signature;
  133. value = {
  134. "<T:",
  135. "Ljava/lang/Object;",
  136. ">()",
  137. "Lio/reactivex/processors/UnicastProcessor<",
  138. "TT;>;"
  139. }
  140. .end annotation
  141. .annotation runtime Lio/reactivex/annotations/CheckReturnValue;
  142. .end annotation
  143. .line 71
  144. new-instance v0, Lio/reactivex/processors/UnicastProcessor;
  145. invoke-static {}, Lio/reactivex/processors/UnicastProcessor;->bufferSize()I
  146. move-result v1
  147. invoke-direct {v0, v1}, Lio/reactivex/processors/UnicastProcessor;-><init>(I)V
  148. return-object v0
  149. .end method
  # Static factory with an explicit capacity hint (p0).
  # Returns a new UnicastProcessor built with the single-int constructor, which
  # validates the hint via verifyPositive.
  150. .method public static create(I)Lio/reactivex/processors/UnicastProcessor;
  151. .locals 1
  152. .annotation system Ldalvik/annotation/Signature;
  153. value = {
  154. "<T:",
  155. "Ljava/lang/Object;",
  156. ">(I)",
  157. "Lio/reactivex/processors/UnicastProcessor<",
  158. "TT;>;"
  159. }
  160. .end annotation
  161. .annotation runtime Lio/reactivex/annotations/CheckReturnValue;
  162. .end annotation
  163. .line 82
  164. new-instance v0, Lio/reactivex/processors/UnicastProcessor;
  165. invoke-direct {v0, p0}, Lio/reactivex/processors/UnicastProcessor;-><init>(I)V
  166. return-object v0
  167. .end method
  # Static factory with a capacity hint (p0) and a termination callback (p1).
  # Returns a new UnicastProcessor built with the (int, Runnable) constructor,
  # which rejects a null callback via requireNonNull.
  168. .method public static create(ILjava/lang/Runnable;)Lio/reactivex/processors/UnicastProcessor;
  169. .locals 1
  170. .annotation system Ldalvik/annotation/Signature;
  171. value = {
  172. "<T:",
  173. "Ljava/lang/Object;",
  174. ">(I",
  175. "Ljava/lang/Runnable;",
  176. ")",
  177. "Lio/reactivex/processors/UnicastProcessor<",
  178. "TT;>;"
  179. }
  180. .end annotation
  181. .annotation runtime Lio/reactivex/annotations/CheckReturnValue;
  182. .end annotation
  183. .line 99
  184. new-instance v0, Lio/reactivex/processors/UnicastProcessor;
  185. invoke-direct {v0, p0, p1}, Lio/reactivex/processors/UnicastProcessor;-><init>(ILjava/lang/Runnable;)V
  186. return-object v0
  187. .end method
  188. # virtual methods
  # Decides whether the drain loop must stop.
  #   p1 - first flag; drainRegular passes the value of the done field
  #   p2 - second flag; drainRegular passes "the queue is empty"
  #   p3 - the subscriber to signal
  #   p4 - the queue to clear on cancellation
  # Returns true when:
  #   - cancelled is set: the queue is cleared and actual is reset to null; or
  #   - both p1 and p2 are true: actual is reset to null and the subscriber
  #     receives onError(error) if error is non-null, otherwise onComplete().
  # Returns false in every other case, without side effects.
  189. .method checkTerminated(ZZLorg/reactivestreams/Subscriber;Lio/reactivex/internal/queue/SpscLinkedArrayQueue;)Z
  190. .locals 3
  191. .annotation system Ldalvik/annotation/Signature;
  192. value = {
  193. "(ZZ",
  194. "Lorg/reactivestreams/Subscriber<",
  195. "-TT;>;",
  196. "Lio/reactivex/internal/queue/SpscLinkedArrayQueue<",
  197. "TT;>;)Z"
  198. }
  199. .end annotation
  200. .line 247
  201. iget-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->cancelled:Z
  # v1 = true (the "terminated" result), v2 = null (value stored into actual)
  202. const/4 v1, 0x1
  203. const/4 v2, 0x0
  204. if-eqz v0, :cond_0
  205. .line 248
  # cancelled: drop buffered items and release the subscriber reference
  206. invoke-virtual {p4}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;->clear()V
  207. .line 249
  208. iget-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->actual:Ljava/util/concurrent/atomic/AtomicReference;
  209. invoke-virtual {p1, v2}, Ljava/util/concurrent/atomic/AtomicReference;->lazySet(Ljava/lang/Object;)V
  210. return v1
  211. :cond_0
  # not cancelled: terminate only if both flags are true
  212. if-eqz p1, :cond_2
  213. if-eqz p2, :cond_2
  214. .line 253
  # p1 and p2 are reused as scratch registers: p1 = error, p2 = actual
  215. iget-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->error:Ljava/lang/Throwable;
  216. .line 254
  217. iget-object p2, p0, Lio/reactivex/processors/UnicastProcessor;->actual:Ljava/util/concurrent/atomic/AtomicReference;
  218. invoke-virtual {p2, v2}, Ljava/util/concurrent/atomic/AtomicReference;->lazySet(Ljava/lang/Object;)V
  219. if-eqz p1, :cond_1
  220. .line 256
  221. invoke-interface {p3, p1}, Lorg/reactivestreams/Subscriber;->onError(Ljava/lang/Throwable;)V
  222. goto :goto_0
  223. .line 258
  224. :cond_1
  225. invoke-interface {p3}, Lorg/reactivestreams/Subscriber;->onComplete()V
  226. :goto_0
  227. return v1
  228. :cond_2
  # nothing to terminate: return false
  229. const/4 p1, 0x0
  230. return p1
  231. .end method
  # Runs the onTerminate callback, if one is present.
  # The Runnable is read from onTerminate and run only when the compareAndSet
  # from that Runnable to null succeeds, so a caller that loses the swap does
  # not run it. Does nothing when the reference holds null.
  232. .method doTerminate()V
  233. .locals 3
  234. .line 133
  235. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->onTerminate:Ljava/util/concurrent/atomic/AtomicReference;
  236. invoke-virtual {v0}, Ljava/util/concurrent/atomic/AtomicReference;->get()Ljava/lang/Object;
  237. move-result-object v0
  238. check-cast v0, Ljava/lang/Runnable;
  239. if-eqz v0, :cond_0
  240. .line 134
  # swap the callback out (v0 -> null) before running it
  241. iget-object v1, p0, Lio/reactivex/processors/UnicastProcessor;->onTerminate:Ljava/util/concurrent/atomic/AtomicReference;
  242. const/4 v2, 0x0
  243. invoke-virtual {v1, v0, v2}, Ljava/util/concurrent/atomic/AtomicReference;->compareAndSet(Ljava/lang/Object;Ljava/lang/Object;)Z
  244. move-result v1
  245. if-eqz v1, :cond_0
  246. .line 135
  247. invoke-interface {v0}, Ljava/lang/Runnable;->run()V
  248. :cond_0
  249. return-void
  250. .end method
  # Entry point of the drain loop.
  # Increments wip and returns immediately unless the previous value was 0.
  # Otherwise it loops: when a subscriber is present in actual it delegates to
  # drainFused (enableOperatorFusion set) or drainRegular and returns; when no
  # subscriber is present it subtracts the handled count from wip and, if the
  # result is non-zero, re-reads actual and tries again.
  251. .method drain()V
  252. .locals 2
  253. .line 220
  254. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->wip:Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;
  255. invoke-virtual {v0}, Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;->getAndIncrement()I
  256. move-result v0
  257. if-eqz v0, :cond_0
  # wip was already non-zero: return without draining
  258. return-void
  259. :cond_0
  # v0 = number of drain requests accounted for so far (starts at 1)
  260. const/4 v0, 0x1
  261. .line 226
  262. iget-object v1, p0, Lio/reactivex/processors/UnicastProcessor;->actual:Ljava/util/concurrent/atomic/AtomicReference;
  263. invoke-virtual {v1}, Ljava/util/concurrent/atomic/AtomicReference;->get()Ljava/lang/Object;
  264. move-result-object v1
  265. check-cast v1, Lorg/reactivestreams/Subscriber;
  266. :goto_0
  267. if-eqz v1, :cond_2
  268. .line 230
  # v0 is overwritten here; this is safe because this path always returns
  269. iget-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->enableOperatorFusion:Z
  270. if-eqz v0, :cond_1
  271. .line 231
  272. invoke-virtual {p0, v1}, Lio/reactivex/processors/UnicastProcessor;->drainFused(Lorg/reactivestreams/Subscriber;)V
  273. goto :goto_1
  274. .line 233
  275. :cond_1
  276. invoke-virtual {p0, v1}, Lio/reactivex/processors/UnicastProcessor;->drainRegular(Lorg/reactivestreams/Subscriber;)V
  277. :goto_1
  278. return-void
  279. .line 238
  280. :cond_2
  # no subscriber yet: v0 = wip.addAndGet(-v0); stop when it reaches 0
  281. iget-object v1, p0, Lio/reactivex/processors/UnicastProcessor;->wip:Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;
  282. neg-int v0, v0
  283. invoke-virtual {v1, v0}, Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;->addAndGet(I)I
  284. move-result v0
  285. if-nez v0, :cond_3
  286. return-void
  287. .line 242
  288. :cond_3
  # more drain requests arrived meanwhile: re-read the subscriber and retry
  289. iget-object v1, p0, Lio/reactivex/processors/UnicastProcessor;->actual:Ljava/util/concurrent/atomic/AtomicReference;
  290. invoke-virtual {v1}, Ljava/util/concurrent/atomic/AtomicReference;->get()Ljava/lang/Object;
  291. move-result-object v1
  292. check-cast v1, Lorg/reactivestreams/Subscriber;
  293. goto :goto_0
  294. .end method
  # Drain loop used when enableOperatorFusion is set.
  #   p1 - the subscriber to signal
  # Each pass: if cancelled, clear the queue, reset actual to null and return.
  # Otherwise read done, call p1.onNext(null), and if done was set reset actual
  # to null and deliver onError(error) or onComplete(), then return. If not
  # done, subtract the handled count from wip and loop again while the result
  # is non-zero.
  # NOTE(review): items are never polled here; the null passed to onNext is
  # presumably a "poll the queue yourself" signal for a fused consumer - confirm
  # against UnicastQueueSubscription.
  295. .method drainFused(Lorg/reactivestreams/Subscriber;)V
  296. .locals 4
  297. .annotation system Ldalvik/annotation/Signature;
  298. value = {
  299. "(",
  300. "Lorg/reactivestreams/Subscriber<",
  301. "-TT;>;)V"
  302. }
  303. .end annotation
  304. .line 186
  # v0 = queue, v1 = number of drain requests accounted for (starts at 1)
  305. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->queue:Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
  306. const/4 v1, 0x1
  307. .line 190
  308. :cond_0
  309. iget-boolean v2, p0, Lio/reactivex/processors/UnicastProcessor;->cancelled:Z
  # v3 = null, used both for actual.lazySet and as the onNext argument
  310. const/4 v3, 0x0
  311. if-eqz v2, :cond_1
  312. .line 191
  313. invoke-virtual {v0}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;->clear()V
  314. .line 192
  315. iget-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->actual:Ljava/util/concurrent/atomic/AtomicReference;
  316. invoke-virtual {p1, v3}, Ljava/util/concurrent/atomic/AtomicReference;->lazySet(Ljava/lang/Object;)V
  317. return-void
  318. .line 196
  319. :cond_1
  # done is sampled before onNext(null) is invoked
  320. iget-boolean v2, p0, Lio/reactivex/processors/UnicastProcessor;->done:Z
  321. .line 198
  322. invoke-interface {p1, v3}, Lorg/reactivestreams/Subscriber;->onNext(Ljava/lang/Object;)V
  323. if-eqz v2, :cond_3
  324. .line 201
  # terminal path: release the subscriber reference, then signal the outcome
  325. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->actual:Ljava/util/concurrent/atomic/AtomicReference;
  326. invoke-virtual {v0, v3}, Ljava/util/concurrent/atomic/AtomicReference;->lazySet(Ljava/lang/Object;)V
  327. .line 203
  328. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->error:Ljava/lang/Throwable;
  329. if-eqz v0, :cond_2
  330. .line 205
  331. invoke-interface {p1, v0}, Lorg/reactivestreams/Subscriber;->onError(Ljava/lang/Throwable;)V
  332. goto :goto_0
  333. .line 207
  334. :cond_2
  335. invoke-interface {p1}, Lorg/reactivestreams/Subscriber;->onComplete()V
  336. :goto_0
  337. return-void
  338. .line 212
  339. :cond_3
  # v1 = wip.addAndGet(-v1); loop again only if further drain requests arrived
  340. iget-object v2, p0, Lio/reactivex/processors/UnicastProcessor;->wip:Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;
  341. neg-int v1, v1
  342. invoke-virtual {v2, v1}, Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;->addAndGet(I)I
  343. move-result v1
  344. if-nez v1, :cond_0
  345. return-void
  346. .end method
  # Drain loop used when enableOperatorFusion is not set.
  #   p1 - the subscriber to signal
  # Outer loop (cond_0): read the requested amount r and reset the emitted
  # count e to 0. Inner loop (goto_0): while e != r, read done, poll one item,
  # ask checkTerminated(done, item == null) whether to stop (return if so),
  # break if the queue was empty, otherwise deliver the item via onNext and
  # increment e. After the inner loop, if e == r, checkTerminated is consulted
  # again with queue.isEmpty(). If anything was emitted and r is not
  # Long.MAX_VALUE, e is subtracted from requested. Finally the handled count
  # is subtracted from wip and the outer loop repeats while the result is
  # non-zero.
  # Registers: v0 = queue, v2 = handled drain-request count, v3:v4 = r,
  # v5:v6 = 0L (later reused), v7:v8 = e. v1 is assigned but never read.
  347. .method drainRegular(Lorg/reactivestreams/Subscriber;)V
  348. .locals 12
  349. .annotation system Ldalvik/annotation/Signature;
  350. value = {
  351. "(",
  352. "Lorg/reactivestreams/Subscriber<",
  353. "-TT;>;)V"
  354. }
  355. .end annotation
  356. .line 142
  357. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->queue:Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
  358. const/4 v1, 0x1
  359. const/4 v2, 0x1
  360. .line 146
  361. :cond_0
  # r = requested.get(); e = 0
  362. iget-object v3, p0, Lio/reactivex/processors/UnicastProcessor;->requested:Ljava/util/concurrent/atomic/AtomicLong;
  363. invoke-virtual {v3}, Ljava/util/concurrent/atomic/AtomicLong;->get()J
  364. move-result-wide v3
  365. const-wide/16 v5, 0x0
  366. move-wide v7, v5
  367. :goto_0
  # inner loop condition: continue while r != e
  368. cmp-long v9, v3, v7
  369. if-eqz v9, :cond_4
  370. .line 150
  # done is sampled before polling the queue
  371. iget-boolean v9, p0, Lio/reactivex/processors/UnicastProcessor;->done:Z
  372. .line 152
  373. invoke-virtual {v0}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;->poll()Ljava/lang/Object;
  374. move-result-object v10
  # v11 = (polled item == null), i.e. the queue had nothing to deliver
  375. if-nez v10, :cond_1
  376. const/4 v11, 0x1
  377. goto :goto_1
  378. :cond_1
  379. const/4 v11, 0x0
  380. .line 155
  381. :goto_1
  382. invoke-virtual {p0, v9, v11, p1, v0}, Lio/reactivex/processors/UnicastProcessor;->checkTerminated(ZZLorg/reactivestreams/Subscriber;Lio/reactivex/internal/queue/SpscLinkedArrayQueue;)Z
  383. move-result v9
  384. if-eqz v9, :cond_2
  385. return-void
  386. :cond_2
  # empty queue but not terminated: leave the inner loop
  387. if-eqz v11, :cond_3
  388. goto :goto_2
  389. .line 163
  390. :cond_3
  # deliver the item and count it: e += 1
  391. invoke-interface {p1, v10}, Lorg/reactivestreams/Subscriber;->onNext(Ljava/lang/Object;)V
  392. const-wide/16 v9, 0x1
  393. add-long/2addr v7, v9
  394. goto :goto_0
  395. :cond_4
  396. :goto_2
  # when all requested items were emitted (r == e), check for termination once
  # more using the queue's current emptiness
  397. cmp-long v9, v3, v7
  398. if-nez v9, :cond_5
  399. .line 168
  400. iget-boolean v9, p0, Lio/reactivex/processors/UnicastProcessor;->done:Z
  401. invoke-virtual {v0}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;->isEmpty()Z
  402. move-result v10
  403. invoke-virtual {p0, v9, v10, p1, v0}, Lio/reactivex/processors/UnicastProcessor;->checkTerminated(ZZLorg/reactivestreams/Subscriber;Lio/reactivex/internal/queue/SpscLinkedArrayQueue;)Z
  404. move-result v9
  405. if-eqz v9, :cond_5
  406. return-void
  407. :cond_5
  # subtract e from requested only if e != 0 and r != Long.MAX_VALUE
  408. cmp-long v9, v7, v5
  409. if-eqz v9, :cond_6
  410. const-wide v5, 0x7fffffffffffffffL
  411. cmp-long v9, v3, v5
  412. if-eqz v9, :cond_6
  413. .line 173
  414. iget-object v3, p0, Lio/reactivex/processors/UnicastProcessor;->requested:Ljava/util/concurrent/atomic/AtomicLong;
  415. neg-long v4, v7
  416. invoke-virtual {v3, v4, v5}, Ljava/util/concurrent/atomic/AtomicLong;->addAndGet(J)J
  417. .line 176
  418. :cond_6
  # v2 = wip.addAndGet(-v2); repeat the outer loop if more drain requests arrived
  419. iget-object v3, p0, Lio/reactivex/processors/UnicastProcessor;->wip:Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;
  420. neg-int v2, v2
  421. invoke-virtual {v3, v2}, Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;->addAndGet(I)I
  422. move-result v2
  423. if-nez v2, :cond_0
  424. return-void
  425. .end method
  # Returns the stored error when done is set, otherwise null.
  # The result is also null when done is set but no error was stored.
  426. .method public getThrowable()Ljava/lang/Throwable;
  427. .locals 1
  428. .line 400
  429. iget-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->done:Z
  430. if-eqz v0, :cond_0
  431. .line 401
  432. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->error:Ljava/lang/Throwable;
  433. return-object v0
  434. :cond_0
  435. const/4 v0, 0x0
  436. return-object v0
  437. .end method
  # Returns true only when done is set and error is null; false otherwise.
  438. .method public hasComplete()Z
  439. .locals 1
  440. .line 408
  441. iget-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->done:Z
  442. if-eqz v0, :cond_0
  443. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->error:Ljava/lang/Throwable;
  444. if-nez v0, :cond_0
  445. const/4 v0, 0x1
  446. goto :goto_0
  447. :cond_0
  448. const/4 v0, 0x0
  449. :goto_0
  450. return v0
  451. .end method
  # Returns true when the actual reference currently holds a non-null
  # subscriber, false otherwise.
  452. .method public hasSubscribers()Z
  453. .locals 1
  454. .line 395
  455. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->actual:Ljava/util/concurrent/atomic/AtomicReference;
  456. invoke-virtual {v0}, Ljava/util/concurrent/atomic/AtomicReference;->get()Ljava/lang/Object;
  457. move-result-object v0
  458. if-eqz v0, :cond_0
  459. const/4 v0, 0x1
  460. goto :goto_0
  461. :cond_0
  462. const/4 v0, 0x0
  463. :goto_0
  464. return v0
  465. .end method
  # Returns true only when done is set and error is non-null; false otherwise.
  466. .method public hasThrowable()Z
  467. .locals 1
  468. .line 413
  469. iget-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->done:Z
  470. if-eqz v0, :cond_0
  471. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->error:Ljava/lang/Throwable;
  472. if-eqz v0, :cond_0
  473. const/4 v0, 0x1
  474. goto :goto_0
  475. :cond_0
  476. const/4 v0, 0x0
  477. :goto_0
  478. return v0
  479. .end method
  # Handles the upstream completion signal.
  # Ignored when done or cancelled is already set. Otherwise sets done to true,
  # runs the termination callback through doTerminate, and calls drain.
  480. .method public onComplete()V
  481. .locals 1
  482. .line 311
  483. iget-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->done:Z
  484. if-nez v0, :cond_1
  485. iget-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->cancelled:Z
  486. if-eqz v0, :cond_0
  487. goto :goto_0
  488. :cond_0
  489. const/4 v0, 0x1
  490. .line 315
  491. iput-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->done:Z
  492. .line 317
  493. invoke-virtual {p0}, Lio/reactivex/processors/UnicastProcessor;->doTerminate()V
  494. .line 319
  495. invoke-virtual {p0}, Lio/reactivex/processors/UnicastProcessor;->drain()V
  496. :cond_1
  497. :goto_0
  498. return-void
  499. .end method
  # Handles the upstream error signal.
  #   p1 - the Throwable to deliver
  # When done or cancelled is already set, p1 is forwarded unchanged to
  # RxJavaPlugins.onError (no null check happens on that path). Otherwise a
  # null p1 is replaced by a NullPointerException, the error field is stored
  # before done is set to true, and doTerminate and drain are called.
  500. .method public onError(Ljava/lang/Throwable;)V
  501. .locals 1
  502. .line 292
  503. iget-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->done:Z
  504. if-nez v0, :cond_2
  505. iget-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->cancelled:Z
  506. if-eqz v0, :cond_0
  507. goto :goto_0
  508. :cond_0
  509. if-nez p1, :cond_1
  510. .line 298
  # null error: substitute a NullPointerException describing the violation
  511. new-instance p1, Ljava/lang/NullPointerException;
  512. const-string v0, "onError called with null. Null values are generally not allowed in 2.x operators and sources."
  513. invoke-direct {p1, v0}, Ljava/lang/NullPointerException;-><init>(Ljava/lang/String;)V
  514. .line 301
  515. :cond_1
  # store the error first, then publish it by writing the volatile done flag
  516. iput-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->error:Ljava/lang/Throwable;
  517. const/4 p1, 0x1
  518. .line 302
  519. iput-boolean p1, p0, Lio/reactivex/processors/UnicastProcessor;->done:Z
  520. .line 304
  521. invoke-virtual {p0}, Lio/reactivex/processors/UnicastProcessor;->doTerminate()V
  522. .line 306
  523. invoke-virtual {p0}, Lio/reactivex/processors/UnicastProcessor;->drain()V
  524. return-void
  525. .line 293
  526. :cond_2
  527. :goto_0
  # already terminated or cancelled: hand the error to the global plugin hook
  528. invoke-static {p1}, Lio/reactivex/plugins/RxJavaPlugins;->onError(Ljava/lang/Throwable;)V
  529. return-void
  530. .end method
  # Handles an upstream item.
  #   p1 - the item
  # Silently ignored when done or cancelled is already set. A null item is
  # converted into onError(NullPointerException) and nothing is enqueued.
  # Otherwise the item is offered to the queue (the boolean result of offer is
  # not inspected) and drain is called.
  531. .method public onNext(Ljava/lang/Object;)V
  532. .locals 1
  533. .annotation system Ldalvik/annotation/Signature;
  534. value = {
  535. "(TT;)V"
  536. }
  537. .end annotation
  538. .line 277
  539. iget-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->done:Z
  540. if-nez v0, :cond_2
  541. iget-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->cancelled:Z
  542. if-eqz v0, :cond_0
  543. goto :goto_0
  544. :cond_0
  545. if-nez p1, :cond_1
  546. .line 282
  # null item: report it through this processor's own onError
  547. new-instance p1, Ljava/lang/NullPointerException;
  548. const-string v0, "onNext called with null. Null values are generally not allowed in 2.x operators and sources."
  549. invoke-direct {p1, v0}, Ljava/lang/NullPointerException;-><init>(Ljava/lang/String;)V
  550. invoke-virtual {p0, p1}, Lio/reactivex/processors/UnicastProcessor;->onError(Ljava/lang/Throwable;)V
  551. return-void
  552. .line 286
  553. :cond_1
  554. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->queue:Lio/reactivex/internal/queue/SpscLinkedArrayQueue;
  555. invoke-virtual {v0, p1}, Lio/reactivex/internal/queue/SpscLinkedArrayQueue;->offer(Ljava/lang/Object;)Z
  556. .line 287
  557. invoke-virtual {p0}, Lio/reactivex/processors/UnicastProcessor;->drain()V
  558. :cond_2
  559. :goto_0
  560. return-void
  561. .end method
  # Handles the upstream Subscription.
  #   p1 - the upstream Subscription
  # When done or cancelled is already set, the subscription is cancelled.
  # Otherwise Long.MAX_VALUE (0x7fffffffffffffff) items are requested from it.
  562. .method public onSubscribe(Lorg/reactivestreams/Subscription;)V
  563. .locals 2
  564. .line 268
  565. iget-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->done:Z
  566. if-nez v0, :cond_1
  567. iget-boolean v0, p0, Lio/reactivex/processors/UnicastProcessor;->cancelled:Z
  568. if-eqz v0, :cond_0
  569. goto :goto_0
  570. :cond_0
  571. const-wide v0, 0x7fffffffffffffffL
  572. .line 271
  573. invoke-interface {p1, v0, v1}, Lorg/reactivestreams/Subscription;->request(J)V
  574. goto :goto_1
  575. .line 269
  576. :cond_1
  577. :goto_0
  578. invoke-interface {p1}, Lorg/reactivestreams/Subscription;->cancel()V
  579. :goto_1
  580. return-void
  581. .end method
  # Subscribes a downstream Subscriber; only the first one is accepted.
  #   p1 - the subscriber
  # Accepted path (once is false and the compareAndSet(false, true) succeeds):
  # the subscriber receives onSubscribe(wip), is stored in actual, and then
  # either actual is reset to null (cancelled already set) or drain is called.
  # Rejected path: the subscriber is signalled an IllegalStateException through
  # EmptySubscription.error.
  582. .method protected subscribeActual(Lorg/reactivestreams/Subscriber;)V
  583. .locals 3
  584. .annotation system Ldalvik/annotation/Signature;
  585. value = {
  586. "(",
  587. "Lorg/reactivestreams/Subscriber<",
  588. "-TT;>;)V"
  589. }
  590. .end annotation
  591. .line 324
  # plain read first, then the atomic false -> true swap
  592. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->once:Ljava/util/concurrent/atomic/AtomicBoolean;
  593. invoke-virtual {v0}, Ljava/util/concurrent/atomic/AtomicBoolean;->get()Z
  594. move-result v0
  595. if-nez v0, :cond_1
  596. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->once:Ljava/util/concurrent/atomic/AtomicBoolean;
  597. const/4 v1, 0x0
  598. const/4 v2, 0x1
  599. invoke-virtual {v0, v1, v2}, Ljava/util/concurrent/atomic/AtomicBoolean;->compareAndSet(ZZ)Z
  600. move-result v0
  601. if-eqz v0, :cond_1
  602. .line 326
  # onSubscribe is invoked before the subscriber becomes visible through actual
  603. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->wip:Lio/reactivex/internal/subscriptions/BasicIntQueueSubscription;
  604. invoke-interface {p1, v0}, Lorg/reactivestreams/Subscriber;->onSubscribe(Lorg/reactivestreams/Subscription;)V
  605. .line 327
  606. iget-object v0, p0, Lio/reactivex/processors/UnicastProcessor;->actual:Ljava/util/concurrent/atomic/AtomicReference;
  607. invoke-virtual {v0, p1}, Ljava/util/concurrent/atomic/AtomicReference;->set(Ljava/lang/Object;)V
  608. .line 328
  609. iget-boolean p1, p0, Lio/reactivex/processors/UnicastProcessor;->cancelled:Z
  610. if-eqz p1, :cond_0
  611. .line 329
  # cancelled was set in the meantime: drop the reference just stored
  612. iget-object p1, p0, Lio/reactivex/processors/UnicastProcessor;->actual:Ljava/util/concurrent/atomic/AtomicReference;
  613. const/4 v0, 0x0
  614. invoke-virtual {p1, v0}, Ljava/util/concurrent/atomic/AtomicReference;->lazySet(Ljava/lang/Object;)V
  615. goto :goto_0
  616. .line 331
  617. :cond_0
  618. invoke-virtual {p0}, Lio/reactivex/processors/UnicastProcessor;->drain()V
  619. goto :goto_0
  620. .line 334
  621. :cond_1
  # a subscriber was already accepted: reject this one with an error signal
  622. new-instance v0, Ljava/lang/IllegalStateException;
  623. const-string v1, "This processor allows only a single Subscriber"
  624. invoke-direct {v0, v1}, Ljava/lang/IllegalStateException;-><init>(Ljava/lang/String;)V
  625. invoke-static {v0, p1}, Lio/reactivex/internal/subscriptions/EmptySubscription;->error(Ljava/lang/Throwable;Lorg/reactivestreams/Subscriber;)V
  626. :goto_0
  627. return-void
  628. .end method